В своей прошлой статье "Как подружить Celery и SQLAlchemy 2.0 с асинхронным Python" я разбирал возможность запускать асинхронные задачи "из-под Celery" и в комментариях мне сообщили о существовании ещё одной библиотеки под названием aio_pika. И признаться, о ней я раньше никогда не слышал. Оно и не удивительно, библиотека имеет всего в районе 1К звёзд на GitHub (по сравнению с 20К+ у Celery). Я рассмотрел абсолютно все популярные (500+ звёзд) решения и остановился именно на этом из-за активной (на текущий момент) разработке и относительной популярности.

Стек, который вы увидите в статье: FastAPI, RabbitMQ, aio_pika и docker. Статья будет полезна тем кто использует Celery в своих проектах, а так же тем, кто только слышал о том, что такое очереди и RabbitMQ.

Навигация:

  1. Конфигурация RabbitMQ

  2. Task router для consumer'a

  3. Написание consumer'a

  4. Интеграция в основное приложение

Предисловие

Библиотека позиционирует себя "обёрткой aiormq для asyncio для людей". Моей целью стало заменить Celery, используемый в проекте на неё. Решил я это сделать из-за того, что его интерфейс не предполагает разбиение приложения и worker'ов в отдельные сервисы, чего очень хотелось бы. Второстепенными причинами стали: отсутствие асинхронности, запах legacy (я про атрибут self, который необходимо писать первым аргументом функций) и отсутствие type-хинтов. Celery в проекте использовался для IO-Bound и Delay задач, поэтому интеграция асинхронности была очень кстати.

Конфигурация RabbitMQ

Я обновил свой RabbitMQ добавив плагин "RabbitMQ Delayed Message Plugin". Он нужен был для того, чтобы делать "отложенные" задачи (выполняются по истечении определённого времени). Celery с этим справлялся, т.к. у него была нативная поддержка данной фичи, но aio-pika такого не имеет. Этот плагин позволяет добавить этот функционал в сам RabbitMQ. Мой docker-compose конфиг стал выглядеть следующим образом:

docker-compose.yaml
 rabbit:
    image: rabbitmq:3-management
    hostname: rabbit
    env_file:
      - .env
    volumes:
      - ./services/rabbit/delayed_message.ez:/opt/rabbitmq/plugins/delayed_message.ez
      - ./services/rabbit/enabled:/etc/rabbitmq/enabled_plugins
    ports:
      - "15672:15672"

Через volumes я подключил скачанный плагин, а так же добавил его в список активированных по умолчанию. Мой enabled_plugins файл выглядел следующим образом:

[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].

*Точка в конце обязательна

Task router для consumer'a

Следующим этапом я написал Router для моего worker'а, который был бы для меня удобен. На этом моменте я немного заморочился:

router.py
class Router:
    _routes: dict[str, list[str]] = {}

    def __init__(self):
        modules = list(filter(
            lambda x: x != '__init__',
            map(lambda y: y.split('.')[0], os.listdir('tasks'))
        ))
        for module in modules:
            imported = import_module(f'tasks.{module}')
            if not hasattr(imported, '__all__'):
                continue
            self._routes[module] = imported.__all__
            del imported
    def get_method(self, action: str) -> Optional[Callable]:
        module = action.split(':')[0] # Название файла
        method = action.split(':')[1] # Название функции
        if self._exists(module, method):
            return getattr(import_module(f'tasks.{module}'), method)

Переменная _router заполняется задачами, которые расположены в папке tasks, в которой лежат сами функции (задачи). Так же они указаны в переменной all для экспорта. Для наглядности задачи выглядела примерно так:

async def test(is_test: bool):
    print(f'Hello world! Value is: {is_test}')

__all__ = ['test']

Следующей задачей предстояло решить проблему с тем, что эти функции имеют произвольное количество аргументов. Я написал ещё один метод для роутера, который мог бы учесть и это:

router.py
def check_args(func: Callable, data: dict) -> bool:
    hints = get_type_hints(func)
    for arg, arg_type in hints.items():
        if arg not in data:
            return False
        if not isinstance(data[arg], arg_type):
            return False
    return True

Мы передаем в данный метод функцию, которую импортировали из файла, а так же данные, которые пытаемся ей подсунуть. Мы так же проверяем типы указанные в аргументах функции. Если всё ок - то возвращаем True

Таким образом я регулировал количество доступных задач созданием \ удалением файлов из папки tasks. Это оказалось очень удобным и гибким решением.

Написание consumer'a

consumer.py

async def process_message(message: AbstractIncomingMessage):
    async with message.process():
        message = MessageSchema.parse_obj(json.loads(message.body.decode()))
        method = router.get_method(message.action) # Импортируем функцию и записываем в переменную
        if method:
            if not router.check_args(method, message.body): # Проверяем атрибуты, которые собираемся передавать
                print('Invalid args')
                return
            if inspect.iscoroutinefunction(method): # Проверяем является ли функция async или нет
                await method(**message.body)
            else:
                method(**message.body)


async def main() -> None:
    queue_key = rabbit_config.RABBITMQ_QUEUE

    connection = await aio_pika.connect_robust(rabbit_config.url)
    # Для корректной работы с RabbitMQ указываем publisher_confirms=False
    channel = await connection.channel(publisher_confirms=False)
    # Кол-во задач, которые consumer может выполнять в момент времени. В моём случае 100
    await channel.set_qos(prefetch_count=100)
    queue = await channel.declare_queue(queue_key)
    
    exchange = await channel.declare_exchange(
        # Объявляем exchange с именем main и типом, который поддерживает отложенные задачи
        # Важно чтобы это имя (main) совпадало с именем на стороне publisher
        'main', ExchangeType.X_DELAYED_MESSAGE, 
        arguments={
            'x-delayed-type': 'direct'
        }
    )
    await queue.bind(exchange, queue_key)
    await queue.consume(process_message)
    try:
        await asyncio.Future()
    finally:
        await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

В целом на этом сторона consumer'a закончена и можно приступить к интеграции всего этого добра в основное приложение (publisher).

Интеграция в основное приложение

На помощь снова приходит ООП и я написал класс для работы с aio-pika, который полностью закрыл мои нужды. Его инициализация происходила в новеньком lifespan (который скоро полностью вытолкнет старые способы):

@asynccontextmanager
async def lifespan(_: FastAPI):
    await rabbit_connection.connect()
    yield
    await rabbit_connection.disconnect()

app = FastAPI(lifespan=lifespan)

Далее идет реализация этого класса:

rabbit_connection.py
class RabbitConnection:
    _connection: AbstractRobustConnection | None = None
    _channel: AbstractRobustChannel | None = None
    _exchange: AbstractRobustExchange | None = None

    async def disconnect(self) -> None:
        if self._channel and not self._channel.is_closed:
            await self._channel.close()
        if self._connection and not self._connection.is_closed:
            await self._connection.close()
        self._connection = None
        self._channel = None

    async def connect(self) -> None:
        try:
            self._connection = await connect_robust(rabbit_config.url)
            self._channel = await self._connection.channel(publisher_confirms=False)
            self._exchange = await self._channel.declare_exchange(
                # Повторяем из consumer'a. Важно указать одинакое
                # имя exchange'ов. В моём случае `main`
                'main', ExchangeType.X_DELAYED_MESSAGE,
                arguments={
                    'x-delayed-type': 'direct'
                }
            )
        except Exception as e:
            await self.disconnect()

    async def send_messages(
            self,
            messages: list[MessageSchema],
            *,
            routing_key: str = rabbit_config.RABBITMQ_QUEUE,
            delay: int = None # Задержка, через которое нужно выполнить задачу (в секундах)
    ) -> None:
        async with self._channel.transaction():
            headers = None
            if delay:
                headers = {
                    'x-delay': f'{delay * 1000}' # Это тоже из документации плагина для RabbitMQ
                }
            for message in messages:
                message = Message(
                    body=json.dumps(message.dict()).encode(),
                    headers=headers
                )
                await self._exchange.publish(
                    message,
                    routing_key=routing_key,
                    mandatory=False if delay else True # Чтобы в логах был порядок ;)
                )


rabbit_connection = RabbitConnection()

В итоге для того, чтобы отправить работки worker'у достаточно было сделать следующее:

main.py
@router.get('/test')
async def test():
    message = MessageSchema(
        action='images:delete',
        body={'path': 'assets/temp/temp.png'}
    )
    await rabbit_connection.send_messages(
      [message for _ in range(150)], 
      delay=20
    )
    return {'status': 'published'}

Подводя итоги хочется сказать что worker теперь чувствует себя намного увереннее и может выполнять намного больше и быстрее. Надеюсь статья оказалась полезной. Всем спасибо, всем пока.

Комментарии (11)


  1. kai3341
    21.05.2023 23:17
    +2

    Хорошая замена Celery

    На самом деле не очень. Посмотрите в сторону https://taskiq-python.github.io/


    1. kield Автор
      21.05.2023 23:17
      -1

      Смотрел, но как было сказано в статье для меня было важным критерием 500+ звёзд. Версия библиотеки 0.5, но на главной странице заявлено что она production ready. Что тоже вызвало некоторые сомнения. Хочу и могу назвать библиотеку сырой, хотя выглядит она вполне себе неплохо. Возможно позже она заменит всех вышеперечисленных, но пока что я предпочту подождать


      1. kai3341
        21.05.2023 23:17
        +2

        для меня было важным критерием 500+ звёзд

        Вы так говорите, словно миллионы мух не могут ошибаться </шутка>. Звёзды значат только популярность, и ничего более. "Сырость" версии библиотеки также мало что значит -- надо в сорцы смотреть


        1. ri_gilfanov
          21.05.2023 23:17
          +2

          Причём, включая популярность вида "узнал о библиотеке, нагуглил и поставил звезду, чтобы не забыть и когда-нибудь пощупать". Для многих раздел stars в профиле GitHub -- это просто раздел закладок.

          Больше смысла в оценке проекта может иметь количество контрибьютеров и характер их коммитов. Если в кодовую базу проекта глубоко погружены два открытых к диалогу человека -- это может быть лучше раскрученного проекта одиночки, пропадающего на полгода или переводящего все неинтересные ему Issues в Disscussions.


        1. kield Автор
          21.05.2023 23:17
          +1

          Я понимаю что версионность каждый ведёт как хочет. Но у меня был неприятный опыт использования подобной библиотеки. Дважды наступать на грабли я не хочу. Такие библиотеки (непопулярные) генерируют намного меньше issues оттого багов в ней может быть потенциально больше. Не поймите меня не правильно, но писать на главной странице что мы готовы в продакшену когда проект только-только появляется это тоже перебор. Я хочу чтобы библиотека жила и развивалась. Я чуть было не взял AMQ, который тут тоже любят рекомендовать. Только вот уже более полугода без релизов. И да, останусь при своём, звёзды в большинстве случаев отражают приблизительную реальность. И если бы я смотрел только на звёзды - тогда бы я остался использовать Celery, ведь там их в 20 раз больше, не так ли?)


          1. klamas
            21.05.2023 23:17

            Я знаю крупный проект, который использует эту либу в проде. Но решать конечно вам)


    1. Ryav
      21.05.2023 23:17
      +1

      Выглядит интересно, спасибо. А для замены flower есть что или придётся городить что-то своё?


      1. baldr
        21.05.2023 23:17
        +1

        Кстати, интересный вопрос. Я, было, хотел вам ответить что придется свое, но решил погуглить.

        Есть, вроде бы, проект open-telemetry, у которого есть раздел "OpenTelemetry Aio-pika Instrumentation" на гитхабе. Также есть неких helios, который тоже что-то может делать с aio-pika. Я лично ни с чем из них дела не имел и ничего не могу сказать, но вот есть.

        UPD: заметил что вы спрашивали про taskiq, сорри. С ним тоже не работал, но у него заявлена поддержка Prometheus для сбора метрик.


  1. baldr
    21.05.2023 23:17
    +2

    Да, звезды на гитхабе мало что показывают.

    aio-pika - это развитие проекта pika, который, пожалуй, даже будет постарше amqplib (используется в kombu/celery). В древние времена pika уже была, но предоставляла слишком уж низкоуровневый интерфейс и kombu выглядел гораздо удобнее, особенно позволяя использовать разные брокеры (RabbitMQ, Redis, Postgresql, ...).

    В статье все-таки вы некорректно сравниваете celery с aio-pika. Это немного разные уровни. Celery - это полноценный фреймворк, который позволяет кучу разных вещей, управляя воркерами-процессами, конфигурируя очереди и сообщения. Он базируется на kombu, который берет на себя оркестрацию разными протоколами, предоставляя одинаковую абстракцию над ними - организует очереди одинаковым образом в разных брокерах.

    aio-pika можно ограниченно сравнивать с kombu при работе только с AMQP. Возможно, кстати, вам бы лучше подошел механизм RPC в ней. Однако, по сравнению с celery вам придется писать гораздо больше своего кода для контроля выполнения - воркер может тихо помереть,потерять задачу, она может уйти по таймауту, результат может не записаться и тп. celery все это берет на себя. Однако, он, конечно, уже устарел и оброс кучей тяжелого кода.

    Кстати, разделение кода для celery - это вполне реально. Разный код, с разными тасками, но подсоединяясь к одной очереди вы можете вызывать любую таску, передав ее имя в сообщении.


    1. kield Автор
      21.05.2023 23:17

      С разделением кода для celery звучит конечно не очень) пожалуй напишу больше кода для aio-pika

      На счёт сравнения с Celery. Во первых, если прочитать статью что заголовок означает то, что я выкинул из проекта Celery заменив на aio-pika и стало лучше. Во вторых это отчасти уместно сравнивать т.к. в большинстве мест где используется Celery можно было бы обойтись именно такими быстрыми и лёгкими вариантами вокруг AMPQ. Celery старый и надёжный ящик. О котором все знают и берут его для работы с очередями и для отложенных задач. Цель статьи - распространять далее идею о том что celery это не единственное что можно для этого взять.

      На счёт RPC, да, к сожалению обратил на него внимание слишком поздно. Можно было бы и через него. Но в целом даже смотря сейчас на свой результат, мне он понравился, и не так много кода я написал, чтобы этого достичь.


      1. baldr
        21.05.2023 23:17
        +1

        Да все в порядке, не воспринимайте мой комментарий как критику - скорее просто как дополнение. Статья, в целом, правильная и первый плюс за нее был мой.

        Я и сам, в последнее время, все чаще использую разные сервисы на aio-pika и все реже celery. Однако, кстати, есть класс задач, в котором асинхронные воркеры не дадут особого выигрыша - это разные ML-задачи и вообще CPU-тяжелые обсчеты. Делить процессор они никому не дадут и здесь celery с их управлением воркерами-процессорами справляется довольно хорошо - как и было у вас в предыдущей статье.