[Системы обмена сообщениями, Python, Программирование, Анализ и проектирование систем] Система отложенного исполнения на RabbitMQ

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
15-Сен-2020 13:32

Всем привет!
Меня зовут Никита, и я курирую разработку нескольких проектов в ДомКлик. Сегодня я хочу продолжить тему «веселых картинок» в мире RabbitMQ. В своей статье Алексей Казаков рассматривал такой мощный инструмент, как отложенные очереди, и разные реализации стратегии Retry. А сегодня мы поговорим, как использовать RabbitMQ для планирования периодических задач.
Зачем нам понадобилось создавать свой велосипед и почему мы отказались от Сelery и других инструментов менеджмента задач? Дело в том, что они не подошли под наши задачи и требования к отказоустойчивости, которые у нас в компании достаточно жесткие.
При переходе на Docker и Kubernetes многие разработчики сталкиваются с проблемами организации периодических задач, кроны запускаются с бубном, да и контроль процесса оставляет желать лучшего. А еще есть проблемы с пиковой нагрузкой в дневное время.
Передо мной стояла задача реализовать в проекте надежную систему обработки периодических задач, при этом легко масштабируемую и отказоустойчивую. Наш проект на Python, поэтому логично было посмотреть, насколько нам подходит Celery. Это хороший инструмент, но с ним мы часто сталкивались с проблемами надежности, масштабирования и организации бесшовных релизов. Одна пода — одна группа процессов. При масштабировании Celery приходится увеличивать ресурсы одной поды, потому что нет синхронизации между подами, а это означает остановку обработки задач, хоть и временную. А если задачи еще и длительного выполнения, то вы уже догадались, насколько сложно этим управлять. Второй очевидный недостаток: из коробки нет поддержки асинхронности, а для нас это важно, потому что задачи в основном содержат операции ввода-вывода, а Celery работает на тредах.
На тот момент (2018 год) мы не нашли подходящего готового инструмента, и начали разрабатывать свой. Взяв за основу функциональность отложенного выполнения задач и Dead Letter Exchange, мы решили создать систему обработки периодических задач. Концепт выглядел как-то так:

Попробую объяснить, что к чему.
  • Задачи в виде сообщения попадают в коммутатор планировщика (Scheduler exchange).
  • По routing_key попадают в нужную очередь «вызревания» (Hatchery queue), которая имеет параметр message_ttl, а также связь с коммутатором исполнения (Processor exchange) в качестве deal letter exchange. Очередь «вызревания» не связана с типом задач, она только играет роль «таймера», то есть вы можете создать столько очередей, сколько периодов вам требуется, и управлять через routing_key.
  • Так как у очереди нет слушателей, сообщения после «вызревания» в очереди попадают в коммутатор исполнения (Processor exchange).
  • Дальше свободный потребитель (Processing consumer) забирает сообщение и исполняет. После исполнения цикл при необходимости повторяется.

В чем же преимущество такой схемы?
  • Поэтапность исполнения, то есть новая задача не поступит в обработку, если предыдущая не завершилась.
  • Единый слушатель (consumer), то есть можно делать как универсальные воркеры, так специализированные. Масштабируется простым повышением количества нужных под.
  • Деплой новых задач без нарушения работы текущих. Достаточно мягко обновить поды-слушатели и отправить в очередь соответствующее сообщение. То есть можно поднимать поды с новым кодом, которые займутся новыми сообщениями, а текущие процессы будут доживать в старых подах. Так мы получаем бесшовное обновление.
  • Можно использовать асинхронный код и любую инфраструктуру, при этом независимы от стека.
  • Можно контролировать исполнение задач на уровне нативного ack/reject, а также получаем дополнительную опциональную очередь (control queue), которая может отслеживать жизненный цикл задач.

Схема получилась на деле достаточно простой, мы быстро создали рабочий прототип. И код получился красивым. Достаточно callback-функцию пометить простым декоратором, контролирующим жизненный цикл сообщения.
def rmq_scheduler(routing_key_for_delay_queue, routing_key_for_processing_queue):
    def decorator(func):
        @wraps(func)
        async def wrapper(channel, body, envelope, properties):
            try:
                res = await func(channel, body, envelope, properties)
                await channel.publish(
                    payload=body,
                    exchange_name='',
                    routing_key=routing_key_for_delay_queue,
                )
                await channel.basic_client_ack(envelope.delivery_tag)
                return res
            except Exception as e:
                log_error(e)
                redelivered_count = get_count_of_redelivery_attempts(properties)
                if redelivered_count <= 3:
                    await resend_msg(
                        channel=channel,
                        body=body,
                        properties=properties,
                        routing_key=routing_key_for_processing_queue)
                else:
                    async with app.natalya_db_engine.acquire() as conn:
                        async with conn.begin():
                            await channel.publish(
                                payload=body,
                                exchange_name='',
                                routing_key=routing_key_for_delay_queue,
                            )
                await channel.basic_client_ack(envelope.delivery_tag)
        return wrapper
    return decorator

Сейчас мы используем эту схему для выполнения только периодических последовательных задач, но ее можно использовать и тогда, когда важно начать выполнять задачу в определенной время, без смещения времени на само исполнение. Для этого достаточно повторно запланировать задачу после попадания сообщения в supervisor.
Правда, у этого подхода есть дополнительные накладные расходы. Нужно понимать, что в случае ошибки сообщение вернется в очередь, его подхватит другой воркер и сразу начнет выполнять. Потому нужно разделять обработку ошибок по степени критичности и заранее продумывать, что делать с сообщением при той или иной ошибке.
Возможные варианты:
  • Ошибка исправится сама (например, это системная ошибка): отправляем noack и повторяем обработку ошибок.
  • Ошибка бизнес логики: нужно прервать цикл — отправляем ack.
  • Ошибка из пункта 1 повторяется слишком часто: отравляем reject и сигнализируем разработчикам. Тут возможны варианты. Можно создать deal letter-очередь для отстоя сообщений, чтобы после разбора вернуть сообщение, или же применить технику ретраев (указать message_ttl).

Пример декоратора:
def auto_ack_or_nack(log_message):
   def decorator(func):
       @wraps(func)
       async def wrapper(channel, body, envelope, properties):
           try:
               res = await func(channel, body, envelope, properties)
               await channel.basic_client_ack(envelope.delivery_tag)
               return res
           except Exception as e:
               await channel.basic_client_nack(envelope.delivery_tag, requeue=False)
               log_error(log_message, exception=e)
       return wrapper
   return decorator

Эта схема у нас работает уже полгода, она достаточно надёжна и практически не требует внимания. Падение приложения не нарушает работу планировщика и всего лишь немного задерживает выполнение задач.
Плюсов не бывает без минусов. У этой схемы есть и критическая уязвимость. Если с RabbitMQ что-то случилось и сообщения пропали, тогда нужно вручную смотреть, что потерялось, и запускать цикл снова. Но это исключительно маловероятная ситуация, при которой думать об этом сервисе придется в последнюю очередь :)
P.S. Если тема планирования периодических задач вам покажется интересной, то в следующей статье, я подробнее расскажу, как у нас устроена автоматизация создания очередей, а также про Supervisor.
Ссылки:

===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_sistemy_obmena_soobschenijami (Системы обмена сообщениями), #_python, #_programmirovanie (Программирование), #_analiz_i_proektirovanie_sistem (Анализ и проектирование систем), #_python, #_rabbitmq, #_scheduler, #_blog_kompanii_domklik (
Блог компании ДомКлик
)
, #_sistemy_obmena_soobschenijami (
Системы обмена сообщениями
)
, #_python, #_programmirovanie (
Программирование
)
, #_analiz_i_proektirovanie_sistem (
Анализ и проектирование систем
)
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 17:55
Часовой пояс: UTC + 5