[Системы обмена сообщениями, Python, Программирование, Анализ и проектирование систем] Система отложенного исполнения на RabbitMQ
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Всем привет!
Меня зовут Никита, и я курирую разработку нескольких проектов в ДомКлик. Сегодня я хочу продолжить тему «веселых картинок» в мире RabbitMQ. В своей статье Алексей Казаков рассматривал такой мощный инструмент, как отложенные очереди, и разные реализации стратегии Retry. А сегодня мы поговорим, как использовать RabbitMQ для планирования периодических задач.
Зачем нам понадобилось создавать свой велосипед и почему мы отказались от Сelery и других инструментов менеджмента задач? Дело в том, что они не подошли под наши задачи и требования к отказоустойчивости, которые у нас в компании достаточно жесткие.
При переходе на Docker и Kubernetes многие разработчики сталкиваются с проблемами организации периодических задач, кроны запускаются с бубном, да и контроль процесса оставляет желать лучшего. А еще есть проблемы с пиковой нагрузкой в дневное время.
Передо мной стояла задача реализовать в проекте надежную систему обработки периодических задач, при этом легко масштабируемую и отказоустойчивую. Наш проект на Python, поэтому логично было посмотреть, насколько нам подходит Celery. Это хороший инструмент, но с ним мы часто сталкивались с проблемами надежности, масштабирования и организации бесшовных релизов. Одна пода — одна группа процессов. При масштабировании Celery приходится увеличивать ресурсы одной поды, потому что нет синхронизации между подами, а это означает остановку обработки задач, хоть и временную. А если задачи еще и длительного выполнения, то вы уже догадались, насколько сложно этим управлять. Второй очевидный недостаток: из коробки нет поддержки асинхронности, а для нас это важно, потому что задачи в основном содержат операции ввода-вывода, а Celery работает на тредах.
На тот момент (2018 год) мы не нашли подходящего готового инструмента, и начали разрабатывать свой. Взяв за основу функциональность отложенного выполнения задач и Dead Letter Exchange, мы решили создать систему обработки периодических задач. Концепт выглядел как-то так:
Попробую объяснить, что к чему.
- Задачи в виде сообщения попадают в коммутатор планировщика (Scheduler exchange).
- По routing_key попадают в нужную очередь «вызревания» (Hatchery queue), которая имеет параметр message_ttl, а также связь с коммутатором исполнения (Processor exchange) в качестве deal letter exchange. Очередь «вызревания» не связана с типом задач, она только играет роль «таймера», то есть вы можете создать столько очередей, сколько периодов вам требуется, и управлять через routing_key.
- Так как у очереди нет слушателей, сообщения после «вызревания» в очереди попадают в коммутатор исполнения (Processor exchange).
- Дальше свободный потребитель (Processing consumer) забирает сообщение и исполняет. После исполнения цикл при необходимости повторяется.
В чем же преимущество такой схемы?
- Поэтапность исполнения, то есть новая задача не поступит в обработку, если предыдущая не завершилась.
- Единый слушатель (consumer), то есть можно делать как универсальные воркеры, так специализированные. Масштабируется простым повышением количества нужных под.
- Деплой новых задач без нарушения работы текущих. Достаточно мягко обновить поды-слушатели и отправить в очередь соответствующее сообщение. То есть можно поднимать поды с новым кодом, которые займутся новыми сообщениями, а текущие процессы будут доживать в старых подах. Так мы получаем бесшовное обновление.
- Можно использовать асинхронный код и любую инфраструктуру, при этом независимы от стека.
- Можно контролировать исполнение задач на уровне нативного ack/reject, а также получаем дополнительную опциональную очередь (control queue), которая может отслеживать жизненный цикл задач.
Схема получилась на деле достаточно простой, мы быстро создали рабочий прототип. И код получился красивым. Достаточно callback-функцию пометить простым декоратором, контролирующим жизненный цикл сообщения.
def rmq_scheduler(routing_key_for_delay_queue, routing_key_for_processing_queue):
def decorator(func):
@wraps(func)
async def wrapper(channel, body, envelope, properties):
try:
res = await func(channel, body, envelope, properties)
await channel.publish(
payload=body,
exchange_name='',
routing_key=routing_key_for_delay_queue,
)
await channel.basic_client_ack(envelope.delivery_tag)
return res
except Exception as e:
log_error(e)
redelivered_count = get_count_of_redelivery_attempts(properties)
if redelivered_count <= 3:
await resend_msg(
channel=channel,
body=body,
properties=properties,
routing_key=routing_key_for_processing_queue)
else:
async with app.natalya_db_engine.acquire() as conn:
async with conn.begin():
await channel.publish(
payload=body,
exchange_name='',
routing_key=routing_key_for_delay_queue,
)
await channel.basic_client_ack(envelope.delivery_tag)
return wrapper
return decorator
Сейчас мы используем эту схему для выполнения только периодических последовательных задач, но ее можно использовать и тогда, когда важно начать выполнять задачу в определенной время, без смещения времени на само исполнение. Для этого достаточно повторно запланировать задачу после попадания сообщения в supervisor.
Правда, у этого подхода есть дополнительные накладные расходы. Нужно понимать, что в случае ошибки сообщение вернется в очередь, его подхватит другой воркер и сразу начнет выполнять. Потому нужно разделять обработку ошибок по степени критичности и заранее продумывать, что делать с сообщением при той или иной ошибке.
Возможные варианты:
- Ошибка исправится сама (например, это системная ошибка): отправляем noack и повторяем обработку ошибок.
- Ошибка бизнес логики: нужно прервать цикл — отправляем ack.
- Ошибка из пункта 1 повторяется слишком часто: отравляем reject и сигнализируем разработчикам. Тут возможны варианты. Можно создать deal letter-очередь для отстоя сообщений, чтобы после разбора вернуть сообщение, или же применить технику ретраев (указать message_ttl).
Пример декоратора:
def auto_ack_or_nack(log_message):
def decorator(func):
@wraps(func)
async def wrapper(channel, body, envelope, properties):
try:
res = await func(channel, body, envelope, properties)
await channel.basic_client_ack(envelope.delivery_tag)
return res
except Exception as e:
await channel.basic_client_nack(envelope.delivery_tag, requeue=False)
log_error(log_message, exception=e)
return wrapper
return decorator
Эта схема у нас работает уже полгода, она достаточно надёжна и практически не требует внимания. Падение приложения не нарушает работу планировщика и всего лишь немного задерживает выполнение задач.
Плюсов не бывает без минусов. У этой схемы есть и критическая уязвимость. Если с RabbitMQ что-то случилось и сообщения пропали, тогда нужно вручную смотреть, что потерялось, и запускать цикл снова. Но это исключительно маловероятная ситуация, при которой думать об этом сервисе придется в последнюю очередь :)
P.S. Если тема планирования периодических задач вам покажется интересной, то в следующей статье, я подробнее расскажу, как у нас устроена автоматизация создания очередей, а также про Supervisor.
Ссылки:
===========
Источник:
habr.com
===========
Похожие новости:
- [.NET, Программирование] Принятого не воротай: Enumerable vs List
- [Big Data, Data Mining, Python, Открытые данные] Преступления на почве расизма в США: статистический анализ
- [Программирование, Разработка под Android, Разработка мобильных приложений] 20 инструментов Android-разработчика, о которых вы могли не знать (перевод)
- [Программирование] Что значит быть программистом?
- [Программирование, Проектирование и рефакторинг, Go] Дневник изучения Go: запись 1
- [JavaScript, Программирование, Разработка веб-сайтов] 200 теоретических вопросов по JavaScript
- [Программирование, Учебный процесс в IT, Карьера в IT-индустрии, Конференции] Бесплатные онлайн-мероприятия по разработке (15 сентября – 23 сентября)
- [Визуализация данных, Программирование микроконтроллеров, Разработка для интернета вещей, Умный дом, Интернет вещей] SCADA «BortX» с поддержкой языка управления в рамках ANSI /ISA-88 для ESP8266
- [Программирование микроконтроллеров] Продолжение очередной статьи: STM32 для начинающих. Интерфейсы
- [Программирование микроконтроллеров] Очередная статья: STM32 для начинающих
Теги для поиска: #_sistemy_obmena_soobschenijami (Системы обмена сообщениями), #_python, #_programmirovanie (Программирование), #_analiz_i_proektirovanie_sistem (Анализ и проектирование систем), #_python, #_rabbitmq, #_scheduler, #_blog_kompanii_domklik (
Блог компании ДомКлик
), #_sistemy_obmena_soobschenijami (
Системы обмена сообщениями
), #_python, #_programmirovanie (
Программирование
), #_analiz_i_proektirovanie_sistem (
Анализ и проектирование систем
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 17:55
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Всем привет! Меня зовут Никита, и я курирую разработку нескольких проектов в ДомКлик. Сегодня я хочу продолжить тему «веселых картинок» в мире RabbitMQ. В своей статье Алексей Казаков рассматривал такой мощный инструмент, как отложенные очереди, и разные реализации стратегии Retry. А сегодня мы поговорим, как использовать RabbitMQ для планирования периодических задач. Зачем нам понадобилось создавать свой велосипед и почему мы отказались от Сelery и других инструментов менеджмента задач? Дело в том, что они не подошли под наши задачи и требования к отказоустойчивости, которые у нас в компании достаточно жесткие. При переходе на Docker и Kubernetes многие разработчики сталкиваются с проблемами организации периодических задач, кроны запускаются с бубном, да и контроль процесса оставляет желать лучшего. А еще есть проблемы с пиковой нагрузкой в дневное время. Передо мной стояла задача реализовать в проекте надежную систему обработки периодических задач, при этом легко масштабируемую и отказоустойчивую. Наш проект на Python, поэтому логично было посмотреть, насколько нам подходит Celery. Это хороший инструмент, но с ним мы часто сталкивались с проблемами надежности, масштабирования и организации бесшовных релизов. Одна пода — одна группа процессов. При масштабировании Celery приходится увеличивать ресурсы одной поды, потому что нет синхронизации между подами, а это означает остановку обработки задач, хоть и временную. А если задачи еще и длительного выполнения, то вы уже догадались, насколько сложно этим управлять. Второй очевидный недостаток: из коробки нет поддержки асинхронности, а для нас это важно, потому что задачи в основном содержат операции ввода-вывода, а Celery работает на тредах. На тот момент (2018 год) мы не нашли подходящего готового инструмента, и начали разрабатывать свой. Взяв за основу функциональность отложенного выполнения задач и Dead Letter Exchange, мы решили создать систему обработки периодических задач. Концепт выглядел как-то так: Попробую объяснить, что к чему.
В чем же преимущество такой схемы?
Схема получилась на деле достаточно простой, мы быстро создали рабочий прототип. И код получился красивым. Достаточно callback-функцию пометить простым декоратором, контролирующим жизненный цикл сообщения. def rmq_scheduler(routing_key_for_delay_queue, routing_key_for_processing_queue):
def decorator(func): @wraps(func) async def wrapper(channel, body, envelope, properties): try: res = await func(channel, body, envelope, properties) await channel.publish( payload=body, exchange_name='', routing_key=routing_key_for_delay_queue, ) await channel.basic_client_ack(envelope.delivery_tag) return res except Exception as e: log_error(e) redelivered_count = get_count_of_redelivery_attempts(properties) if redelivered_count <= 3: await resend_msg( channel=channel, body=body, properties=properties, routing_key=routing_key_for_processing_queue) else: async with app.natalya_db_engine.acquire() as conn: async with conn.begin(): await channel.publish( payload=body, exchange_name='', routing_key=routing_key_for_delay_queue, ) await channel.basic_client_ack(envelope.delivery_tag) return wrapper return decorator Сейчас мы используем эту схему для выполнения только периодических последовательных задач, но ее можно использовать и тогда, когда важно начать выполнять задачу в определенной время, без смещения времени на само исполнение. Для этого достаточно повторно запланировать задачу после попадания сообщения в supervisor. Правда, у этого подхода есть дополнительные накладные расходы. Нужно понимать, что в случае ошибки сообщение вернется в очередь, его подхватит другой воркер и сразу начнет выполнять. Потому нужно разделять обработку ошибок по степени критичности и заранее продумывать, что делать с сообщением при той или иной ошибке. Возможные варианты:
Пример декоратора: def auto_ack_or_nack(log_message):
def decorator(func): @wraps(func) async def wrapper(channel, body, envelope, properties): try: res = await func(channel, body, envelope, properties) await channel.basic_client_ack(envelope.delivery_tag) return res except Exception as e: await channel.basic_client_nack(envelope.delivery_tag, requeue=False) log_error(log_message, exception=e) return wrapper return decorator Эта схема у нас работает уже полгода, она достаточно надёжна и практически не требует внимания. Падение приложения не нарушает работу планировщика и всего лишь немного задерживает выполнение задач. Плюсов не бывает без минусов. У этой схемы есть и критическая уязвимость. Если с RabbitMQ что-то случилось и сообщения пропали, тогда нужно вручную смотреть, что потерялось, и запускать цикл снова. Но это исключительно маловероятная ситуация, при которой думать об этом сервисе придется в последнюю очередь :) P.S. Если тема планирования периодических задач вам покажется интересной, то в следующей статье, я подробнее расскажу, как у нас устроена автоматизация создания очередей, а также про Supervisor. Ссылки: =========== Источник: habr.com =========== Похожие новости:
Блог компании ДомКлик ), #_sistemy_obmena_soobschenijami ( Системы обмена сообщениями ), #_python, #_programmirovanie ( Программирование ), #_analiz_i_proektirovanie_sistem ( Анализ и проектирование систем ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 17:55
Часовой пояс: UTC + 5