Последние полгода я начал задумываться о том, чтобы уходить с любимого Python куда‑нибудь в сторону Rust или Go, потому что, как ни крути, на нём становится писать больновато, когда дело касается каких‑то более «интересных» задач. Со мной, конечно, многие поспорят, но я продолжу смотреть на оборачивание всего, что заблокирует GIL, в различные функции библиотек asyncio или threading, как один большой костыль относительно эстетичного синтаксиса Python.

Недавно, я столкнулся с задачей, когда с проекта на Python нужно было стряхнуть пыли и заставить работать чуточку производительнее. В следствии чего монолит был распилен на микросервисы, а брокером между сервисами стали всем знакомый RabbitMQ и такой же старый как сам Python - Celery. Проект был перенесен с Django на FastAPI, который по-моему субъективному мнению является идеальным решением для любых бэкендов на Python, если мы не говорим о чём-то высоконагруженном, где с питона стоит слезть на другой язык. Вообще, микросервисы это то, что даёт возможность разработать большую часть кодовой базы дёшево, выделив уязвимые места в микросервисы на других языках.

Начнём с конфигурации docker-compose файла:

version: '3.8'

services:
  db:
    image: postgres:15.1-alpine
    env_file:
      - ./.env
    volumes:
      - postgres_data:/var/lib/postgresql/data/

  app:
    build: ./backend
    depends_on:
      - db
    env_file:
      - ./.env
    ports:
      - "8000:8000"
    volumes:
      - ./backend/src:/app/

  ...

  rabbit:
    image: rabbitmq:3.11.9-management
    hostname: rabbit
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 1147483648
    volumes:
      - ./rabbitmq:/var/lib/rabbitmq

  flower:
    image: mher/flower
    environment:
      - CELERY_BROKER_URL=amqp://admin:admin@rabbit:5672//
      - RESULT_BACKEND=rpc://
      - FLOWER_PORT=5555
    ports:
      - "5555:5555"
    depends_on:
      - rabbit

  worker:
    build: ./backend
    command: python -m celery -A celery_app.celery worker --loglevel=info -Q celery --logfile=celery_app/celery.log
    volumes:
      - ./backend/src:/app/
    env_file:
      - ./.env
    depends_on:
      - rabbit
    environment:
      - C_FORCE_ROOT=yes

Для мониторинга задач Celery использовал опять же всем знакомый и до боли простой Flower. Так же дополнительным аргументом для RabbitMQ использовал disk_free_limit для того чтобы растянуть максимально допустимый под сообщения объем памяти. Заострять внимание на каждом Dockerfile я не буду, потому что ничего специфического там нету. Касаемо конфигурации Celery, тоже ничего сложного нету, мануалов полно в интернете. Так что перейдем сразу в сути проблемы, того, с чем конкретно у меня возникли сложности.

Моя реализация подключения к базе данных через алхимию выглядит следующим образом:

engine = create_async_engine(
    DATABASE_URL,
    echo=True
)

session: async_sessionmaker[AsyncSession] = async_sessionmaker(
    engine,
    expire_on_commit=False
)

К моему разочараванию в Celery так и не появилось ничего нового и интересного. Для того, чтобы использовать асинхронную сессию необходимо использовать асинхронные функции, а значит необходимо обернуть эту функцию во что-то, чтобы celery не ругался.

Первым делом я получил loop в глобальной области моего файла tasks.py, который хранил в себе все таски для Celery (у меня их, если что всего 4). Выглядело это так:

loop = asyncio.get_event_loop()

Так же мою сессию необходимо было обернуть в функцию async_scoped_session, чтобы избежать ошибок связанных с одновременным подключением к сессии нескольких instanc'ов приложений (воркера и самого FastAPI). Выглядела она следующим образом:

@asynccontextmanager
async def scoped_session():
    scoped_factory = async_scoped_session(
        session,
        scopefunc=current_task,
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        await scoped_factory.remove()

Важным в этом коде является то, что мы в конце вызываем метод .remove() необходимый для корректного завершения сессии, что применительно только к scoped session (в случае обычной сессии всю работу с открытием и закрытием скрывает под собой контекстный менеджер). Подробнее про это вы можете почитать в документации. После всех проделанных операций теперь мы можем использовать нашу сессию с помощью:

async with scoped_session() as s:
    await s.execute(...)

Что касается Celery, т.к. мы не имеем возможности использовать async функции, то нам нужно будет вынести всю асинхронщину в отдельные функции и воспользоваться тем самым loop, лежащим в tasks.py. В таком случае наша таска будет выглядеть примерно таким образом

@shared_task(
    bind=True,
    name='celery:test'
)
def test_task(self, data: dict, prices: dict):
  result = loop.run_until_complete(здесь_ваша_асинхнонная_функция(и, аргументы))
  return result

После всех проделанных манипуляций, всё завелось и работает корректно и быстро. Если у кого-то есть идеи лучшей реализации - милости прошу в комментарии. Также обращу внимание, что я не считаю свой вариант самым правильным, потому что всегда найдётся человек, знающий в конкретной области больше меня или тебя. Надеюсь на объективную критику и хоть какую-то пользу от написанного. Подписывайтесь, ставьте лайки, всем удачи, всем пока!

Комментарии (11)


  1. mgis
    00.00.0000 00:00
    +1

    Спасибо за статью. У меня также была такая же проблема.
    Тогда я сделал все просто

    import asyncio

    ну и просто
    settings = asyncio.run(crud_settings.get_settings_by_clinic_id(db, clinic_id))

    где crud_settings.get_settings_by_clinic_id() асинхронная функция с асинхронной сессией. Я уже тогда чувствовал что это неправильно, но решения на тот момент не нашел.


  1. baldr
    00.00.0000 00:00
    +2

    У меня раньше Django+Celery использовался много где и, в свое время, по многим граблям этого фреймворка я попрыгал.

    Не знаю как в последних версиях (я застрял на 4.х), но раньше (лет 10 назад, хмм) celery очень не любил когда вы запускали какие-нибудь потоки (Thread) внутри тасков. Какие-то жуткие вещи там происходили, помнится - например, воркер мог форкнуться и забрать с собой часть ресурсов и потоков. В общем-то, eventloop это тоже касается.

    Первым делом я получил loop в глобальной области моего файла tasks.py

    ?? А что вы делаете когда воркер форкается?

    Если я использую asyncio внутри таски, то у меня loop создается каждый раз при запуске задачи. Чаще всего надо просто скачать 10-100 файлов параллельно.

    В новых проектах тоже часто использую FastAPI, но если нужны бэкграунд-таски, то часто беру celery, но не делаю их асинхронными. Пул запускается в отдельном контейнере и принимает задачи через RabbitMQ. Интернет предлагает запускать таски через celery напрямую из асинхронных тасков или через бэкграунд-воркера (тред), но мне такое решение не понравилось и я просто слегка реверснул механизм запуска таски - публикую сообщение в очередь RabbitMQ асинхронно через aio_pika, а если нужны результаты - то они приходят в Redis, причем если таска запускает другие таски, то стек корректно раскручивается.

    Код запуска таски
    
    import json
    import asyncio
    from uuid import uuid4
    
    import aio_pika
    
    
    class CeleryRMQConnector:
        def __init__(self, conn_str: str):
            if not conn_str.startswith('amqp://') and not conn_str.startswith('amqps://'):
                raise ValueError("CeleryRMQConnector can use only AMQP broker")
            self.conn_str = conn_str
            self._rmq = None
            self._rmq_channel = None
    
        async def _get_connection_channel(self):
            if not self._rmq:
                self._rmq = await aio_pika.connect_robust(
                    self.conn_str,
                )
                self._rmq_channel = await self._rmq.channel()
            return self._rmq_channel
    
        async def send_task(self, task_name, queue_name, task_kwargs, expires=None):
            task_id = uuid4().hex
            channel = await self._get_connection_channel()
            await channel.default_exchange.publish(
                aio_pika.Message(
                    body=json.dumps([[], task_kwargs,
                                     {"callbacks": None, "errbacks": None, "chain": None, "chord": None}]).encode(),
                    correlation_id=task_id,
                    priority=0, delivery_mode=2,
                    # reply_to=self.result_queue_name,
                    reply_to=None,
                    content_type='application/json',
                    content_encoding='utf-8',
                    message_id=None,
                    expiration=expires or 60 * 60,
                    headers={
                        'argsrepr': "[]",
                        'kwargsrepr': "{}",
                        'group': None,
                        'origin': "gen@blablabla",
                        'retries': 0,
                        'expires': expires,
                        'id': task_id,
                        'root_id': task_id,
                        'task': task_name,
                        'lang': 'py',
                    },
                ),
                routing_key=queue_name,
            )
            return task_id
    

    Здесь не закрывается соединение при завершении - по-хорошему надо бы использовать AsyncExitStack или контекстный менеджер.

    Однако, celery очень тяжелая и внутри происходит дочерта всякой магии, достаточно и костылей. Раньше я очень активно использовал celery или отдельно kombu. На что-то другое перейт непросто когда есть уже некоторый багаж наработок..


    1. kield Автор
      00.00.0000 00:00

      Я долго ломал себя на то чтобы браться за Celery, но и на aiopika в сожалению ранее нигде не натыкался. Сейчас глянул доку мельком, очень понравилось. Спасибо за ваши комментарии, пойду тыкать). Для контроля ресурсов вешал на воркера Cgroups и в случае чего он килял процесс, а докер его снова заводил. Касаемо запуска тасок внутри тасок: всяким образом старался обходить такие вещи, потому что я всегда работаю с Celery как с чем-то, что может в какой-то момент повести себя непредсказуемо и делаю так, чтобы он выполнял не более одной задачи.


  1. healfy
    00.00.0000 00:00
    +1

    А почему не использовать aiopika? Селери впринципе не подразумевает использование async


  1. Jack444
    00.00.0000 00:00

    Мне кажется Celery давно устарел и писался он под python2, почему бы ну упростить всё до asyncio.Queue и multiprocessing.Queue ?


  1. 0x131315
    00.00.0000 00:00

    я начал задумываться о том, чтобы уходить с любимого Python куда‑нибудь в сторону Rust или Go, потому что, как ни крути, на нём становится писать больновато, когда дело касается каких‑то более «интересных» задач.

    Аналогично, только в моем случае php и java


  1. RH215
    00.00.0000 00:00

    >я продолжу смотреть на оборачивание всего, что заблокирует GIL, в различные функции библиотек asyncio или threading, как один большой костыль относительно эстетичного синтаксиса Python

    Эм, а где-то возможна конкурентность без эвент-машин, потоков или процессов?


    1. kield Автор
      00.00.0000 00:00

      Я тут имел ввиду отсутствие нативной поддержки асинхронности языка. Питон имеет библиотеки для того чтобы это работало, но под капотом все равно имеем дело с синхронным GIL. Взять то же GO, который имеет встроенные в язык горутины, который был создан уже после того как в мире появился компьютер с двумя ядрами. И да, питон хорош в определенном спектре задач, но не на столько хорош по скорости. Имея такой подход к асинхронности у нас появляется множество проблем касаемо того, как управлять этим сторонним, не присущим ядру языку потоку.


      1. RH215
        00.00.0000 00:00
        +1

        Ну на практике в большинстве места, где используется python можно просто запустить два инстанса рядом. Асинхронщина тут даже в плюс: помогает обрабатывать кучу i/o-bound одним процессом, а если у нас нужна конкурентность для cpu-bound то и голый python как-то использовать не нужно. Так что я не думаю, что главной проблемой производительности python является именно GIL. Главной проблемой является плохая оптимизируемость из-за утиной типизации и жирных объектов.

        Другое дело, что вся асинхронщина в python всё ещё не является достаточно зрелой, да и реализовано это далеко не так хорошо, как в более новых платформах и языках.


        1. kield Автор
          00.00.0000 00:00
          +1

          Опять же, я не говорил что GIL это главная проблема в скорости Python. В данной статье и в области этих комментариев речь шла про асинхронность в Python. С остальным соглашусь. И возможно добавлю с этот список "некомпилируемость", интерпретация на лету тоже раскладывает свои грабли.


  1. dx-77
    00.00.0000 00:00
    +1

    Если хочется именно Celery "сделать" асинхронным, то можно действовать примерно так:

    class AsyncTask(celery.Task):
        async def apply_async(self, *args, **kwargs):
            ...
    
        async def async_run(self, *args, **kwargs):
            ...
    
        def __call__(self, *args, **kwargs):
            return self._get_app().loop.run_until_complete(self.async_run(*args, **kwargs))
    
    
    class AsyncCelery(celery.Celery):
        task_cls = AsyncTask
    
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._loop = None
    
        @property
        def loop(self):
            if self._loop is None:
                try:
                    self._loop = asyncio.get_running_loop()
                except RuntimeError:
                    self._loop = asyncio.new_event_loop()
            return self._loop
    
    
        @property
        async def async_connection(self):
            # тут работа с aio_pika
    
        async def send_task_message(self, *args, **kwargs):
            # тут работа с aio_pika
    
        async def send_task(self, *args, **kwargs):
            ...
    
    celery_app = AsyncCelery('project')
    
    @celery_app.task
    async def execute_something(*, app, **kwargs):
          ...
    
     # ну и соответственно в коде для отложенного вызова таски мы будем писать вот так
    ...
    await execute_something.apply_async(**kwargs)
    ...

    Плюсы, на мой взгляд, очевидны, ну а минусы - чем больше мы используем фишек Cелери, тип больше будет разрастаться класс AsyncCelery.

    З.Ы. @kield Спасибо за статью. И не надо уходить из Питона. Python рулит)