Привет! Меня зовут Иван, я бэкенд-разработчик в KTS.

Сегодня расскажу, как в FastAPI эффективно работать с фоновыми задачами и настроить их мониторинг в Prometheus.

В туториалах для фоновых задач в FastAPI обычно предлагают celery и при этом используют синхронный код. Но сегодня в реальной практике такое встречается редко, поэтому в этой статье я покажу, как в фоновых задачах использовать асинхронный код.

В статье опишу 5 вариантов: встроенный в FastAPI Background Tasks и 4 библиотеки — ARQ, SAQ, FastStream, адаптированный к асинхронному коду Celery.

В конце расскажу, как мониторить фоновые задачи.

Оглавление

Что такое фоновые задачи

Для ускорения работы сервиса часть логики часто выносится в фоновые задачи: 

  • периодические задачи по cron

  • запланированные (отложенные) задачи

  • тяжёлый функционал:

    • требовательный к ресурсам — CPU-bound

    • продолжительный по времени — как правило, I/O-bound

Примеры:

  • Отправка писем на почту. Можно сразу отправить пользователю ответ и выполнить отправку письма в фоне. Эта задача может занять несколько секунд, поэтому лучше не заставлять пользователя ждать, пока письмо действительно отправится

  • Обработка данных. При обработке картинки от пользователя, можно отправить ответ "Accepted" (HTTP 202) и перенаправить работу с файлом в фоновую задачу

Что такое I/O-bound и CPU-bound операции

I/O-bound — операция, скорость выполнения которой ограничена скоростью подсистемы ввода-вывода. К таким задачам можно отнести выполнение запросов по сети, операции с базой, чтение/запись файла на диск. 

Например, HTTP-запрос:

r = requests.get('https://api.github.com/user', auth=('user', 'pass'))

CPU-bound — операция, скорость выполнения которой ограничена скоростью CPU. В web-сервисах это обычно работа с медиа: обрезка, ресайз, конвертация картинок.

Постановка задачи в очередь на исполнение

Чтобы поставить задачу в очередь на исполнение, понадобится брокер сообщений — мы возьмём Redis, потому что он подходит для всех четырёх библиотек.

Есть несколько способов запуска фоновых задач, но для унификации воспользуемся вариантом с передачей инстанса BackgroundTask в JSONResponse:

@router.post("")
async def simple() -> JSONResponse:
   task = BackgroundTask(func, *args, **kwargs)
   return JSONResponse({"status": "ok"}, background=task)

В качестве func будем использовать целевую функцию (для BackgroundTask), либо функцию, которая поставит задачу в очередь на исполнение.

Исходный код проекта можно посмотреть тут: https://github.com/ipakeev/fastapi-async-tasks

В нашей компании используется концепция Store, которая является единой точкой входа для всех аксессоров. Проект имеет именно такую архитектуру.

Посмотреть код
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import Depends


class BaseAccessor:
   def __init__(self, store: "Store") -> None:
       self.store = store
       self.config = settings

   async def connect(self) -> None:
       pass

   async def disconnect(self) -> None:
       pass


class Store:
   def __init__(self) -> None:
       # чтобы исключить циклический импорт
       from app.core.accessor import CoreAccessor
       from app.redis.accessor import RedisAccessor
       from app.worker.accessor import WorkerAccessor

       self.core = CoreAccessor(self)
       self.redis = RedisAccessor(self)
       self.worker = WorkerAccessor(self)

   async def connect(self) -> None:
       await self.redis.connect()
       await self.worker.connect()
       await self.core.connect()

   async def disconnect(self) -> None:
       await self.core.disconnect()
       await self.worker.disconnect()
       await self.redis.disconnect()


_store: Store | None = None


def get_store() -> Store:
   assert _store, "Store is not initialized"
   return _store


async def connect_to_store() -> Store:
   global _store

   if not _store:
       _store = Store()
       await _store.connect()

   return _store


async def disconnect_from_store() -> None:
   global _store

   if _store:
       await _store.disconnect()
       _store = None


@asynccontextmanager
async def store_lifespan() -> AsyncGenerator[Store, None]:
   await connect_to_store()
   try:
       yield get_store()
   finally:
       await disconnect_from_store()


StoreDep = Annotated[Store, Depends(get_store)]

Для декларативного подхода можно использовать библиотеки, например dependency-injector.

Теперь рассмотрим все варианты работы с фоновым задачи с использованием асинхронного кода.

FastAPI Background Tasks

Ссылка на GitHub

Встроенный в FastAPI класс для создания фоновых задач. Они будут выполняться сразу после того, как сервер вернул ответ. При этом нет никакой очереди, а результат выполнения нигде не хранится.

from starlette.background import BackgroundTask
from starlette.responses import JSONResponse


# фоновая задача
async def send_email(email: str, message: str = "Hello") -> None:
   ...


@router.post("")
async def hello() -> JSONResponse:
   task = BackgroundTask(send_email, "support@mail.ru", message="Hello")
   return JSONResponse({"status": "ok"}, background=task)

Плюсы

- Удобно использовать

- Очень удобно тестировать, потому что не нужны дополнительные манипуляции

- Не нужно разворачивать брокер сообщений

- Не нужно разворачивать дополнительный сервис (worker)

Минусы

- При большом количестве одновременных I/O-bound задач и даже при небольшом количестве CPU-bound задач сервис деградирует: увеличивается время запросов, снижается количество RPS.

- При отключении/рестарте сервиса незавершённые и не начатые задачи будут утеряны

ARQ

Ссылка на GitHub

Легковесная библиотека, задуманная как асинхронный вариант RQ.

Особенности:

  • Задача обязательно должна принимать первым аргументом объект контекста, в котором можно хранить что угодно

  • Чтобы поставить задачу в очередь, нужно передать её название

Посмотреть код
import arq
import arq.connections


# фоновая задача: функция для отправки эл. почты в фоне
async def send_email(ctx: dict, email: str, message: str = "Hello") -> None:
   store: Store = ctx["store"]
   …


# класс для доступа к фоновым задачам
class WorkerAccessor(AbstractTaskAccessor):
   def __init__(self, *args: Any, **kwargs: Any) -> None:
       super().__init__(*args, **kwargs)

	  # инициализируем пул задач arq
       self.job_pool: arq.ArqRedis | None = None

   async def connect(self) -> None:
	  # устанавливаем соединение с Redis
       self.job_pool = await arq.create_pool(
           arq.connections.RedisSettings(
			# настраиваем параметры соединения
               host=self.config.REDIS_DSN.host,
               port=self.config.REDIS_DSN.port,
               username=self.config.REDIS_DSN.username,
               password=self.config.REDIS_DSN.password,
               database=self.config.REDIS_DSN.path.removeprefix("/"),
           )
       )

   async def disconnect(self) -> None:
       """Для закрытия соединения"""
       if self.job_pool:
           await self.job_pool.close()

   async def run(self) -> None:
       """Для запуска воркера в другом сервисе"""
      
       async def on_startup(ctx: dict) -> None:
		 # устанавливаем объект хранилища в контексте
           ctx["store"] = self.store

       worker = arq.Worker(
		 # указываем фоновые задачи
           functions=[send_email],
           on_startup=on_startup,
           max_jobs=self.config.ARQ_CONCURRENCY,
           health_check_interval=60,
           handle_signals=False,
           redis_pool=self.job_pool,
       )
       try:
		 # запускаем воркер
           await worker.main()
       finally:
		 # закрываем воркер
           await worker.close()
          
   async def send_email(self, email: str, message: str = "Hello") -> None:
       # ставим задачу в очередь на исполнение
       await self.job_pool.enqueue_job(send_email.__name__, email, message=message)

        
# запускаем фоновую задачу отправки эл. почты, вариант 1
@router.post("")
async def hello(store: StoreDep) -> JSONResponse:
   task = BackgroundTask(store.worker.arq.send_email, "support@mail.ru", message="Hello")
   return JSONResponse({"status": "ok"}, background=task)


# запускаем фоновую задачу отправки эл. почты, вариант 2
@router.post("")
async def hello(store: StoreDep) -> dict:
   await store.worker.arq.send_email("support@mail.ru", message="Hello")
   return {"status": "ok"}

Запустить воркер можно таким способом:
async def main() -> None:
   async with store_lifespan() as store:
       await store.worker.arq.run()


if __name__ == "__main__":
   asyncio.run(main())

Плюсы

  • Удобно тестировать (при помощи fakeredis)

  • Можно сделать отложенное выполнение задачи 

  • Запуск задач по расписанию (cron)

  • Встроенный механизм перезапуска (retrying) и отмены задачи

Минусы

  • В качестве брокера сообщений может быть только Redis

SAQ

Ссылка на GitHub

Аналог ARQ со встроенным дашбордом.

Посмотреть код
import saq
from aiohttp.web_runner import AppRunner, TCPSite
from redis.asyncio import Redis
from saq.web.aiohttp import create_app


# фоновая задача
async def send_email(ctx: dict, email: str, message: str = "Hello") -> None:
   store: Store = ctx["store"]
   …


class WorkerkAccessor(BaseAccessor):
   def __init__(self, *args: Any, **kwargs: Any) -> None:
       super().__init__(*args, **kwargs)

       self.queue: saq.Queue | None = None

   async def connect(self) -> None:
       client = Redis.from_url(self.config.REDIS_DSN.__str__())
       self.queue = saq.Queue(client)

   async def disconnect(self) -> None:
       if self.queue:
           await self.queue.disconnect()

   async def run(self, web: bool = False, port: int = 8090) -> None:
      """Для запуска воркера в другом сервисе"""
      
       async def on_startup(ctx: dict) -> None:
           ctx["store"] = self.store

       worker = saq.Worker(
           self.queue,
           functions=[send_email],  # указываем фоновые задачи
           startup=on_startup,
           concurrency=self.config.SAQ_CONCURRENCY,
       )

       if web:
           # запускаем дашборд
           app = create_app([self.queue])
           task = asyncio.create_task(worker.start())

           try:
               runner = AppRunner(app)
               await runner.setup()
               site = TCPSite(runner, port=port)
               await site.start()

               await asyncio.Event().wait()
           finally:
               await worker.stop()
               task.cancel()
       else:
           await worker.start()

   async def send_email(self, email: str, message: str = "Hello") -> None:
       # ставим задачу в очередь на исполнение
       await self.queue.enqueue(send_email.__name__, email=email, message=message)


@router.post("")
async def hello(store: StoreDep) -> JSONResponse:
   task = BackgroundTask(store.worker.saq.send_email, "support@mail.ru", message="Hello")
   return JSONResponse({"status": "ok"}, background=task)


# или так
@router.post("")
async def hello(store: StoreDep) -> dict:
   await store.worker.saq.send_email("support@mail.ru", message="Hello")
   return {"status": "ok"}

Запустить воркер можно таким способом:
async def main() -> None:
   async with store_lifespan() as store:
       await store.worker.saq.run(web=True)


if __name__ == "__main__":
   asyncio.run(main())

Плюсы

  • Удобно тестировать при помощи fakeredis

  • Можно сделать отложенное выполнение задачи 

  • Запуск задач по расписанию (cron)

  • Встроенный механизм перезапуска (retrying)

  • Встроенный web дашборд для мониторинга задач:

Минусы

  • В качестве брокера сообщений может быть только Redis

  • Не очень удачная реализация: в задачу можно прокидывать только kwargs, которые идут вперемешку с параметрами запуска задачи

FastStream

Ссылка на GitHub

Очень перспективный и интересный фреймворк, является идейным продолжением FastKafka и Propan. Отличается от других библиотек в статье тем, что представляет собой архитектуру pub/sub. 

Обратите внимание, что на момент написания статьи для обычной очереди задач это может иметь подводные камни:  

  • в рамках одного воркера параллельно обрабатываются только разные задачи

  • сообщения в рамках одного топика обрабатываются строго последовательно — нет возможности указать concurrency

  • если указать количество воркеров больше 1, то каждая задача будет выполняться столько раз, сколько есть воркеров

  • для запуска по расписанию потребуется интеграция с Taskiq: Taskiq - FastStream

При этом библиотека имеет много преимуществ, о которых я расскажу ниже. Сначала — как выглядит запуск фоновых задач:
import faststream
from faststream.redis import RedisBroker


class FastStream(faststream.FastStream):
   def __init__(self, *args: Any, **kwargs: Any) -> None:
       super().__init__(*args, **kwargs)

       self.on_startup(self.connect)
       self.on_shutdown(self.disconnect)

   async def connect(self, *_: Any, **__: Any) -> None:
       await connect_to_store()

       await self.broker.connect(url=settings.REDIS_DSN.__str__())

   async def disconnect(self, *_: Any, **__: Any) -> None:
       await disconnect_from_store()

       await self.broker.close()


broker = RedisBroker()

# для запуска воркера через терминал
faststream_app = FastStream(broker)

# название фоновой задачи
SEND_EMAIL_TOPIC = "send_email"


# фоновая задача
@broker.subscriber(SEND_EMAIL_TOPIC)
async def send_email(email: str, message: str) -> None:
   …


class WorkerAccessor(BaseAccessor):
   async def connect(self) -> None:
       await broker.connect(url=self.config.REDIS_DSN.__str__())

   async def disconnect(self) -> None:
       await broker.close()

   async def send_email(self, email: str, message: str = "Hello") -> None:
       # ставим задачу в очередь на исполнение
       await broker.publish(
           message={"email": email, "message": message},
           channel=SEND_EMAIL_TOPIC,
       )




@router.post("")
async def hello(store: StoreDep) -> JSONResponse:
   task = BackgroundTask(store.worker.faststream.send_email, "support@mail.ru", message="Hello")
   return JSONResponse({"status": "ok"}, background=task)


# или так
@router.post("")
async def hello(store: StoreDep) -> dict:
   await store.worker.faststream.send_email("support@mail.ru", message="Hello")
   return {"status": "ok"}

Воркер можно встроить в lifespan FastAPI либо запустить командой в терминале:
faststream run app.worker.faststream:faststream_app –workers=1

Запустить AsyncAPI:
faststream docs serve app.worker.faststream:faststream_app

Плюсы

  • В качестве брокера сообщений может быть Apache Kafka, RabbitMQ, NATS и Redis

  • Автогенерация документации (AsyncAPI)

  • Удобно тестировать (in-memory брокер)

  • Тесная интеграция с Pydantic и Dependency injection

  • Очень тесная интеграция с FastAPI, можно запускать в lifespan и не деплоить отдельный сервис — полезно при микросервисной архитектуре

  • Лёгкая интеграция с другими популярными web-фреймворками: Aiohttp, Django

  • Автоматическая кодогенерация при помощи языковых моделей

Минус

  • Фреймворк впервые вышел в релиз в сентябре 2023 года и пока не прошёл проверку временем. При этом успел обрести популярность и находится на стадии активной разработки

Celery

Ссылка на GitHub

Celery — популярный фреймворк, который позволяет очень гибко оперировать фоновыми задачами. Он обладает широкими возможностями, но порог входа может быть выше, чем у ARQ или SAQ. 

К сожалению, до сих пор нет официальной поддержки асинхронного кода, но обещают добавить в Celery 6.0. Ждём. А пока можем самостоятельно адаптировать Celery для выполнения асинхронного кода.

Посмотреть код
import celery
from celery import signals
from functools import wraps


class Celery(celery.Celery):
   def __init__(self, *args: Any, **kwargs: Any) -> None:
       super().__init__(*args, **kwargs)
      
       # тут хранятся функции для удобства тестирования (eager execution)
       self.functions: dict[str, Callable[..., Any]] = {}
      
       # цикл событий, чтобы из синхронной функции вызывать асинхронную
       self.loop = asyncio.get_event_loop()

   def connect(self, *_: Any, **__: Any) -> None:
       self.loop.run_until_complete(connect_to_store())

   def disconnect(self, *_: Any, **__: Any) -> None:
       self.loop.run_until_complete(disconnect_from_store())

   def task(
       self,
       task: Callable[..., Awaitable[T]] | None = None,
       **opts: Any,
   ) -> Callable:
       # декоратор от celery
       create_task = super().task

       def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., T]:
           @create_task(**opts)  # регистрируем задачу
           @wraps(func)
           def wrapper(*args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any) -> T:
               # для случаев, когда очень хочется выполнить задачу сразу (без apply_async)
               loop = loop or self.loop

               # выполняем асинхронную функцию в цикле событий
               return loop.run_until_complete(func(*args, **kwargs))
          
           # запоминаем функцию для удобства тестирования
           self.functions[wrapper.name] = func

           return wrapper

       if task:
           return decorator(task)

       return decorator


celery_app = Celery("celery", broker=settings.REDIS_DSN.__str__())
celery_app.autodiscover_tasks(packages=["app.worker.async_celery.tasks"])

# подключаем store
signals.worker_process_init.connect(celery_app.connect)
signals.worker_process_shutdown.connect(celery_app.disconnect)

Запуск задачи не отличается от обычного варианта (delay, apply_async и пр.):
# фоновая задача
@celery_app.task
async def send_email(email: str, message: str) -> None:
   …


@router.post("")
async def hello() -> JSONResponse:
   task = BackgroundTask(send_email.delay, "support@mail.ru", message="Hello")
   return JSONResponse({"status": "ok"}, background=task)


# варианты:
@router.post("")
async def hello() -> dict:
   # примеры постановки задачи в очередь на исполнение
   send_email.delay("support@mail.ru", message="Hello")
   send_email.apply_async(("support@mail.ru",), {"message": "Hello"}, countdown=5)
   send_email.s("support@mail.ru", message="Hello").apply_async(countdown=5)

   # пример запуска задачи в текущем процессе
   send_email("support@mail.ru", message="Hello", loop=asyncio.get_running_loop())

   return {"status": "ok"}

Для eager execution в тестах придется прибегнуть к использованию nest-asyncio (conftest.py):
from contextlib import ExitStack
import nest_asyncio
import pytest


# чтобы не ловить ошибки о том, что цикл событий уже запущен
nest_asyncio.apply()


@pytest.fixture(scope="session", autouse=True)
def celery_eager_execution(store: Store) -> Generator[None, None, None]:
   # регистрируем все фоновые задачи
   from app.worker.async_celery import tasks  # noqa: F401

   def execute_task(name: str) -> Callable[..., Any]:
       # достаем функцию для этой задачи и выполняем ее в качестве side_effect у мока
       func = celery_app.functions[name]

       def wrapper(args: Any, kwargs: Any, **_: Any) -> Any:
           loop = asyncio.get_running_loop()
           return loop.run_until_complete(func(*args, **kwargs))

       return wrapper
  
   # создаем моки для каждой задачи
   patches = [
       patch.object(task, "apply_async", side_effect=execute_task(name))
       for name, task in celery_app.tasks.items()
   ]
  
   # одновременно применяем все моки
   with ExitStack() as stack:
       for p in patches:
           stack.enter_context(p)
       yield

Воркер можно запустить командой в терминале:
celery -A app.worker.async_celery.app:celery_app worker –concurrency=3

Flower можно запустить командой в терминале:
celery -A app.worker.async_celery.app:celery_app flower

Плюсы

  • Проверенная временем библиотека для распределённой очереди задач

  • В качестве брокера сообщений могут быть RabbitMQ, Redis и Amazon SQS

  • Запуск задач по cron

  • Огромные возможности по контролю и жонглированию очередями и задачами

  • Установка приоритета выполнения задачи

  • В связке с Flower легко мониторить задачи и экспортировать метрики в Prometheus

Минусы

  • Внутри фоновой задачи нельзя создавать процессы и выполнять функции в ProcessPoolExecutor

  • Пока нет официальной поддержки асинхронного кода, поэтому могут быть некоторые издержки при использовании библиотеки

Какие ещё есть варианты работы с фоновыми задачами

Мониторинг

Для FastAPI настроить мониторинг очень просто — при помощи библиотеки prometheus-fastapi-instrumentator.

Посмотреть код
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator

from app.api.router import router


app = FastAPI(title="FastAPI async tasks")
app.include_router(router)

# все метрики будут доступны по пути /metrics
Instrumentator().instrument(app).expose(app)

А вот отслеживать именно фоновые задачи — задача нетривиальная. Они работают в другом процессе или даже кластере и никак не связаны с работающим web-сервером. 

Для их мониторинга воспользуемся библиотекой prometheus-async, которая позволит настроить сбор метрик и развернуть web-сервер, который по пути /metrics будет экспортировать эти метрики.

Для Сelery использование prometheus-async не требуется, потому что Flower имеет встроенную интеграцию с Prometheus.

Настраиваем декоратор export_task_metrics, который будем навешивать на интересующую нас функцию:
from prometheus_async.aio import count_exceptions, time
from prometheus_client import Counter, Histogram


EXECUTION_TIME = Histogram(
   "task_execution_seconds",
   "Task execution time",
   labelnames=["task_name"],  # чтобы в метриках видеть название фоновой задачи
)
TASKS_FAILED = Counter(
   "tasks_failed",
   "Tasks failed",
   labelnames=["task_name"],  # чтобы в метриках видеть название фоновой задачи
)


# декоратор для удобного сбора метрик
def export_task_metrics(func: Callable) -> Callable:
   task_name = func.__name__
  
   @count_exceptions(TASKS_FAILED.labels(task_name=task_name))  # подсчет количества ошибок
   @time(EXECUTION_TIME.labels(task_name=task_name))  # подсчет времени выполнения
   @wraps(func)
   def wrapper(*args: Any, **kwargs: Any) -> Any:
       return func(*args, **kwargs)

   return wrapper

Применяем декоратор:
@export_task_metrics
async def send_email(email: str, message: str) -> None:
   ...

При старте воркера запускаем web-сервер в отдельном треде, чтобы отдавать метрики по фоновым задачам (на примере ARQ):
from prometheus_async.aio.web import start_http_server_in_thread


async def main() -> None:
   start_http_server_in_thread(port=settings.ARQ_EXPORTER_PORT)

   async with store_lifespan() as store:
       await store.worker.arq.run()


if __name__ == "__main__":
   asyncio.run(main())

Обратите внимание, что в каждом воркере необходимо развернуть свой web-сервер.

Сейчас мы будем одновременно тестировать все 4 библиотеки, поэтому настроим Prometheus для сбора всех метрик:
# prometheus.yml
global:
 scrape_interval: 15s


scrape_configs:
 - job_name: "fastapi"
   static_configs:
     - targets: ["fastapi:8000"]


 - job_name: "arq"
   static_configs:
     - targets: ["arq:8001"]


 - job_name: "saq"
   static_configs:
     - targets: ["saq:8002"]


 - job_name: "faststream"
   static_configs:
     - targets: ["faststream:8003"]


 - job_name: "async-celery"
   static_configs:
     - targets: ["async-celery-flower:5555"]

Соберём образ проекта:
docker build -t fastapi-async-tasks .

И запустим в docker compose. Все воркеры будут иметь одинаково небольшие ресурсы:
# docker-compose.yml
version: "3.8"


x-tasks-common: &tasks-common
   image: fastapi-async-tasks
   volumes:
       - .:/app  # для разработки
   depends_on:
       - fastapi
   restart: on-failure
   networks:
       - default
   deploy:
       resources:
           limits:
               memory: 200M
               cpus: "0.2"


services:
   redis:
       image: redis:7.2.4
       ports:
           - "6379:6379"
       restart: on-failure
       networks:
           - default


   prometheus:
       image: prom/prometheus
       ports:
           - "9090:9090"
       volumes:
           - .prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
       command:
           -  --config.file=/etc/prometheus/prometheus.yml
       restart: on-failure
       networks:
           - default


   grafana:
       image: grafana/grafana
       ports:
           - "3000:3000"
       volumes:
           - type: volume
             source: grafana
             target: /var/lib/grafana
       environment:
           - GF_SECURITY_ADMIN_USER=admin
           - GF_SECURITY_ADMIN_PASSWORD=admin
           - GF_INSTALL_PLUGINS=redis-app
       user: "0"
       restart: on-failure
       networks:
           - default


   fastapi:
       image: fastapi-async-tasks
       ports:
           - "8000:8000"
       volumes:
           - .:/app
       depends_on:
           - redis
           - prometheus
           - grafana
       restart: on-failure
       networks:
           - default
       deploy:
           resources:
               limits:
                   memory: 200M
                   cpus: "0.2"
       command:
           - uvicorn
           - app.main:app
           - --host=0.0.0.0
           - --port=8000
           - --reload
           - --workers=${FASTAPI_WORKERS}


   arq:
       <<: *tasks-common
       ports:
           - "8001:8001"  # metrics
       command:
           - python
           - -m
           - worker_arq


   saq:
       <<: *tasks-common
       ports:
           - "8082:8082"  # web dashboard
           - "8002:8002"  # metrics
       command:
           - python
           - -m
           - worker_saq


   faststream:
       <<: *tasks-common
       ports:
           - "8003:8003"  # metrics
       command:
           - faststream
           - run
           - app.worker.faststream.app:faststream_app
           - --workers=${FASTSTREAM_CONCURRENCY}


   async-celery:
       <<: *tasks-common
       command:
           - celery
           - -A
           - app.worker.async_celery.app:async_celery_app
           - worker
           - --loglevel=info
           - --concurrency=${ASYNC_CELERY_CONCURRENCY}


   async-celery-flower:
       image: fastapi-async-tasks
       ports:
           - "5555:5555"
       volumes:
           - .:/app
       depends_on:
           - async-celery
       restart: on-failure
       networks:
           - default
       command:
           - celery
           - -A
           - app.worker.async_celery.app:async_celery_app
           - flower


volumes:
   grafana:


networks:
   default:
       driver: bridge
docker-compose up -d

Нагрузочное тестирование

Чтобы не деплоить слишком много, будем проверять корректность выполненной работы при помощи инкремента числа в Redis. Одна выполненная задача — один инкремент. Финальное значение посмотрим в Grafana, это послужит контролем того, что все фоновые задачи выполнены успешно.

Redis работает быстро, поэтому в сравнительном тестировании мы сможем проверить скорость работы именно самой библиотеки (поставить задачу в очередь, достать задачу из очереди).

Во всех тестах используется 1 реплика FastAPI.

Проверим 5 разных эндпойнтов, которые запускают свои фоновые задачи:

  • io/simple: асинхронный redis

  • io/sync: синхронный redis

  • io/thread: синхронный redis в отдельном треде ThreadPoolExecutor(max_workers=3)

  • cpu/simple: числодробильная работа + асинхронный redis

  • cpu/process: числодробильная работа в отдельном процессе ProcessPoolExecutor(max_workers=3) + асинхронный redis

Для нагрузочного тестирования воспользуемся библиотекой Locust.

Посмотреть код
# locust/master.conf
headless = true  # управление через консоль
expect-workers = 10  # сначала дожидаемся старта 10 процессов
host = http://localhost:8000  # адрес FastAPI
# locust/background.py
from random import shuffle
from locust import between, HttpUser, task


# для тестирования Background Task
class TaskUser(HttpUser):
   wait_time = between(0.001, 0.001)

   @task
   def execute(self):
       endpoint = os.environ.get("endpoint")
       endpoints = (
           [endpoint]
           if endpoint
           else ["io/simple", "io/sync", "io/thread", "cpu/simple", "cpu/process"]
       )
       shuffle(endpoints)

       for endpoint in endpoints:
           self.client.post(f"/api/v1/incr/{endpoint}/background", json={"value": 1})
# locust/benchmark.py
from random import shuffle
from locust import between, HttpUser, task


# для тестирования остальных библиотек
class TaskUser(HttpUser):
   wait_time = between(0.001, 0.001)

   @task
   def execute(self):
       endpoint = os.environ.get("endpoint")
       endpoints = (
           [endpoint]
           if endpoint
           else ["io/simple", "io/sync", "io/thread", "cpu/simple", "cpu/process"]
       )

       worker = os.environ.get("worker")
       workers = (
           [worker]
           if worker
           else ["arq", "saq", "faststream", "async-celery"]
       )

       params = [(endpoint, worker) for endpoint in endpoints for worker in workers]
       shuffle(params)

       for endpoint, worker in params:
           self.client.post(f"/api/v1/incr/{endpoint}/{worker}", json={"value": 1})

Параметры:

  • -i: количество итераций

  • -u: количество пользователей

Пример команды запуска:
env endpoint=io/simple locust --config locust/master.conf -f locust/background.py -i 10000 -u 100

Таблицы и графики

Как Background Task влияет на производительность FastAPI

# 200M memory, 0.2 cpus

Итерации (количество фоновых задач)

Пользователи

RPS

Время запроса (медиана), мс

Время запроса (95-й перцентиль), мс

Время выполнения всех задач, сек

Количество ошибок (%)

io/simple

50000

100

319

200

390

160

0.0

io/sync

50000

100

290

280

410

180

0.0

io/thread

50000

100

209

400

600

245

0.0

cpu/simple

100

3

0.4

7300

9600

245

0.0

cpu/process

1000

3

16

190

400

2460

0.0

# io/simple (асинхронный redis)

Фоновые задачи простые и быстрые, поэтому видим стабильно хорошие показатели RPS.

# io/sync (синхронный redis)

Сказывается синхронный код: значение RPS меньше, а время выполнения задач больше. Чем дольше будет выполняться синхронный код, тем хуже будут становиться показатели.

# io/thread (синхронный redis в отдельном треде)

Не стоит злоупотреблять этим вариантом, т.к. выполнение кода в другом треде требует накладных расходов, что мы и видим по таблице и на графиках: показатели заметно хуже, чем в синхронном варианте. При более сложных задачах может сильно проседать производительность.

# cpu/simple (числодробильная работа + асинхронный redis)

FastAPI был настолько загружен, что не всегда успевал отдавать метрики. Вывод очевиден: CPU-bound фоновые задачи необходимо выносить в другой сервис.

# cpu/process (числодробильная работа в отдельном процессе + асинхронный redis)

С точки зрения Locust было около 16 RPS, но для FastAPI запрос считается завершенным только после выполнения фоновой задачи. Именно поэтому в Grafana значение RPS находится около нуля, а значение Total requests растёт вместе с Executed tasks. Вывод: BackgroundTask можно использовать только при небольшом количестве таких фоновых задач.

Показатели FastAPI при отправке фоновых задач в другой сервис

Тестируем эндпойнт io/simple, 50000 итераций, 100 пользователей

Чтобы не ущемлять FastStream, для чистоты эксперимента ARQ, SAQ и Celery будут иметь concurrency=1.

# 200M memory, 0.2 cpus

RPS

Время запроса (медиана), мс

Время запроса (95-й перцентиль), мс

ARQ

227

380

590

SAQ*

162*

290*

500*

FastStream

262

300

490

Celery

151

590

880

ARQ

Хорошие показатели, стабильная работа. Разница между временем окончания нагрузки и последней выполненной задачей небольшая.

SAQ

Тут результат неожиданный. 

  • Сначала задачи бодро ставились в очередь, скорость работы была чуть выше, чем у ARQ

  • Но вскоре сервис стал деградировать, а новые задачи не ставились в очередь

  • В итоге web-сервер перестал отвечать на запросы, тестирование пришлось остановить

Увеличение ресурсов в 2 раза решило проблему, но ненадолго: увеличив количество итераций в 2 раза, проблема вернулась. Пройти тест удалось лишь после увеличения времени ожидания между запросами.

FastStream

Лучшие показатели среди представленных библиотек. Фоновые задачи выполнялись практически сразу после постановки в очередь.

Celery

Показал самые скромные результаты. Последняя задача выполнилась сильно позже окончания нагрузки.

Сравнительное тестирование скорости выполнения задач

Сравнительная производительность после увеличения ресурсов

# 500M memory, 0.5 cpus

RPS

Время запроса (медиана), мс

Время запроса (95-й перцентиль), мс

ARQ

1114

18

34

SAQ

1079

19

37

FastStream

1178

15

31

Celery

871

30

59

Как видим, увеличение ресурсов несколько сгладило картину для ARQ, SAQ и FastStream. У Celery результаты ожидаемо скромнее — сказывается синхронность библиотеки.

Обратите внимание, что в Celery внутри фоновой задачи нельзя создавать процессы и выполнять функции в ProcessPoolExecutor, получим ошибку AssertionError ('daemonic processes are not allowed to have children').

Итоги тестирования

  • BackgroundTasks можно использовать в небольших проектах либо для небольших I/O-bound задач. В остальных случаях лучше использовать специализированные библиотеки.

  • FastStream оказался самым скоростным решением и имеет очень интересный набор функционала.

  • Celery ожидаемо проигрывает в скорости, но за счёт широкого функционала может стать неплохим вариантом. Ожидаем шестую версию.

  • ARQ является простым решением, но ничем особым не выделился. Пару раз воркер переставал работать, хотя задачи ставились в очередь. Лечилось перезапуском воркера. Возможно, это поправили в новой версии.

  • SAQ является аналогом ARQ со своими плюсами и минусами. Зависает при длительной большой нагрузке.

Выводы

  • Если в коде много CPU-bound-операций, их обязательно нужно вынести в отдельный микросервис

  • Если нагрузка небольшая, а фоновые задачи — в основном I/O-bound операции, можно использовать встроенный функционал FastAPI Background Tasks. Но для стабильности всё равно лучше их вынести в отдельный микросервис

Другие наши статьи по бэкенду и асинхронному программированию для начинающих:

Другие наши статьи по бэкенду и асинхронному программированию для продвинутого уровня:

Комментарии (15)


  1. Flosckow
    24.05.2024 09:51

    Если мне память не изменяет, то фоновые задачи выполняются последовательно и если в какой-то из задач рейзится исключение - то следующие не выполнятся. Так что это можно тоже отметить как потенциальный минус


    1. ipakeev Автор
      24.05.2024 09:51

      Это про какую библиотеку?


  1. ThatSeemsLegit
    24.05.2024 09:51

    Регулярно приходится сталкиваться с такими задачами и пришел к выводу, что лучше всего просто пульнуть таску в очередь, и оттуда уже сервисами их исполнять. Точно ничего не потеряется (nack на фейле) + легкий скейлинг и без спайков в нагрузке


  1. Ryav
    24.05.2024 09:51
    +2

    Спасибо за статью, неплохой обзор получился и открыл для себя FastStream.
    Вопрос по воркерам FastStream: когда воркер забирает таску, разве он не должен сделать её недоступной для остальных, чтобы одну и ту же задачу не выполнили n раз?


    1. ipakeev Автор
      24.05.2024 09:51
      +1

      Если у каждой библиотеки запустить по 5 воркеров, то результат будет таким: arq, saq и celery выполнят все 10к задач, а faststream - в 5 раз больше (потому что одна и та же задача будет выполняться на каждом воркере).

      Думаю, у faststream такое поведение из-за того, что в первую очередь он создан для общения между микросервисами, а не для фоновых задач.


      1. Ryav
        24.05.2024 09:51

        Ну, собственно, я это и спрашиваю — почему все воркеры берут одну и ту же задачу? Либо её статус никак не меняется, либо не успевает поменяться (все 5 уже забрали до изменения статуса), либо что-то ещё.


        1. evgenii_moriakhin
          24.05.2024 09:51

          FastStream предполагает, что потребление только одним подписчиком нужно реализовать вручную: https://github.com/airtai/faststream/issues/693

          И в целом модель применения FastStream отличается от arq/saq/celery - они нужны для фоновых\запланированных задач, а FastStream для управляемой событиями архитектуры сервисов

          Но, конечно, никто не мешает вам начать применять их, как вам хочется


        1. ipakeev Автор
          24.05.2024 09:51
          +1

          При тестировании замечал, что иногда один из воркеров работает заметно медленнее остальных. Тем не менее, это не мешает ему обработать все 10к задач, даже если остальные воркеры завершили работу десятки секунд назад.

          Как указал выше, faststream создан именно для общения между микросервисами. Это подразумевает, что все, кто подписан на топик, должны получить сообщение.

          С другой стороны, для разных брокеров есть куча специфичных параметров, которые позволяют гранулированно настроить поведение публикации/обработки сообщений, в том числе отработку только один раз. Но этот момент не совсем очевидный и требует проверки, например встречал похожий issue для NATS.


  1. lehshik
    24.05.2024 09:51

    для каких задач применяются все эти технологии, для интернет магазинов это как будто избыточно, крупные проекты наверное испольуют самописные системы?


    1. ipakeev Автор
      24.05.2024 09:51
      +1

      Если у вас CRUD'овый интернет магазин, то избыточно. Но как только появляются оплаты, интеграции, квитанции, задачи по расписанию, то без фоновых задач никак.

      Даже если эти задачи находятся в отдельных микросервисах (как у крупных проектов), тот же faststream значительно упростит реализацию.


  1. evgenii_moriakhin
    24.05.2024 09:51

    А что скажете про бенчмарки автора saq?

    В бенчмарках автора 1000 итераций, на моей машине итераций saq спокойно переносит 500к и обгоняет arq со 100к.

    ARQ enqueue 100000 18.29077935218811
    ARQ process 100000 noop 501.143620967865

    SAQ enqueue 500000 39.770418882369995
    SAQ process 500000 noop 209.9531545639038

    arq же в свою очередь не смог пройти 300к и заставил стаймаутить редис

    подозреваю тут тоже играет роль этот плюс saq:
    >> Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY


    1. ipakeev Автор
      24.05.2024 09:51

      Бенчмарки действительно показывают, что saq работает значительно быстрее, чем arq. Локально цифры тоже подтверждаются.
      Но чтобы это как-то отразилось в реальной практике, похоже, нужны бешеные нагрузки.

      Почему при нагрузочном тестировании saq так странно себя ведет - загадка. Судя по логам, сервер периодически намертво зависает на 5-20 секунд в момент постановки задачи в очередь (в это время FastAPI не может обработать ни один запрос).


  1. evgenii_moriakhin
    24.05.2024 09:51
    +1

    К минусам arq так же можно отнести, что достаточно долго над ним не велась работа, поэтому и появилcя saq, как его переработанный форк
    (и интересно, что saq выбрали ребята из Litestar в своих примерах - https://github.com/litestar-org/litestar-pg-redis-docker/blob/main/app/lib/worker.py)

    но у автора arq (он же автор pydantic'a, к слову) в планах целый роадмап по переработке arq - https://github.com/samuelcolvin/arq/issues/437


    1. ipakeev Автор
      24.05.2024 09:51
      +1

      Да, изначально одним из минусов было то, что последний релиз arq был в конце 2022 года, но внезапно 1 мая этого года выкатили новую версию. Поэтому пришлось убрать)


  1. AikoASMR
    24.05.2024 09:51

    Терпим и ждём celery asyncio pool и django async db driver. Слишком больно по времени терять среду Джанги и собирать велосипеды из кучи разрозненных либ.