Привет, Хабр! Я расскажу тебе историю своего профессионального подгорания.
Так вышло, что я терпеть не могу рутинных однообразных действий. У меня за плечами несколько проектов, использующих Celery. Каждый раз, когда задача становится сложнее вывода 2 + 2 = 5
, шаблон решения сводится к созданию класса, выполняющего задачу, и функции-стартера, с которой умеет работать Celery — бойлерплейта. В этой статье я расскажу, как я боролся с бойлерплейтом, и что из этого вышло.
Отправная точка
Рассмотрим рядовую таску Celery. Есть класс, исполняющий задачу, и функция-стартер, выполняющая инстанцирование класса и запуск одного его метода, в котором реализована вся логика задачи и унаследована обработка ошибок:
class MyTask(
FirstMixin,
SecondMixin,
ThirdMixin,
):
def main(self):
data = self.do_something()
response = self.remote_call(data)
parsed = self.parser(response)
return self.process(parsed)
@app.task(bind=True)
def my_task(self, arg1, arg2):
instance = MyTask(
celery_task=self,
arg1=arg1,
arg2=arg2,
)
return instance.full_task()
При этом метод full_task
включает в себя вызов main
, однако также занимается обработкой ошибок, логгированием и прочей чушью, не имеющей прямого отношения к основной задаче.
Идея тасккласса
В корне тасккласса лежит простая идея: в базовом классе можно определить метод класса task
, в нём реализовать поведение функции-стартера, а после наследоваться:
class BaseTask:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def full_task(self):
try:
return self.main()
except:
self.celery_task.retry(countdown=30)
@classmethod
def task(cls, task, **kwargs):
self = cls(
celery_task=celery_task,
**kwargs,
)
return self.full_task()
Вся вспомогательная скукотища собрана в базовом классе. Больше к ней не возвращаемся. Реализуем логику задачи:
@app.taskcls(bind=True)
class MyTask(
BaseTask,
FirstMixin,
SecondMixin,
ThirdMixin,
):
def main(self):
data = self.do_something()
response = self.remote_call(data)
parsed = self.parser(response)
return self.process(parsed)
Больше никакой шелухи, уже намного лучше. Однако что же с точкой входа?
MyTask.task.delay(...)
MyTask.task
обладает всеми методами обычной таски: delay
, apply_async
, и, вообще говоря, ей и является.
Теперь аргументы декоратора. Особенно весело тащить bind = True
в каждую таску. Можно ли передать аргументы по умолчанию через базовый класс?
class BaseTask:
class MetaTask:
bind = True
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def full_task(self):
try:
return self.main()
except:
self.celery_task.retry(countdown=30)
@classmethod
def task(cls, task, **kwargs):
self = cls(
celery_task=celery_task,
**kwargs,
)
return self.full_task()
Вложенный класс MetaTask
содержит аргументы по умолчанию и будет доступен всем дочерним классам. Интересно, что и его можно унаследовать:
class BaseHasTimeout(BaseTask):
class MetaTask(BaseTask.MetaTask):
timeout = 42
Наивысшим приоритетом обладают аргументы, переданные декоратору @app.taskcls
:
@app.taskcls(timeout=20)
class MyTask(
BaseHasTimeout,
FirstMixin,
SecondMixin,
ThirdMixin,
):
def main(self):
...
В итоге timeout для задачи будет 20.
Выход за рамки
В web-приложениях часто есть необходимость из view запустить таску. В случае высокой сцепленности view и таски их можно совместить:
class BaseViewTask:
@classmethod
def task(cls, **kwargs):
# Somehow init View class manually
self = cls(...)
return self.celery()
@app.taskcls
class MyView(
BaseViewTask,
FirstMixin,
SecondMixin,
ThirdMixin,
APIView,
):
queryset = MyModel.objects.all()
def get_some_data(self, *args, **kwargs): # common methed
return self.queryset.filtert(...)
def get(self, request):
data = self.get_some_data(request.field) # used in request handling
return Response(json.dumps(data))
def post(self, request):
self.task.delay(...)
return Response(status=201)
def celery(self):
data = self.get_some_data(...) # also used in background task
return self.do_something(data)
Кстати, именно для исключения коллизии имён вложенный класс называется MetaTask
, а не Meta
, как в django.
Заключение
Эта функциональность ожидается в Celery 4.5. Однако я также подготовил пакет, позволяющий попрбовать декоратор taskcls
уже сегодня. Идея пакета сводится к тому, что при обновлении Celery до версии 4.5 вы сможете убрать его импорт не меняя более ни строчки кода.
Комментарии (21)
Sovetnikov
07.10.2019 23:46Странно, что мейнтейнеры согласились включить это в репозиторий, уж так ли это необходимо?
Вообще celery переживает сложный период с выходом 4.x и качество «продукта» упало.
В celery очень много наростов, которые комьюнити видимо не может поддерживать и они просто вырезаются, из наиболее интересных это прекращение поддержки Redis как брокера.
Внутренности celery и его обвязок тоже не сахар (видел, исправлял, пытался доработать) и как это всё стабилизируется непонятно.
Часто обвязки несовместимы друг с другом, в некоторых версиях есть ошибки, а версии без ошибок уже дают ошибки из-за несовместимости.
Слишком сложный продукт стал для простого запуска задач (построение workflow из задач полноценно не работает в celery, дальше chain и group лучше не ходить).
Работаем с celery c 2015 года и через пару недель надеюсь откажемся от него, хотя задач у нас всего 3000-5000 в час.
Версия 3.x была вполне стабильна.kai3341 Автор
08.10.2019 00:08> Странно, что мейнтейнеры согласились включить это в репозиторий
Да вот до сегодняшнего дня там просто висела лычка «Milestone 4.5». Сегодня пол дня рассказывал, что это и зачем это. Пока туго, придётся тестовый проект писать
> уж так ли это необходимо?
У меня есть аналогия — ООП. Дело в том, что без ООП можно реализовать что угодно — сишные структурки в помощь. Однако почему-то ООП является наиболее популярной парадигмой, хотя, казалось бы, так ли это необходимо?
> из наиболее интересных это прекращение поддержки Redis как брокера
Странно, документация говорит, что redis поддерживается в качестве брокера. Можно подробнее?
> Внутренности celery и его обвязок тоже не сахар (видел, исправлял, пытался доработать) и как это всё стабилизируется непонятно.
Да, хорошо видно, что celery писался разными людьми. Одного посещала муза, другого нет
А что вы вкладываете в слово `обвязки`? Модули? Или уровни абстракции?
А вообще celery нужен хороший рефакторинг. Каким образом взаимодействуют пул и хаб я пока не понял, поэтому мой asyncio-пул — это пока скорее костыль, нежели решение.Sovetnikov
08.10.2019 00:30+1ООП дело вкуса, просто всего один метод в классе…
Про redis погорячился, это в 2016 году они хотели его почикать, но оставили github.com/celery/celery/commit/e15b0bfc658b397955beeae4a84127cf44686d50
Обвязки это billiard, kombu, beat и т.д.kai3341 Автор
08.10.2019 00:48О, интересная ссылочка
> ООП дело вкуса, просто всего один метод в классе…
Так в том и дело, что не один. Я упираю на множественное наследование. Один (опять же, это слабодостижимый идеал. Чаще несколько) метод требуется до или переопределить и скомбинировать остальные.
Ага, то есть и и модули, и абстракции. Так, а какие претензии к kombu? Свести под одну гребёнку разные шины сообщений — это же прекрасно. А чем billiard провинился? Получилась хорошая абстракция и по факту основа для построения кластера. Beat же вообще опциональный. Для одного воркера можно передать флаг -B — и beat будет встроен в воркер. Естественно, для нескольких экземпляров воркеров нельзя встраивать beat, иначе копии тасок станут исполняться на каждой ноде. Во весело запустить одновременно несколько сессий бэкапа БД.
А вообще к beat у меня персональный пунктик. Вот есть же kombu под капотом. Но нет — расписания мы способны подхватить лишь при старте. Выкручивался пакетом redisbeat, благо, брокером использовался как раз redis. Там кстати смешной баг был. Ну как смешной, стул тогда подо мной прогорелsnp
08.10.2019 01:12Естественно, для нескольких экземпляров воркеров нельзя встраивать beat, иначе копии тасок станут исполняться на каждой ноде.
celery-redbeat решает проблему. Просто запускаем все celery с
-B
и не беспокоимся.
запустить одновременно несколько сессий бэкапа БД
Ну подобные таски в лок оборачивать положено, независимо ни от чего, например с помощью python-redis-lock
kai3341 Автор
08.10.2019 01:34celery-redbeat решает проблему
Неа, эту проблему решает просто внешний шедулер, и стандартного хватит. Когда появляется слово «динамика», стандартный шедулер уходит в закат
А за библиотечки спасибо — celery-redbeat выглядит поприятнее, чем redisbeat. python-redis-lock интересен тем, что он идёт в поставке с celery-redbeat
Sovetnikov
08.10.2019 12:55Сколько неприятных моментов нам доставили распределенные блокировки по TTL в случае нештатных ситуаций :)
Самое печально, что блокировки по TTL без отслеживания смерти клиента в случае аварии могут помешать системе восстановиться после простого ребута.snp
08.10.2019 14:21python-redis-lock может ставить лок с TTL и обновлять его, пока процесс работает. Умер/подвис процесс — лок устарел.
with redis_lock.Lock('my-lock', expire=60, auto_renewal=True): # Do work....
Sovetnikov
08.10.2019 15:00Это всё до боли знакомо и ясно :)
Но лок устареет только через 60 секунд.
А если у нас длительные процессы с несколько непонятной продолжительностью которые ресурсы берут? Аппроксимация TTL? Обновление TTL? Ну его… не надёжно это всё, всёравно останется какая-то блокировка у которой TTL несколько часов.
Мы своё элегантное решение сделали, пока обкатываем и оформляем, если интересно то пишите asovetnikov на гмейле
Sovetnikov
08.10.2019 12:56По отдельности kombu, billiard и всё остальное прекрасны может быть, но этот зоопарк сильно связан по конкретным версиям между собой и в случае ошибки в одном начинается процесс подбора версий.
snp
08.10.2019 14:24Смешно было, когда проект с celery 3.x решили под Python 3.7 запустить. В одной из не самых свежих либ была строка вида
import foo.async.bar
— получаемSyntaxError
:)Sovetnikov
08.10.2019 14:56Смеяться будете, мы запустили celery 4.2 под Python 3.7 и нам пришлось переименовывать async в asynchronous.
snp
08.10.2019 20:33Это в kombu 4.2.0 пофикшено было полтора года назад. Сама селери 4.3.0 стабильная сейчас. Собственно, в вышеупомянутом проект бампнули селери до 4.3.0 и никаких проблем.
Sovetnikov
08.10.2019 21:14Вы будете смеяться дальше, но нам пока некогда и нет смысла переходить дальше Django 1.8, а celery 4.3 не дружит если у тебя ниже 1.9
Потрясающая зависимость celery и Django :)
Заниматься этим всем и ждать новых сюрпризов смысла не вижу, есть положительный опыт с другой библиотекой, проще перейти.
Sovetnikov
08.10.2019 23:03Уж до кучи, забыл совсем — flower крайне негативные эмоции вызывает, история задач вечно куда-то теряется, фильтровать нормально нельзя, какие-то пустые поля в задачах часто бывают, чувство потери контроля в сравнении со старым celery cam и djcelery
Cykooz
08.10.2019 00:50А почему вы не воспользовались тем, что «таска» в Celery — это на самом деле и есть класс, который налету создаётся декоратором app.task() с использованием базового класса celery.Task? С давних пор можно было самому писать «таски» в виде классов унаследованных от celery.Task.
Минус был только в том, что их не так удобно регистрировать в Celery-приложении, как это делается с декоратором. И ещё в минус можно записать невозможность переопределить базовый класс через настройку Celery.kai3341 Автор
08.10.2019 01:01Хороший вопрос. Действительно, можно указать базовый класс для таска. Проблема однако в том, что:
- Проблему бойлерплейта он не решает
- Я боялся нарваться на коллизию имён. Хотелось отделить логику задачи от логики celery
- Минусы вы назвали сами. Частно говоря, вы их назвали больше, чем я знал
- И вкусное: а мой подход не запрещает их комбинировать
Cykooz
08.10.2019 01:09Никто не мешает написать свой «базовый» класс на основе оригинального, что бы спрятать в него бойлерплейт. Я так и сделал — запихал туда обработку ошибок, логирование и хитрую инициализацию энвайромента.
Хотя… вот вам самый важный «минус» — экземпляр класс-таски в Celery создаётся только один, и потому его нельзя использовать для хранения данных выполняемой таски. Для этого можно использовать «словарик» Task.request — он создаётся каждый раз перед запуском таски.kai3341 Автор
08.10.2019 01:15Ох, точно, это и была та самая причина, по которой я отбросил затею с классами Task. Знатно подо мной тогда стул прогорел — а в чём смысл класса?
А обработку ошибок я в примерах и предложил направить в метод task. Разве что всё под рукой — в базовом классе, а не снаружи
Tihon_V
Я для себя ранее открыл пару аналогичных либ, в некоторых — данная проблема обработки рутинных задач решена:
— Dramatiq
— Huey
Наследование реализую с помощью механизма pipeline и functools.partial. Увы Celery — не серебряная пуля ;)
P.S.: В данный момент — все чаще использую pika или aiopika.
kai3341 Автор
Большое спасибо, в ближайшее время ознакомлюсь