![image](https://habrastorage.org/webt/4w/u0/gt/4wu0gtw9jsubehn772rd2ezlouo.jpeg)
Я занимаюсь созданием веб-приложений на Django. В основном, это SaaS сервисы для бизнеса. Во всех этих приложениях есть необходимость в асинхронных задачах. Для их реализации использую Celery. В статье расскажу о ситуациях, в которых применяю Celery, с примерами кода.
Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию. В качестве брокера очередей обычно используются RabbitMQ или Redis. В очереди кладутся задачи, а потом воркеры Celery берут их оттуда и выполняют.
Для Celery можно придумать применение почти в любом приложении, но далее я опишу только те кейсы, в которых использую его сам.
1. Задачи по расписанию
Часто есть задачи, которые нужно выполнить в определенную дату и время: отправить пользователю напоминание, закончить пробный период аккаунта, опубликовать пост в соцсетях.
В Celery есть возможность при вызове таска указать параметр ETA – время, в которое надо запустить таск. Но если так планировать задачи, то получается очень ненадежно: они могут не запуститься и их неудобно отменять.
Более надежный способ – это использовать celerybeat schedule. То есть создать расписание, где будут таски, которые запускаются с определенной периодичностью или в определенное время. Например, если необходимо опубликовать пост в соцсетях по расписанию, то таск для этого запускается раз в минуту. Если надо закончить пробный период у аккаунта, то можно запускать таск раз в сутки.
# schedule.py
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'publish_post_starter': {
'task': 'publish_post_starter',
'schedule': timedelta(minutes=1),
},
'end_trial_starter': {
'task': 'end_trial_starter',
'schedule': crontab(hour=10, minute=21),
},
}
В таске стартере получаем все инстансы, у которых запланированное время уже наступило. Проходимся по инстансам и для каждого вызываем основной таск. В качестве аргументов передаем только id инстанса, чтобы не засорять очередь ненужными данными. Можем сразу пройтись по всем инстансам и выполнить действия, но чаще всего лучше вызвать отдельный таск для каждого инстанса. Так мы ускорим выполнение, и, если произойдет ошибка, то она повлияет только на один из тасков.
# tasks.py
@app.task(name='publish_post')
def publish_post(post_id):
...
@app.task(name='publish_post_starter')
def publish_post_starter():
post_ids = list(
Post.objects.filter(
publish_dt__lte=timezone.now(),
is_published=False
).values_list('id', flat=True)
)
for post_id in post_ids:
publish_post.delay(post_id)
2. Долгие вычисления и вызовы API из WSGI
Под WSGI подразумевается контекст, в котором обрабатываются запросы от пользователей (Request-Response Cycle). В противовес контексту асинхронных задач – Celery.
Для создания отзывчивого интерфейса все кнопки должны реагировать мгновенно и не должны блокировать остальной интерфейс. Для этого после нажатия кнопка блокируется, на нее ставится спиннер и отправляется ajax-запрос на сервер. Если обработка запроса занимает дольше пары секунд, то можно переместить вычисления в Celery-таск.
В WSGI вызываем таск и возвращаем ответ. На фронте разблокируем кнопку и убираем спиннер. Пользователю показываем сообщение, что действие запущено. Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету. Получив результат на фронте, показываем его пользователю.
# rest_views.py
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response
from tasks import send_emails
class SendEmailView(APIView):
def post(self, request):
# this id will be used to send response with websocket
request_uuid = request.data.get('request_uuid')
if not request_uuid:
return Response(status=status.HTTP_400_BAD_REQUEST)
send_emails.delay(request.user.id, request_uuid)
return Response(status=status.HTTP_200_OK)
Отдельно можно выделить вызовы внешнего API из WSGI. В данном случае все вызовы, независимо от длительности их выполнения, запускаются через Celery-таск. Это защита от дурака. Не должно быть ситуации, когда из-за недоступности какого-то внешнего API подвисает интерфейс у пользователя.
3. Вызовы из Tornado
При интеграции с соцсетью, Telegram или платежным сервисом нужен webhook-урл, на который буду приходить оповещения. Количество запросов не всегда можно рассчитать заранее, но скорее всего их количество будет превышать запросы от пользователей. Эти запросы буду приходить до того момента, как получат ответ с кодом 200.
Для обработки таких запросов подходит асинхронный фреймворк Tornado. Чтобы не превращать обработку в синхронную в Tornado не должно быть IO операций (работа с БД, файлами). Тут и нужен Celery. Tornado handler получает запрос, валидирует данные, вызывает Celery-таск и возвращает успешный ответ.
# tornado_handlers.py
from tornado import gen, escape
from tornado.web import RequestHandler
from tasks import handle_vk_callback
class VkCallbackHandler(RequestHandler):
@gen.coroutine
def post(self, *args, **kwargs):
try:
data = escape.json_decode(self.request.body)
except ValueError:
self.set_status(status_code=400, reason='Invalid data')
return
handle_vk_callback.delay(data)
self.write('ok')
return
Комментарии (12)
andreymal
29.07.2019 18:57Для обработки таких запросов подходит асинхронный фреймворк Tornado.
Что мешает сделать абсолютно такую же вьюху в Django?
Dizast Автор
29.07.2019 19:26Django будет обрабатывать эти запросы синхронно. А их может быть сильно больше чем к основному приложению. Tornado или другой асинхронный фреймворк лучше справляются с этой задачей.
andreymal
29.07.2019 19:30Django будет обрабатывать эти запросы синхронно.
В вашем коде Tornado тоже нет ни единой строчки асинхронного кода — разницы не будет никакой.
А их может быть сильно больше чем к основному приложению.
И они всё равно будут обрабатываться быстро, потому что отправка задачи в Celery это единственное, чем будет заниматься этот код, так что это совершенно не проблема. (Ну, если речь идёт не о тысячах запросов в секунду, но на тысячах запросов в секунду вылезет куча других проблем)
resetme
29.07.2019 21:39Раз уж упомянули про вебсокеты, то привели бы для примера вариант реализации. Это было бы одно из интереснейших мест статьи.
Assargin
30.07.2019 10:17+1Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету
Особенно интересен вот этот момент. Каким образом таск, выполненный celery-воркером, отправляет что-то в конкретное вебсокетное соединение.
Или клиент, получив от endpoint'а uuid celery-задания, сам по сокету периодически опрашивает сервер на тему результата выполнения этого задания, и сервер возвращает текущий статус задания и результат его выполнения (если он есть)?Dizast Автор
30.07.2019 15:12+1Это можно реализовать, например, с помощью Tornado или asyncio + Redis pub/sub + sockjs (https://github.com/leporo/tornado-redis#pubsub).
На фронте генерируем случайный uuid, подписываемся и отправляем в запросе на бэкенд. В Celery результат таска пишем в Redis канал с тем uuid в имени. Результат отправляется подписанному клиенту.resetme
30.07.2019 19:29А зачем на фронте генерировать UUID? Клиент хоть что сможет передать на сервер. Это же дыра в безопасности.
Dizast Автор
30.07.2019 19:40Чтобы на конкретный запрос вернуть данные, запрос от клиента, там и request_uuid генерится. У одного юзера во время одной сессии может быть сгенерировано много таких запросов с разным request_uuid.
Еще user_id используется в имени канала.resetme
30.07.2019 19:51А зачем вообще генерировать UUID на фронте? Каждой задаче Celery назначается UUID, вот его и нужно отдавать клиенту, чтобы он дергал вьюшку с этим UUID, а в ней проверяется завершилась ли эта задача. Зачем все переусложнять с pubsub?
Dizast Автор
30.07.2019 19:58По-моему с pubsub не сложно и вью не дергается, когда результата еще нет.
rSedoy
Раз упомянули uWSGI, то для решения первых двух задач, можете использовать его spooler, уменьшите внешние зависимости проекта.
Dizast Автор
Спасибо.
В статье заменил uWSGI на WSGI, так корректней будет.