Пока я работал над некоторыми проектами, использующими Celery для управления очередями задач, выявились некоторые лучшие практики, которые я решил задокументировать. Впрочем это громкие слова для того, что я думаю о правильном подходе к решению подобных задач, а также о некоторых недостаточно используемых возможностях, которые предлагает сообщество проекта Celery.
No.1: Не используйте СУБД как ваш AMQP брокер
Позвольте мне объяснить почему я считаю это неправильным(помимо тех ограничений что описаны в документации Celery).
СУБД не разрабатывались для тех задач, которые выполняют полноценный AMQP брокер такой как RabbitMQ. Она упадет в «боевых» условиях даже на проекте с не очень большим трафиком\пользовательской базой.
Я предполагаю, что самой популярной причиной того почему люди решают использовать СУБД в том что, как правило, у них уже есть одна СУБД для веб-приложения, так почему бы не воспользоваться ей еще раз. Начать работать с таким вариантом несложно и не надо беспокоиться о других компонентах(таких как RabbitMQ).
Предположим не такой уж гипотетический сценарий: у вас есть 4 фоновых воркера для обработки, которые вы помещаете в базу данных. Это значит что вы получаете 4 процесса, которые достаточно часто запрашивают базу о новых задачах, не говоря уже о том, что каждый из них может иметь собственные конкурирующие потоки. В некоторый момент времени вы понимаете, что растет задержка при обработке задач, а потому приходит больше новых задач чем завершается, необходимо увеличивать количество воркеров. Вдруг скорость вашей базы данных начинает «проседать» из-за огромного количества запросов воркеров к базе, дисковый ввод\вывод превышает заданные лимиты, а все это начинает влиять на ваше приложение, так как воркеры, фактически, устроили DDOS-атаку вашей базе.
Этого не произошло бы при использовании полноценного AMQP брокера, так как очередь размещается в памяти и таким образом устраняется высокая нагрузка на жесткий диск. Потребителям(воркерам) нет необходимости часто запрашивать информацию, так как очередь имеет механизм доставки новой задачи воркеру, и даже, если AMQP брокер будет перегружен по каким-либо иным причинам это не приведет к падению и тормозам того веб-приложения, которое взаимодействует с пользователем.
Я пойду еще дальше и скажу, что вы не должны использовать СУБД как брокера даже в процессе разработки, тогда когда есть такие вещи как Docker и множество преднастроенных образов, которые предоставляют настроенный RabbitMQ «из коробки».
No.2: Используйте больше очередей (т.е. не только одну, которая дается по умолчанию)
Celery очень легко начать использовать, и она предоставляет сразу же одну очередь по умолчанию, в которую и помещаются все задачи пока не будет явно предписано другое поведение Celery. Наиболее общий пример того, что вы можете увидеть:
@app.task()
def my_taskA(a, b, c):
print("doing something here...")
@app.task()
def my_taskB(x, y):
print("doing something here...")
Что происходит, если обе задачи будут размещены в одной очереди, если иное не определено в файле celeryconfig.py. Я полностью пониманию чем может оправдывать подобный подход, у вас есть один декоратор, который создает удобные фоновые задачи. Здесь я хотел бы обратить внимание, что taskA и taskB, находясь в одной очереди могут делать совершенно разные вещи и таким образом одна из них может быть куда важнее другой, так почему они находятся все в одной корзине? Даже, если у вас один воркер, то представьте такую ситуацию что менее важная задача taskB окажется настолько массовой, что более важной задаче taskA воркер не сможет уделить необходимого внимания.Это приводит нас к к следующему пункту.
No.3: Используйте приоритеты воркеров
Путем решения проблемы, указанной выше является размещение задачи taskA в одной очереди, а taskB в другой и после этого присвоить x воркеров обработке очередь Q1, а остальных на обработку Q2, так как в нее приходит больше задач. Таким образом вы можете быть уверены, что задача taskB получит достаточно воркеров, а остальные тем временем будут обрабатывать менее приоритетную задачу, когда она придет, не провоцируя длительного ожидания и обработки. Потому, определите ваши очереди сами:
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)
И ваши роутеры, которые определять куда направлять задачу:
CELERY_ROUTES = {
'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}
Это позволит выполнять воркеры для каждой задачи:
celery worker -E -l INFO -n workerA -Q for_task_A
celery worker -E -l INFO -n workerB -Q for_task_B
No.4: используйте механизмы Celery для обработки ошибок
Большинство задач, которые я видел не имеют механизмов обработки ошибок. Если в задаче произошла ошибка, то она просто падает. Это может быть удобно для некоторых задач, однако большинство задач, которые я видел взаимодействовали с внешними API и падали из-за некоторых видов сетевых ошибок или иных проблем «доступности ресурса». Самый простой подход к обработке таких ошибок перевыполнить код задачи, так как, возможно, проблемы взаимодействия с внешним API были уже устранены.
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
try:
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)
Я люблю определять по умолчанию для задачи время ожидания, которое она будет ждать прежде чем попытается выполниться снова и как много попыток перевыполнения она предпримет прежде чем окончательно выбросить ошибку(параметры default_retry_delay и max_retries соответственно). Это наиболее простая форма обработки ошибок, которую я могу представить, но я видел, что и она практически не применяется. Разумеется Celery имеет и более сложные методы обработки ошибок, они описаны в документации Celery.
No.5: используйте Flower
Flower — прекрасный инструмент для отслеживания состояния ваших задач и воркеров Celery. У инструмента есть веб-интерфейс и он позволяет такие вещи как:
- прогресс задач
- детали выполнения
- статус воркеров
- запускать новые воркеры
Полный список возможностей вы можете увидеть по приведенной ссылке.
No.6: Отслеживайте статус задачи, только если вам это необходимо
Статус задачи это информация о том успешно или нет завершилась задача. Она может быть полезна для некоторых статистических показателей. Важная вещь, которую следует понимать в данном случае: статус задачи это не результирующие данные и той работы, которая она выполняла, такая информация наиболее похожа на неявные изменения, записываемые в базу данных(такие, например, как изменения списка друзей пользователя).
В большинстве проектов, которые я видел реально не заботились о данных по статусу задачи после ее завершения, используя базу данных sqlite, которую предлагается по умолчанию или лучше того тратили время на использование больших СУБД типа PostgreSQL. Зачем просто так нагружать базу данных своего приложения? Используйте CELERY_IGNORE_RESULT = True в вашем файле настроек celeryconfig.py и отбрасывайте такие данные.
No.7: не передавайте объекты базы данных\ORM в задачу
После обсуждения вышеизложенного на встречах локальных групп python разработчиков некоторые люди предложили включить дополнительный пункт в представленный список. О чем он? Вы не должны передавать объекты базы данных, например, модель пользователя в фоновую задачу, так как в сериализованном объекте могут оказаться уже устаревшие и некорректные данные. Если вам необходимо, то передавайте в задачу ID пользователя, а в самой задаче запрашивайте базу об этом пользователе.
Комментарии (23)
evikbook
22.10.2015 23:17+4На практике Redis быстрее чем RabbitMQ в роли брокера.
youlose
23.10.2015 11:41+2У RabbitMQ есть встроенная веб морда с метриками для очередей.
Редис не может накапливать сообщения в очереди, то есть там нужно чтобы и подписчики и паблишеры были онлайн.
Насколько я в курсе в редисе нету подтверждения сообщений, нету авторизации, логического разделения очередей и много чего другого. В общем это инструменты для разных нужд и нагрузок.
Плюс про производительность хочу сказать, что у меня в работе есть очереди которые обрабатывают тысячи сообщений в секунду и сверхнагрузки на RMQ нет, а большинству людей насколько я в курсе и 10-20/секунду никогда не достичь нагрузки. Так что предлагаю вспомнить Дональда Кнута: «Предварительная оптимизация — зло».armab
23.10.2015 15:14В Redis-e есть авторизация и он умеет накапливать в очереди.
У рубистов есть Resque, который использует Redis в качестве бэкенда.
Resque сделан в GitHub и используется ими в качестве основного MQ. И он действительно быстрый и нагрузку он держит.
Есть очень много имплементаций на других языках и очень хорошая web паель управления (в лучших традициях GitHub).youlose
23.10.2015 15:53«В Redis-e есть авторизация и он умеет накапливать в очереди.»
Авторизация там не для разграничения доступа к очередям, а для доступа к Redis.
Да, накапливает, тут я неправильно сказал.
«У рубистов есть Resque, который использует Redis в качестве бэкенда.»
Он Redis использует в качестве брокера очередей. А Rescue по сути — воркер-враппер, для заданий и c RMQ его нельзя сравнивать потому что это совсем разные продукты.
Я не понимаю что вы хотите доказать, я не сказал что Redis'ом не надо пользоваться как брокером очередей, я сказал что у них разные задачи и их не нужно сравнивать. Redis для очень простых очередей с небольшим количеством неважных задач. Вот у меня на RMQ крутятся сотни очередей от десятков проектов, написанные на 4 разных языках программирования, которые изолированы друг от друга авторизацией. Данные в очередях (которые durable) ПЕРЕЖИВАЮТ РЕСТАРТ RMQ, вот в редисе так точно нельзя. Если у вас возникнут какие-то проблемы с Redis, например он начнёт жрать очень много памяти, вы не сможете выяснить почему, там нет никакой возможности мониторить сложные проблемы. Также у RMQ есть ещё море всяких уникальных возможностей которые в один комментарий не засунешь, это надо целый пост писать.armab
23.10.2015 19:06Я не понимаю что вы хотите доказать
Ваше изначальное утверждение, что решение основанное на Redis недостаточно гибко/масштабируемо/надежно итд.
Я показал пример, что Redis в качестве бэкенда для MQ достаточно серьезное решение, используемое Github, где совсем не: «очень простые очереди с небольшим количеством неважных задач». К примеру Redis-Resque и RabbitMQ вполне конкурирующие системы.
Данные в очередях (которые durable) ПЕРЕЖИВАЮТ РЕСТАРТ RMQ, вот в редисе так точно нельзя
Конечно можно, Redis умеет складывать данные в файловую систему и подхватывать в случае рестарта/поломки. Кроме того, есть репликация.
По функционалу — пройдитесь по списку resque плагинов и их описанию. Заодно статья за 2009 год будет полезна почему Github написали свой велосипед, перепробовав кучу других MQ:
highscalability.com/blog/2009/11/6/product-resque-githubs-distrubuted-job-queue.html
Я в корне не согласен с трактовкой что Redis как бэкенд для MQ (пример resque) не подходит для серьезных задач и нагрузок. И по функционалу и по скорости (~100K сообщений/секунду на ядро одинаково достижимы как для resque так и для RabbitMQ) и по надежности (миллионы сообщений в сутки не проблема вообще ни для какой MQ).cadmi
26.10.2015 08:11А master-master между двумя площадками умеет? А реплицировать между инстансами не все очереди гуртом, а только одну (которую администратор позволит)?
Ах черт, это же про RabbitMQ и про «большинству этого не нужно» :)
defuz
23.10.2015 18:02+1Редис не может накапливать сообщения в очереди, то есть там нужно чтобы и подписчики и паблишеры были онлайн.
Может, в редисе всего этого нет «из коробки» в механизме pub/sub, но все это точно можно реализовать средствами самого редиса.
Насколько я в курсе в редисе нету подтверждения сообщений, нету авторизации, логического разделения очередей и много чего другого. В общем это инструменты для разных нужд и нагрузок.
youlose
23.10.2015 11:47+1Хотел бы попросить автора описать, чем celery выгоднее и лучше чем просто использовать RabbitMQ (я про реально используемые фичи). Потому что пару раз задумывался об этом, читал доку celery, но так там ничего полезного для нас и не нашёл особо. Как я понимаю это универсальная надстройка над разными брокерами сообщений, но вот конкретно для раббита что она добавляет?
gigimon
23.10.2015 13:44+1celery это система для выполнения задач и rabbitmq она использует по его прямому назначению — роутить сообщения
kibergus
23.10.2015 12:13У меня очень негативный опыт использования celery и особенно связки celery + mongo. Производительность мне была не важна, задачь очень мало. Но их надо было не терять. Т.е. если я добавляю задачу то она, во-первых, должна когда-нибудь выполниться, а во-вторых, мне должен прийти результат выполнения задачи. Запускаю систему, она работает, но иногда задачи исчезают. Долго копаюсь в коде, а там в celery абстракция над абстракцией и абстракцией погоняет, нахожу проблему, исправляю. Заодно понимаю, что оно by design будет терять задачи: задача выгружается в память воркера и если он упадет, то попытается в expect блоке положить её обратно в очередь. Но может и не положить.
Потом выясняется, что при смене мастера монги celery виснет. Оно исключение не обрабатывало. Пишу патч, отправляю разработчикам. Результат — мы сами не используем mongodb, поэтому не понимает что это исправление делает, так что патч принимать не будем.
Пока они над этим думали, я написал свой велосипед который занимает на несколько порядков меньше строк кода и который рагантированно отказоустойчив.youlose
23.10.2015 12:53А что именно в celery привлекло? Почему не чистый RMQ (там для вашей проблемы есть durable очереди + ручное подтверждение задач, своя кластеризация + один инструмент вместо двух, меньше точек отказа).
kibergus
23.10.2015 15:49В проекте уже использовалась монга. Соответсвенно можно было использовать её, а можно было поднимать рядом RabbitMQ. Отделу администрирования не понравилась идея поднимать еще один сервис и обеспечивать его отказоустойчивость. Причем опыт эксплуатации mongo был значительно больше, чем и RabbitMQ.
debsaw
23.10.2015 15:07+1Может кому пригодится
os.environ['CELERY_CONFIG_MODULE'] = 'conf.celeryconfig'
указать альтернативное размещение конфига
Daniyar94
23.10.2015 17:01Вопрос. Я занимаюсь разработкой большого проекта на Джанго, и сегодня увидел эту статью. Я не могу понять зачем нам нужен Celery. Можете привести примеры использования Celery в реальной жизни? Пытаюсь что нибудь нарыть в интернете, но все источники просто твердят, что это task queue
GDApsy
23.10.2015 19:01А чем он еще должен быть? Celery же и проектировался как абстракция над системами очередей. Вот, например, его использует такая джанго-батарейка: habrahabr.ru/post/253445 и ее репозиторий: github.com/LPgenerator/django-db-mailer
vivo
26.10.2015 01:28Начиная от cron задач и заканчивая любыми асинхронными операциями. Например, отправка писем.
Apollo286
23.10.2015 17:21+1Я люблю celery за возможность строить сложные цепочки асинхронных задач, используя их примитивы
Например так:
# (4 + 4) * 8 * 10
res = chain(add.s(4, 4), mul.s(8), mul.s(10))
Результат выполнения задачи идёт в кач-ве первого аргумента для след. задачи
Ссылка на документацию
Бывает весьма полезно использовать web hooks, которые позволяют в кач-ве воркера использовать сторонний сервис
Так же часто бывает полезно, когда много мелких однотипных задач, держать соединение с базой для всех выполняемых задач, а не создавать его заново в каждой новой.
Пример
GDApsy
Возможно данная статья уже публиковалась на Хабре, но я ее не нашел. Об ошибках орфографии или опечатках пишите в личку