Привет, Хабр! Я хочу рассказать, как я решал проблему эффективного конкурентного исполнения asyncio задач в Celery.

КДПВ

Введение


Celery — большой проект со сложной историей и тяжким бременем обратной совместимости. Ключевые архитектурные решения принимались задолго до появления asyncio в python. Поэтому тем более интересно, что запустить как-нибудь asyncio-задачу можно в celery из коробки.

Зануда mode on
Формально после #839. Субъективно для меня переименование пакета не меняет архитектуру приложения. Зануда mode off.

Запускаем asyncio задачи в ванильном celery


Запустить asyncio задачу можно из коробки:

import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run(coro)

Очевидные плюсы:

  • Работает же!
  • Просто
  • Нет дополнительных внешних зависимостей

Что, собственно, не так?

  • Event Loop создаётся внутри каждого воркера
  • Переключения контекста между исполняющимися корутинами не происходит
  • В один момент времени воркер исполняет не более одной корутины
  • Общие ресурсы не шарятся между задачами
  • Бойлерплейт

То есть asyncio в данном случае используется ради asyncio, а преимуществ никаких

Попробуем хитрее?


Я боролся за производительность:

import asyncio
import threading

from .celeryapp import celeryapp

celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()

async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )

Бинго! Плюсы:

  • Всё ещё работает
  • Даже более-менее эффективно
  • Даже ресурсы шарятся между корутинами в пределах воркера
  • Всё ещё нет дополнительных внешних зависимостей

Бинго? Проблемы не заставили себя ждать:

  • Celery не знает ничего про запущенную корутину
  • Вы теряете контроль над исполнением таски
  • Вы теряете контроль над исключениями
  • Бойлерплейт

Идти с таким чудесным инженерным решением в прод у меня как-то рука не поднялась

Постановка задачи


  • Должно работать
  • Корутины должны исполняться конкурентно
  • Ресурсы должны шариться между множеством исполняемых тасок
  • Никакого бойлерплейта
  • Простой предсказуемый API

То есть, мои ожидания:

import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)

Итоги


Свои идеи я реализовал в библиотеке celery-pool-asyncio. Эта библиотека используется нашей командой в текущем проекте, и мы уже выкатились в прод.

Коротко о возможностях


Помимо непосредственно исполнения asyncio задач, celery-pool-asyncio также решает проблемы:

  • Шедулинг асинхронных задач
  • Поддержка корутин в сигналах celery

Для того, чтобы заставить celery-pool-asyncio работать, я использовал monkey patching. Для каждого применяемого в рантайме патча предусмотрена возможность отключения.

Обо всём этом подробнее можно прочитать в документации

Планы


По-хорошему, нужно интегрировать пул в celery. С другой стороны, разработчики celery во всю прототипируют celery 5.0, который будет асинхронным. Стоит ли игра свеч?

Примеры


Для демонстрации возможностей моих библиотек celery-pool-asyncio и celery-decorator-taskcls (статья) был реализован тестовый проект.

Прочее


Я пытался то же самое рассказать на митапе