image

Я занимаюсь созданием веб-приложений на Django. В основном, это SaaS сервисы для бизнеса. Во всех этих приложениях есть необходимость в асинхронных задачах. Для их реализации использую Celery. В статье расскажу о ситуациях, в которых применяю Celery, с примерами кода.

Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию. В качестве брокера очередей обычно используются RabbitMQ или Redis. В очереди кладутся задачи, а потом воркеры Celery берут их оттуда и выполняют.

Для Celery можно придумать применение почти в любом приложении, но далее я опишу только те кейсы, в которых использую его сам.

1. Задачи по расписанию


Часто есть задачи, которые нужно выполнить в определенную дату и время: отправить пользователю напоминание, закончить пробный период аккаунта, опубликовать пост в соцсетях.

В Celery есть возможность при вызове таска указать параметр ETA – время, в которое надо запустить таск. Но если так планировать задачи, то получается очень ненадежно: они могут не запуститься и их неудобно отменять.

Более надежный способ – это использовать celerybeat schedule. То есть создать расписание, где будут таски, которые запускаются с определенной периодичностью или в определенное время. Например, если необходимо опубликовать пост в соцсетях по расписанию, то таск для этого запускается раз в минуту. Если надо закончить пробный период у аккаунта, то можно запускать таск раз в сутки.

# schedule.py
from datetime import timedelta
from celery.schedules import crontab


CELERYBEAT_SCHEDULE = {
   'publish_post_starter': {
       'task': 'publish_post_starter',
       'schedule': timedelta(minutes=1),
   },
   'end_trial_starter': {
     'task': 'end_trial_starter',
     'schedule': crontab(hour=10, minute=21),
  },
}

В таске стартере получаем все инстансы, у которых запланированное время уже наступило. Проходимся по инстансам и для каждого вызываем основной таск. В качестве аргументов передаем только id инстанса, чтобы не засорять очередь ненужными данными. Можем сразу пройтись по всем инстансам и выполнить действия, но чаще всего лучше вызвать отдельный таск для каждого инстанса. Так мы ускорим выполнение, и, если произойдет ошибка, то она повлияет только на один из тасков.

# tasks.py
@app.task(name='publish_post')
def publish_post(post_id):
   ...

@app.task(name='publish_post_starter')
def publish_post_starter():
   post_ids = list(
       Post.objects.filter(
           publish_dt__lte=timezone.now(),
           is_published=False
       ).values_list('id', flat=True)
   )

   for post_id in post_ids:
       publish_post.delay(post_id)

2. Долгие вычисления и вызовы API из WSGI


Под WSGI подразумевается контекст, в котором обрабатываются запросы от пользователей (Request-Response Cycle). В противовес контексту асинхронных задач – Celery.

Для создания отзывчивого интерфейса все кнопки должны реагировать мгновенно и не должны блокировать остальной интерфейс. Для этого после нажатия кнопка блокируется, на нее ставится спиннер и отправляется ajax-запрос на сервер. Если обработка запроса занимает дольше пары секунд, то можно переместить вычисления в Celery-таск.

В WSGI вызываем таск и возвращаем ответ. На фронте разблокируем кнопку и убираем спиннер. Пользователю показываем сообщение, что действие запущено. Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету. Получив результат на фронте, показываем его пользователю.

# rest_views.py
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response

from tasks import send_emails


class SendEmailView(APIView):
   def post(self, request):
       # this id will be used to send response with websocket
       request_uuid = request.data.get('request_uuid')
       if not request_uuid:
           return Response(status=status.HTTP_400_BAD_REQUEST)
      
       send_emails.delay(request.user.id, request_uuid)
       return Response(status=status.HTTP_200_OK)

Отдельно можно выделить вызовы внешнего API из WSGI. В данном случае все вызовы, независимо от длительности их выполнения, запускаются через Celery-таск. Это защита от дурака. Не должно быть ситуации, когда из-за недоступности какого-то внешнего API подвисает интерфейс у пользователя.

3. Вызовы из Tornado


При интеграции с соцсетью, Telegram или платежным сервисом нужен webhook-урл, на который буду приходить оповещения. Количество запросов не всегда можно рассчитать заранее, но скорее всего их количество будет превышать запросы от пользователей. Эти запросы буду приходить до того момента, как получат ответ с кодом 200.

Для обработки таких запросов подходит асинхронный фреймворк Tornado. Чтобы не превращать обработку в синхронную в Tornado не должно быть IO операций (работа с БД, файлами). Тут и нужен Celery. Tornado handler получает запрос, валидирует данные, вызывает Celery-таск и возвращает успешный ответ.

# tornado_handlers.py
from tornado import gen, escape
from tornado.web import RequestHandler

from tasks import handle_vk_callback


class VkCallbackHandler(RequestHandler):
   @gen.coroutine
   def post(self, *args, **kwargs):
       try:
           data = escape.json_decode(self.request.body)
       except ValueError:
           self.set_status(status_code=400, reason='Invalid data')
           return

       handle_vk_callback.delay(data)
       self.write('ok')
       return

Комментарии (12)


  1. rSedoy
    29.07.2019 18:35
    +1

    Раз упомянули uWSGI, то для решения первых двух задач, можете использовать его spooler, уменьшите внешние зависимости проекта.


    1. Dizast Автор
      29.07.2019 20:31

      Спасибо.
      В статье заменил uWSGI на WSGI, так корректней будет.


  1. andreymal
    29.07.2019 18:57

    Для обработки таких запросов подходит асинхронный фреймворк Tornado.

    Что мешает сделать абсолютно такую же вьюху в Django?


    1. Dizast Автор
      29.07.2019 19:26

      Django будет обрабатывать эти запросы синхронно. А их может быть сильно больше чем к основному приложению. Tornado или другой асинхронный фреймворк лучше справляются с этой задачей.


      1. andreymal
        29.07.2019 19:30

        Django будет обрабатывать эти запросы синхронно.

        В вашем коде Tornado тоже нет ни единой строчки асинхронного кода — разницы не будет никакой.


        А их может быть сильно больше чем к основному приложению.

        И они всё равно будут обрабатываться быстро, потому что отправка задачи в Celery это единственное, чем будет заниматься этот код, так что это совершенно не проблема. (Ну, если речь идёт не о тысячах запросов в секунду, но на тысячах запросов в секунду вылезет куча других проблем)


  1. resetme
    29.07.2019 21:39

    Раз уж упомянули про вебсокеты, то привели бы для примера вариант реализации. Это было бы одно из интереснейших мест статьи.


    1. Assargin
      30.07.2019 10:17
      +1

      Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету

      Особенно интересен вот этот момент. Каким образом таск, выполненный celery-воркером, отправляет что-то в конкретное вебсокетное соединение.

      Или клиент, получив от endpoint'а uuid celery-задания, сам по сокету периодически опрашивает сервер на тему результата выполнения этого задания, и сервер возвращает текущий статус задания и результат его выполнения (если он есть)?


      1. Dizast Автор
        30.07.2019 15:12
        +1

        Это можно реализовать, например, с помощью Tornado или asyncio + Redis pub/sub + sockjs (https://github.com/leporo/tornado-redis#pubsub).
        На фронте генерируем случайный uuid, подписываемся и отправляем в запросе на бэкенд. В Celery результат таска пишем в Redis канал с тем uuid в имени. Результат отправляется подписанному клиенту.


        1. resetme
          30.07.2019 19:29

          А зачем на фронте генерировать UUID? Клиент хоть что сможет передать на сервер. Это же дыра в безопасности.


          1. Dizast Автор
            30.07.2019 19:40

            Чтобы на конкретный запрос вернуть данные, запрос от клиента, там и request_uuid генерится. У одного юзера во время одной сессии может быть сгенерировано много таких запросов с разным request_uuid.
            Еще user_id используется в имени канала.


            1. resetme
              30.07.2019 19:51

              А зачем вообще генерировать UUID на фронте? Каждой задаче Celery назначается UUID, вот его и нужно отдавать клиенту, чтобы он дергал вьюшку с этим UUID, а в ней проверяется завершилась ли эта задача. Зачем все переусложнять с pubsub?


              1. Dizast Автор
                30.07.2019 19:58

                По-моему с pubsub не сложно и вью не дергается, когда результата еще нет.