[Apache, Big Data, Hadoop, Data Engineering] Архитектура непрерывной потоковой доставки в Cloudera Flow Management (перевод)
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Cloudera Flow Management, основанная на Apache NiFi и являющаяся частью платформы Cloudera DataFlow, используется некоторыми из крупнейших организаций в мире для обеспечения простого в использовании, мощного и надежного способа распределения и высокоскоростной обработки данных в современной экосистеме больших данных. Клиенты все чаще используют CFM для ускорения обработки потоковых данных на предприятии от концепции до реализации. Интерфейс разработки потоков Cloudera отличается от типичных стилей структурированного кодирования, что часто создает проблему применения лучших практик непрерывного совершенствования/непрерывной доставки (CI/CD) в стиле DevOps для доставки потоков.В этом блоге мы рассмотрим сквозной жизненный цикл процесса потока данных, который способствует непрерывной доставке с минимальным временем простоя. Мы надеемся, что вы почувствуете вдохновение, чтобы внедрить некоторые из этих идей в свои собственные процессы Cloudera Flow Management CD. Ниже представлена архитектура подобного решения:
Архитектура непрерывной потоковой доставки В этом процессе можно выделить следующие шаги.1.Разработка Разработчики проектируют DataFlow и тестируют его в общем многопользовательском DEV кластере. Команда может работать только в своих собственных группах процессов, контролируемых политиками Apache Ranger для NiFi. Если какие-либо группы потребуют изменить или добавить службу корневого контроллера, администратор поможет с этим изменением и скопирует его в выделенную группу процессов root_controller_services. 2. Контроль версий Для контроля версий разработчики вручную добавляют или обновляют протестированные потоки в DEV раздел NiFi Registry. При заданных политиках в Apache Ranger для NiFi Registry одна команда может наблюдать только за назначенным ей сегментом. Администратор обновляет версию группы процессов root_controller_services в сегменте администратора реестра. 3. Запрос Source Change Чтобы разработчики не переносили потоки, которые все еще проходят тестирование, в среду более высокого уровня, они должны создать файл запроса на изменение (source change) для коллегиальной проверки исходника после тестирования потоков. Этот файл находится в проекте git изменений исходника. Разработчики создают новую ветку этого проекта git, а затем обновляют файл source_change_request.json, чтобы он содержал измененные потоки и версии. Если какие-то службы корневого контроллера были обновлены, администратор обновит версию группы процессов root_controller_services в файле source_change_request.json.Как только все они будут готовы, владелец изменения должен дать запрос git pull, чтобы выполнить Peer Review.
{
"env": "dev",
"nifi_api_url": "https://nifi-dev.example.cloudera.com:9091/nifi-api",
"reg_api_url": "https://registry-dev.example.cloudera.com:61443/nifi-registry-api",
"reg_client_name": "registry-dev.example.cloudera.com",
"buckets": [
{
"name": "teama_bucket",
"flows": [
{"name": "flowa1_feeding", "version": "2"},
{"name": "flowa1_digestion", "version": "2"},
]
},
{
"name": "admin_bucket",
"flows": [
{"name": "root_controller_services", "version": "1"}
]
}
]
}
Пример source_change_request.json 4. Peer ReviewЧлены команды рецензируют код, проверяя статус потока и тестируя в DEV NiFi. 5. Утверждение изменений Source ChangeЕсли команда подтверждает, что изменение готово к продвижению, они могут объединить этот запрос с главной ветвью проекта изменения исходника. 6. Продвижение на более высокий уровень Cloudera предлагает создать задание Jenkins для запуска скрипта python Promo_source_changes.py, отслеживая главную ветвь проекта изменения исходника. Этот скрипт использует NiFi / NiFi Registry API для экспорта версии потока из DEV NiFi Registry, а затем импортирует ее в UAT NiFi Registry. Пример кода:
for bucket in source_env_conf['buckets']:
login_env(source_env_conf)
source_bucket = nipyapi.versioning.get_registry_bucket(bucket['name'])
for flow in bucket['flows'][:]:
# Check out a flow from Source NiFi Registry
login_env(source_env_conf)
source_versioned_flow = nipyapi.versioning.get_flow_in_bucket(source_bucket.identifier, identifier=flow['name'])
exported_flow = nipyapi.versioning.export_flow_version(bucket_id=source_bucket.identifier, flow_id=source_versioned_flow.identifier, version=flow['version'], mode='yaml')
# Version control this flow in Dest NiFi Registry
login_env(dest_env_conf)
dest_bucket = nipyapi.versioning.get_registry_bucket(dest_env_conf['buckets'][0])
dest_flow = nipyapi.versioning.get_flow_in_bucket(bucket_id=dest_bucket.identifier, identifier=flow['name'])
if dest_flow is None:
imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_name=flow['name'])
else:
imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_id=dest_flow.identifier)
log.info("Flow %s Version %s is imported from ENV %s into ENV %s.", flow['name'], flow['version'], source_env_conf['env'], dest_env_conf['env'])
# Remove promoted flow from the json
if flow in bucket['flows']:
bucket['flows'].remove(flow)
log.info("All %d flows in %s bucket %s are imported into %s bucket %s.", len(bucket['flows']), source_env_conf['env'], bucket['name'], dest_env_conf['env'], dest_env_conf['buckets'][0])
Пример promote_source_changes.py 7. Запрос Destination Change В реестре UAT NiFi скрипт generate_dest_change_request.sh зарегистрирован как перехватчик событий реестра.
<eventHookProvider>
<class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class>
<property name="Script Path">/var/cloudera/flow_cd/generate_dest_change_request.sh</property>
<property name="Working Directory">/var/cloudera/flow_cd/</property>
<property name="Whitelisted Event Type 1">CREATE_FLOW_VERSION</property>
</eventHookProvider>
Конфигурация NiFi Registry Event Hook
CREATE_FLOW_VERSION feeb0fbe-5d7e-4363-b58b-142fa80775e1
1a0b614c-3d0f-471a-b6b1-645e6091596d 4 flow_cd Update-Attributes
Пример события CREATE_FLOW_VERSION Это эквивалентно следующему:
CREATE_FLOW_VERSION [BUCKET_ID=5d81dc5e-79e1-4387-8022-79e505f5e3a0,
FLOW_ID=a89bf6b7-41f9-4a96-86d4-0aeb3c3c25be, VERSION=4, USER=flow_cd,
COMMENT=Update-Attributes]
Любое обновление версии отдельного потока запускает сценарий для создания новой ветви проекта git запроса на изменение цели и создания файла dest_change_request.json путем сравнения новой и основной версии.
{
"env":"uat",
"nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api",
"reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api",
"reg_client_name":"registry-uat.example.cloudera.com",
"bucket":{
"name":"uat_bucket",
"flows":[
{
"name":"flowb1_digestion",
"version":"4",
"sensitive_parameters":[
],
"comment":"Update Flow Attributes",
"deployed_version":"2",
"deployed_comment":"Update Logic"
}
]
}
}
Пример dest_change_request.json Для рассмотрения запроса release менеджером в git создается запрос на перенос.8. Утверждение внедрения кодаПоскольку каждое изменение версии группы процессов запускает одно событие реестра NiFi, если диспетчер версий содержит несколько групп процессов и несколько служб корневого контроллера, то для одного запроса на изменение исходника он будет получать несколько запросов на изменение цели.Диспетчеру версий необходимо объединить связанные запросы на изменение и просмотреть изменения. Если перечислены какие-либо чувствительные параметры, он должен подтвердить, требуются ли в целевом NiFi новые значения чувствительных параметров. После внесения этих изменений конфиденциальных параметров запрос на перенос может быть одобрен и объединен с master ветвью в git.
{
"env":"uat",
"nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api",
"reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api",
"reg_client_name":"registry-uat.example.cloudera.com",
"bucket":{
"name":"uat_bucket",
"flows":[
{
"name":"flowa1_digestion",
"version":"1",
"sensitive_parameters":[
{
"name":"access_key_id"
},
{
"name":"secret_access_key"
}
],
"comment":"Initial Version"
},
{
"name":"root_controller_services",
"version":"3",
"sensitive_parameters":[
{
"name":"oracle_password"
}
],
"comment":"root_DBCPConnectionPool",
"deployed_version":"2",
"deployed_comment":"root_PropertiesFileLookupService"
},
{
"name":"flowb1_digestion",
"version":"4",
"sensitive_parameters":[
],
"comment":"Update Flow Attributes",
"deployed_version":"2",
"deployed_comment":"Update Logic"
}
]
}
}
Пример комбинированного dest_change_request.json 9. Развертывание новой версииНаконец, объединенная главная ветвь запускает скрипт python deploy_dest_changes.py для автоматического развертывания новой версии потока в среде UAT - без простоев или с минимальным временем простоя.
# Connect flowx_feeding with flowx_digestion
if feeding_pg is not None and digestion_pg is not None:
digestion_inputport = nipyapi.canvas.list_all_input_ports(pg_id=digestion_pg.id)
feeding_outputport = nipyapi.canvas.list_all_output_ports(pg_id=feeding_pg.id)
if digestion_inputport[0] is None or feeding_outputport[0] is None:
raise SystemExit('Error: the flowx_feeding pg must have an output port, and the flowx_digestion pg must have an input port!')
nipyapi.canvas.create_connection(feeding_outputport[0], digestion_inputport[0])
nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True)
nipyapi.canvas.schedule_process_group(feeding_pg.id, scheduled=True)
Соединение подпотоков совершенно нового потока
# Stop the input port
nipyapi.canvas.schedule_components(pg_id=digestion_pg.id, scheduled=False, components=input_port)
all_connections = nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id)
queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections)
# Wait for Queues are empty
while (queued_count > 0):
log.info("There are still %d queued events, waiting for all are processed.", queued_count)
time.sleep(10)
all_connections = nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id)
queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections)
log.info("Process Group %s has no queued event, start updating new version now!", flow['name'])
nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=False)
nipyapi.versioning.update_flow_ver(process_group=digestion_pg, target_version=flow['version'])
nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True)
Развертывание новой версии активного подпотока Благодаря превосходному NiPyApi (https://github.com/Chaffelson/nipyapi), веденному Дэниелом Чаффельсоном, скрипты Python для Cloudera Flow Management оказаались намного проще, чем ожидалось.
===========
Источник:
habr.com
===========
===========
Автор оригинала: Wendell Bu
===========Похожие новости:
- [IT-инфраструктура, Учебный процесс в IT, Распределённые системы] Kafka, Lamoda и непреодолимое желание учиться
- [Big Data] Location Intelligence
- [Big Data, DevOps] Новый KubernetesExecutor 2.0 в Airflow 2.0 (перевод)
- [Data Mining, Big Data, Аналитика мобильных приложений, Управление продуктом] Хочу всё знать о клиенте! Или как обогатить сухие факты DWH цифровыми путями и свойствами клиента из Amplitude
- [Big Data] Нетология совместно с Yandex.Cloud запустила бесплатный курс по визуализации данных
- [Python, Data Mining, Big Data, R, Визуализация данных] Оценка кредитного портфеля на R
- [Big Data, Data Engineering] Курсы валют и аналитика – использование обменных курсов в Хранилище Данных
- [Java, Apache] Как на самом деле работает auto-commit в Kafka и можем ли мы на него расчитывать?
- [Big Data, Законодательство в IT, Финансы в IT] Минцифры предлагает продавать стартапам доступ к государственным данным
- [Big Data, Профессиональная литература, Машинное обучение, Статистика в IT] Книга «Байесовская статистика: Star Wars, LEGO, резиновые уточки и многое другое»
Теги для поиска: #_apache, #_big_data, #_hadoop, #_data_engineering, #_nepreryvnaja_dostavka (непрерывная доставка), #_streaming, #_nifi, #_cloudera, #_blog_kompanii_cloudera (
Блог компании Cloudera
), #_apache, #_big_data, #_hadoop, #_data_engineering
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 08:21
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Cloudera Flow Management, основанная на Apache NiFi и являющаяся частью платформы Cloudera DataFlow, используется некоторыми из крупнейших организаций в мире для обеспечения простого в использовании, мощного и надежного способа распределения и высокоскоростной обработки данных в современной экосистеме больших данных. Клиенты все чаще используют CFM для ускорения обработки потоковых данных на предприятии от концепции до реализации. Интерфейс разработки потоков Cloudera отличается от типичных стилей структурированного кодирования, что часто создает проблему применения лучших практик непрерывного совершенствования/непрерывной доставки (CI/CD) в стиле DevOps для доставки потоков.В этом блоге мы рассмотрим сквозной жизненный цикл процесса потока данных, который способствует непрерывной доставке с минимальным временем простоя. Мы надеемся, что вы почувствуете вдохновение, чтобы внедрить некоторые из этих идей в свои собственные процессы Cloudera Flow Management CD. Ниже представлена архитектура подобного решения: Архитектура непрерывной потоковой доставки В этом процессе можно выделить следующие шаги.1.Разработка Разработчики проектируют DataFlow и тестируют его в общем многопользовательском DEV кластере. Команда может работать только в своих собственных группах процессов, контролируемых политиками Apache Ranger для NiFi. Если какие-либо группы потребуют изменить или добавить службу корневого контроллера, администратор поможет с этим изменением и скопирует его в выделенную группу процессов root_controller_services. 2. Контроль версий Для контроля версий разработчики вручную добавляют или обновляют протестированные потоки в DEV раздел NiFi Registry. При заданных политиках в Apache Ranger для NiFi Registry одна команда может наблюдать только за назначенным ей сегментом. Администратор обновляет версию группы процессов root_controller_services в сегменте администратора реестра. 3. Запрос Source Change Чтобы разработчики не переносили потоки, которые все еще проходят тестирование, в среду более высокого уровня, они должны создать файл запроса на изменение (source change) для коллегиальной проверки исходника после тестирования потоков. Этот файл находится в проекте git изменений исходника. Разработчики создают новую ветку этого проекта git, а затем обновляют файл source_change_request.json, чтобы он содержал измененные потоки и версии. Если какие-то службы корневого контроллера были обновлены, администратор обновит версию группы процессов root_controller_services в файле source_change_request.json.Как только все они будут готовы, владелец изменения должен дать запрос git pull, чтобы выполнить Peer Review. {
"env": "dev", "nifi_api_url": "https://nifi-dev.example.cloudera.com:9091/nifi-api", "reg_api_url": "https://registry-dev.example.cloudera.com:61443/nifi-registry-api", "reg_client_name": "registry-dev.example.cloudera.com", "buckets": [ { "name": "teama_bucket", "flows": [ {"name": "flowa1_feeding", "version": "2"}, {"name": "flowa1_digestion", "version": "2"}, ] }, { "name": "admin_bucket", "flows": [ {"name": "root_controller_services", "version": "1"} ] } ] } for bucket in source_env_conf['buckets']:
login_env(source_env_conf) source_bucket = nipyapi.versioning.get_registry_bucket(bucket['name']) for flow in bucket['flows'][:]: # Check out a flow from Source NiFi Registry login_env(source_env_conf) source_versioned_flow = nipyapi.versioning.get_flow_in_bucket(source_bucket.identifier, identifier=flow['name']) exported_flow = nipyapi.versioning.export_flow_version(bucket_id=source_bucket.identifier, flow_id=source_versioned_flow.identifier, version=flow['version'], mode='yaml') # Version control this flow in Dest NiFi Registry login_env(dest_env_conf) dest_bucket = nipyapi.versioning.get_registry_bucket(dest_env_conf['buckets'][0]) dest_flow = nipyapi.versioning.get_flow_in_bucket(bucket_id=dest_bucket.identifier, identifier=flow['name']) if dest_flow is None: imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_name=flow['name']) else: imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_id=dest_flow.identifier) log.info("Flow %s Version %s is imported from ENV %s into ENV %s.", flow['name'], flow['version'], source_env_conf['env'], dest_env_conf['env']) # Remove promoted flow from the json if flow in bucket['flows']: bucket['flows'].remove(flow) log.info("All %d flows in %s bucket %s are imported into %s bucket %s.", len(bucket['flows']), source_env_conf['env'], bucket['name'], dest_env_conf['env'], dest_env_conf['buckets'][0]) <eventHookProvider>
<class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class> <property name="Script Path">/var/cloudera/flow_cd/generate_dest_change_request.sh</property> <property name="Working Directory">/var/cloudera/flow_cd/</property> <property name="Whitelisted Event Type 1">CREATE_FLOW_VERSION</property> </eventHookProvider> CREATE_FLOW_VERSION feeb0fbe-5d7e-4363-b58b-142fa80775e1
1a0b614c-3d0f-471a-b6b1-645e6091596d 4 flow_cd Update-Attributes CREATE_FLOW_VERSION [BUCKET_ID=5d81dc5e-79e1-4387-8022-79e505f5e3a0,
FLOW_ID=a89bf6b7-41f9-4a96-86d4-0aeb3c3c25be, VERSION=4, USER=flow_cd, COMMENT=Update-Attributes] {
"env":"uat", "nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api", "reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api", "reg_client_name":"registry-uat.example.cloudera.com", "bucket":{ "name":"uat_bucket", "flows":[ { "name":"flowb1_digestion", "version":"4", "sensitive_parameters":[ ], "comment":"Update Flow Attributes", "deployed_version":"2", "deployed_comment":"Update Logic" } ] } } {
"env":"uat", "nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api", "reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api", "reg_client_name":"registry-uat.example.cloudera.com", "bucket":{ "name":"uat_bucket", "flows":[ { "name":"flowa1_digestion", "version":"1", "sensitive_parameters":[ { "name":"access_key_id" }, { "name":"secret_access_key" } ], "comment":"Initial Version" }, { "name":"root_controller_services", "version":"3", "sensitive_parameters":[ { "name":"oracle_password" } ], "comment":"root_DBCPConnectionPool", "deployed_version":"2", "deployed_comment":"root_PropertiesFileLookupService" }, { "name":"flowb1_digestion", "version":"4", "sensitive_parameters":[ ], "comment":"Update Flow Attributes", "deployed_version":"2", "deployed_comment":"Update Logic" } ] } } # Connect flowx_feeding with flowx_digestion
if feeding_pg is not None and digestion_pg is not None: digestion_inputport = nipyapi.canvas.list_all_input_ports(pg_id=digestion_pg.id) feeding_outputport = nipyapi.canvas.list_all_output_ports(pg_id=feeding_pg.id) if digestion_inputport[0] is None or feeding_outputport[0] is None: raise SystemExit('Error: the flowx_feeding pg must have an output port, and the flowx_digestion pg must have an input port!') nipyapi.canvas.create_connection(feeding_outputport[0], digestion_inputport[0]) nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True) nipyapi.canvas.schedule_process_group(feeding_pg.id, scheduled=True) # Stop the input port
nipyapi.canvas.schedule_components(pg_id=digestion_pg.id, scheduled=False, components=input_port) all_connections = nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id) queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections) # Wait for Queues are empty while (queued_count > 0): log.info("There are still %d queued events, waiting for all are processed.", queued_count) time.sleep(10) all_connections = nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id) queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections) log.info("Process Group %s has no queued event, start updating new version now!", flow['name']) nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=False) nipyapi.versioning.update_flow_ver(process_group=digestion_pg, target_version=flow['version']) nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True) =========== Источник: habr.com =========== =========== Автор оригинала: Wendell Bu ===========Похожие новости:
Блог компании Cloudera ), #_apache, #_big_data, #_hadoop, #_data_engineering |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 08:21
Часовой пояс: UTC + 5