Привет! Меня зовут Иван, я бэкенд-разработчик в KTS.
Сегодня расскажу, как в FastAPI эффективно работать с фоновыми задачами и настроить их мониторинг в Prometheus.
В туториалах для фоновых задач в FastAPI обычно предлагают celery и при этом используют синхронный код. Но сегодня в реальной практике такое встречается редко, поэтому в этой статье я покажу, как в фоновых задачах использовать асинхронный код.
В статье опишу 5 вариантов: встроенный в FastAPI Background Tasks и 4 библиотеки — ARQ, SAQ, FastStream, адаптированный к асинхронному коду Celery.
В конце расскажу, как мониторить фоновые задачи.
Оглавление
Что такое фоновые задачи
Для ускорения работы сервиса часть логики часто выносится в фоновые задачи:
периодические задачи по cron
запланированные (отложенные) задачи
-
тяжёлый функционал:
требовательный к ресурсам — CPU-bound
продолжительный по времени — как правило, I/O-bound
Примеры:
Отправка писем на почту. Можно сразу отправить пользователю ответ и выполнить отправку письма в фоне. Эта задача может занять несколько секунд, поэтому лучше не заставлять пользователя ждать, пока письмо действительно отправится
Обработка данных. При обработке картинки от пользователя, можно отправить ответ "Accepted" (HTTP 202) и перенаправить работу с файлом в фоновую задачу
Что такое I/O-bound и CPU-bound операции
I/O-bound — операция, скорость выполнения которой ограничена скоростью подсистемы ввода-вывода. К таким задачам можно отнести выполнение запросов по сети, операции с базой, чтение/запись файла на диск.
Например, HTTP-запрос:
r = requests.get('https://api.github.com/user', auth=('user', 'pass'))
CPU-bound — операция, скорость выполнения которой ограничена скоростью CPU. В web-сервисах это обычно работа с медиа: обрезка, ресайз, конвертация картинок.
Постановка задачи в очередь на исполнение
Чтобы поставить задачу в очередь на исполнение, понадобится брокер сообщений — мы возьмём Redis, потому что он подходит для всех четырёх библиотек.
Есть несколько способов запуска фоновых задач, но для унификации воспользуемся вариантом с передачей инстанса BackgroundTask в JSONResponse:
@router.post("")
async def simple() -> JSONResponse:
task = BackgroundTask(func, *args, **kwargs)
return JSONResponse({"status": "ok"}, background=task)
В качестве func будем использовать целевую функцию (для BackgroundTask), либо функцию, которая поставит задачу в очередь на исполнение.
Исходный код проекта можно посмотреть тут: https://github.com/ipakeev/fastapi-async-tasks
В нашей компании используется концепция Store, которая является единой точкой входа для всех аксессоров. Проект имеет именно такую архитектуру.
Посмотреть код
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import Depends
class BaseAccessor:
def __init__(self, store: "Store") -> None:
self.store = store
self.config = settings
async def connect(self) -> None:
pass
async def disconnect(self) -> None:
pass
class Store:
def __init__(self) -> None:
# чтобы исключить циклический импорт
from app.core.accessor import CoreAccessor
from app.redis.accessor import RedisAccessor
from app.worker.accessor import WorkerAccessor
self.core = CoreAccessor(self)
self.redis = RedisAccessor(self)
self.worker = WorkerAccessor(self)
async def connect(self) -> None:
await self.redis.connect()
await self.worker.connect()
await self.core.connect()
async def disconnect(self) -> None:
await self.core.disconnect()
await self.worker.disconnect()
await self.redis.disconnect()
_store: Store | None = None
def get_store() -> Store:
assert _store, "Store is not initialized"
return _store
async def connect_to_store() -> Store:
global _store
if not _store:
_store = Store()
await _store.connect()
return _store
async def disconnect_from_store() -> None:
global _store
if _store:
await _store.disconnect()
_store = None
@asynccontextmanager
async def store_lifespan() -> AsyncGenerator[Store, None]:
await connect_to_store()
try:
yield get_store()
finally:
await disconnect_from_store()
StoreDep = Annotated[Store, Depends(get_store)]
Для декларативного подхода можно использовать библиотеки, например dependency-injector.
Теперь рассмотрим все варианты работы с фоновым задачи с использованием асинхронного кода.
FastAPI Background Tasks
Встроенный в FastAPI класс для создания фоновых задач. Они будут выполняться сразу после того, как сервер вернул ответ. При этом нет никакой очереди, а результат выполнения нигде не хранится.
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse
# фоновая задача
async def send_email(email: str, message: str = "Hello") -> None:
...
@router.post("")
async def hello() -> JSONResponse:
task = BackgroundTask(send_email, "support@mail.ru", message="Hello")
return JSONResponse({"status": "ok"}, background=task)
Плюсы
- Удобно использовать
- Очень удобно тестировать, потому что не нужны дополнительные манипуляции
- Не нужно разворачивать брокер сообщений
- Не нужно разворачивать дополнительный сервис (worker)
Минусы
- При большом количестве одновременных I/O-bound задач и даже при небольшом количестве CPU-bound задач сервис деградирует: увеличивается время запросов, снижается количество RPS.
- При отключении/рестарте сервиса незавершённые и не начатые задачи будут утеряны
ARQ
Легковесная библиотека, задуманная как асинхронный вариант RQ.
Особенности:
Задача обязательно должна принимать первым аргументом объект контекста, в котором можно хранить что угодно
Чтобы поставить задачу в очередь, нужно передать её название
Посмотреть код
import arq
import arq.connections
# фоновая задача: функция для отправки эл. почты в фоне
async def send_email(ctx: dict, email: str, message: str = "Hello") -> None:
store: Store = ctx["store"]
…
# класс для доступа к фоновым задачам
class WorkerAccessor(AbstractTaskAccessor):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
# инициализируем пул задач arq
self.job_pool: arq.ArqRedis | None = None
async def connect(self) -> None:
# устанавливаем соединение с Redis
self.job_pool = await arq.create_pool(
arq.connections.RedisSettings(
# настраиваем параметры соединения
host=self.config.REDIS_DSN.host,
port=self.config.REDIS_DSN.port,
username=self.config.REDIS_DSN.username,
password=self.config.REDIS_DSN.password,
database=self.config.REDIS_DSN.path.removeprefix("/"),
)
)
async def disconnect(self) -> None:
"""Для закрытия соединения"""
if self.job_pool:
await self.job_pool.close()
async def run(self) -> None:
"""Для запуска воркера в другом сервисе"""
async def on_startup(ctx: dict) -> None:
# устанавливаем объект хранилища в контексте
ctx["store"] = self.store
worker = arq.Worker(
# указываем фоновые задачи
functions=[send_email],
on_startup=on_startup,
max_jobs=self.config.ARQ_CONCURRENCY,
health_check_interval=60,
handle_signals=False,
redis_pool=self.job_pool,
)
try:
# запускаем воркер
await worker.main()
finally:
# закрываем воркер
await worker.close()
async def send_email(self, email: str, message: str = "Hello") -> None:
# ставим задачу в очередь на исполнение
await self.job_pool.enqueue_job(send_email.__name__, email, message=message)
# запускаем фоновую задачу отправки эл. почты, вариант 1
@router.post("")
async def hello(store: StoreDep) -> JSONResponse:
task = BackgroundTask(store.worker.arq.send_email, "support@mail.ru", message="Hello")
return JSONResponse({"status": "ok"}, background=task)
# запускаем фоновую задачу отправки эл. почты, вариант 2
@router.post("")
async def hello(store: StoreDep) -> dict:
await store.worker.arq.send_email("support@mail.ru", message="Hello")
return {"status": "ok"}
Запустить воркер можно таким способом:
async def main() -> None:
async with store_lifespan() as store:
await store.worker.arq.run()
if __name__ == "__main__":
asyncio.run(main())
Плюсы
Удобно тестировать (при помощи fakeredis)
Можно сделать отложенное выполнение задачи
Запуск задач по расписанию (cron)
Встроенный механизм перезапуска (retrying) и отмены задачи
Минусы
-
В качестве брокера сообщений может быть только Redis
SAQ
Аналог ARQ со встроенным дашбордом.
Посмотреть код
import saq
from aiohttp.web_runner import AppRunner, TCPSite
from redis.asyncio import Redis
from saq.web.aiohttp import create_app
# фоновая задача
async def send_email(ctx: dict, email: str, message: str = "Hello") -> None:
store: Store = ctx["store"]
…
class WorkerkAccessor(BaseAccessor):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.queue: saq.Queue | None = None
async def connect(self) -> None:
client = Redis.from_url(self.config.REDIS_DSN.__str__())
self.queue = saq.Queue(client)
async def disconnect(self) -> None:
if self.queue:
await self.queue.disconnect()
async def run(self, web: bool = False, port: int = 8090) -> None:
"""Для запуска воркера в другом сервисе"""
async def on_startup(ctx: dict) -> None:
ctx["store"] = self.store
worker = saq.Worker(
self.queue,
functions=[send_email], # указываем фоновые задачи
startup=on_startup,
concurrency=self.config.SAQ_CONCURRENCY,
)
if web:
# запускаем дашборд
app = create_app([self.queue])
task = asyncio.create_task(worker.start())
try:
runner = AppRunner(app)
await runner.setup()
site = TCPSite(runner, port=port)
await site.start()
await asyncio.Event().wait()
finally:
await worker.stop()
task.cancel()
else:
await worker.start()
async def send_email(self, email: str, message: str = "Hello") -> None:
# ставим задачу в очередь на исполнение
await self.queue.enqueue(send_email.__name__, email=email, message=message)
@router.post("")
async def hello(store: StoreDep) -> JSONResponse:
task = BackgroundTask(store.worker.saq.send_email, "support@mail.ru", message="Hello")
return JSONResponse({"status": "ok"}, background=task)
# или так
@router.post("")
async def hello(store: StoreDep) -> dict:
await store.worker.saq.send_email("support@mail.ru", message="Hello")
return {"status": "ok"}
Запустить воркер можно таким способом:
async def main() -> None:
async with store_lifespan() as store:
await store.worker.saq.run(web=True)
if __name__ == "__main__":
asyncio.run(main())
Плюсы
Удобно тестировать при помощи fakeredis
Можно сделать отложенное выполнение задачи
Запуск задач по расписанию (cron)
Встроенный механизм перезапуска (retrying)
Встроенный web дашборд для мониторинга задач:
Минусы
В качестве брокера сообщений может быть только Redis
-
Не очень удачная реализация: в задачу можно прокидывать только kwargs, которые идут вперемешку с параметрами запуска задачи
FastStream
Очень перспективный и интересный фреймворк, является идейным продолжением FastKafka и Propan. Отличается от других библиотек в статье тем, что представляет собой архитектуру pub/sub.
Обратите внимание, что на момент написания статьи для обычной очереди задач это может иметь подводные камни:
в рамках одного воркера параллельно обрабатываются только разные задачи
сообщения в рамках одного топика обрабатываются строго последовательно — нет возможности указать concurrency
если указать количество воркеров больше 1, то каждая задача будет выполняться столько раз, сколько есть воркеров
для запуска по расписанию потребуется интеграция с Taskiq: Taskiq - FastStream
При этом библиотека имеет много преимуществ, о которых я расскажу ниже. Сначала — как выглядит запуск фоновых задач:
import faststream
from faststream.redis import RedisBroker
class FastStream(faststream.FastStream):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.on_startup(self.connect)
self.on_shutdown(self.disconnect)
async def connect(self, *_: Any, **__: Any) -> None:
await connect_to_store()
await self.broker.connect(url=settings.REDIS_DSN.__str__())
async def disconnect(self, *_: Any, **__: Any) -> None:
await disconnect_from_store()
await self.broker.close()
broker = RedisBroker()
# для запуска воркера через терминал
faststream_app = FastStream(broker)
# название фоновой задачи
SEND_EMAIL_TOPIC = "send_email"
# фоновая задача
@broker.subscriber(SEND_EMAIL_TOPIC)
async def send_email(email: str, message: str) -> None:
…
class WorkerAccessor(BaseAccessor):
async def connect(self) -> None:
await broker.connect(url=self.config.REDIS_DSN.__str__())
async def disconnect(self) -> None:
await broker.close()
async def send_email(self, email: str, message: str = "Hello") -> None:
# ставим задачу в очередь на исполнение
await broker.publish(
message={"email": email, "message": message},
channel=SEND_EMAIL_TOPIC,
)
@router.post("")
async def hello(store: StoreDep) -> JSONResponse:
task = BackgroundTask(store.worker.faststream.send_email, "support@mail.ru", message="Hello")
return JSONResponse({"status": "ok"}, background=task)
# или так
@router.post("")
async def hello(store: StoreDep) -> dict:
await store.worker.faststream.send_email("support@mail.ru", message="Hello")
return {"status": "ok"}
Воркер можно встроить в lifespan FastAPI либо запустить командой в терминале:
faststream run app.worker.faststream:faststream_app –workers=1
Запустить AsyncAPI:
faststream docs serve app.worker.faststream:faststream_app
Плюсы
В качестве брокера сообщений может быть Apache Kafka, RabbitMQ, NATS и Redis
Автогенерация документации (AsyncAPI)
Удобно тестировать (in-memory брокер)
Тесная интеграция с Pydantic и Dependency injection
Очень тесная интеграция с FastAPI, можно запускать в lifespan и не деплоить отдельный сервис — полезно при микросервисной архитектуре
Лёгкая интеграция с другими популярными web-фреймворками: Aiohttp, Django
Минус
-
Фреймворк впервые вышел в релиз в сентябре 2023 года и пока не прошёл проверку временем. При этом успел обрести популярность и находится на стадии активной разработки
Celery
Celery — популярный фреймворк, который позволяет очень гибко оперировать фоновыми задачами. Он обладает широкими возможностями, но порог входа может быть выше, чем у ARQ или SAQ.
К сожалению, до сих пор нет официальной поддержки асинхронного кода, но обещают добавить в Celery 6.0. Ждём. А пока можем самостоятельно адаптировать Celery для выполнения асинхронного кода.
Посмотреть код
import celery
from celery import signals
from functools import wraps
class Celery(celery.Celery):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
# тут хранятся функции для удобства тестирования (eager execution)
self.functions: dict[str, Callable[..., Any]] = {}
# цикл событий, чтобы из синхронной функции вызывать асинхронную
self.loop = asyncio.get_event_loop()
def connect(self, *_: Any, **__: Any) -> None:
self.loop.run_until_complete(connect_to_store())
def disconnect(self, *_: Any, **__: Any) -> None:
self.loop.run_until_complete(disconnect_from_store())
def task(
self,
task: Callable[..., Awaitable[T]] | None = None,
**opts: Any,
) -> Callable:
# декоратор от celery
create_task = super().task
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., T]:
@create_task(**opts) # регистрируем задачу
@wraps(func)
def wrapper(*args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any) -> T:
# для случаев, когда очень хочется выполнить задачу сразу (без apply_async)
loop = loop or self.loop
# выполняем асинхронную функцию в цикле событий
return loop.run_until_complete(func(*args, **kwargs))
# запоминаем функцию для удобства тестирования
self.functions[wrapper.name] = func
return wrapper
if task:
return decorator(task)
return decorator
celery_app = Celery("celery", broker=settings.REDIS_DSN.__str__())
celery_app.autodiscover_tasks(packages=["app.worker.async_celery.tasks"])
# подключаем store
signals.worker_process_init.connect(celery_app.connect)
signals.worker_process_shutdown.connect(celery_app.disconnect)
Запуск задачи не отличается от обычного варианта (delay, apply_async и пр.):
# фоновая задача
@celery_app.task
async def send_email(email: str, message: str) -> None:
…
@router.post("")
async def hello() -> JSONResponse:
task = BackgroundTask(send_email.delay, "support@mail.ru", message="Hello")
return JSONResponse({"status": "ok"}, background=task)
# варианты:
@router.post("")
async def hello() -> dict:
# примеры постановки задачи в очередь на исполнение
send_email.delay("support@mail.ru", message="Hello")
send_email.apply_async(("support@mail.ru",), {"message": "Hello"}, countdown=5)
send_email.s("support@mail.ru", message="Hello").apply_async(countdown=5)
# пример запуска задачи в текущем процессе
send_email("support@mail.ru", message="Hello", loop=asyncio.get_running_loop())
return {"status": "ok"}
Для eager execution в тестах придется прибегнуть к использованию nest-asyncio (conftest.py):
from contextlib import ExitStack
import nest_asyncio
import pytest
# чтобы не ловить ошибки о том, что цикл событий уже запущен
nest_asyncio.apply()
@pytest.fixture(scope="session", autouse=True)
def celery_eager_execution(store: Store) -> Generator[None, None, None]:
# регистрируем все фоновые задачи
from app.worker.async_celery import tasks # noqa: F401
def execute_task(name: str) -> Callable[..., Any]:
# достаем функцию для этой задачи и выполняем ее в качестве side_effect у мока
func = celery_app.functions[name]
def wrapper(args: Any, kwargs: Any, **_: Any) -> Any:
loop = asyncio.get_running_loop()
return loop.run_until_complete(func(*args, **kwargs))
return wrapper
# создаем моки для каждой задачи
patches = [
patch.object(task, "apply_async", side_effect=execute_task(name))
for name, task in celery_app.tasks.items()
]
# одновременно применяем все моки
with ExitStack() as stack:
for p in patches:
stack.enter_context(p)
yield
Воркер можно запустить командой в терминале:
celery -A app.worker.async_celery.app:celery_app worker –concurrency=3
Flower можно запустить командой в терминале:
celery -A app.worker.async_celery.app:celery_app flower
Плюсы
Проверенная временем библиотека для распределённой очереди задач
В качестве брокера сообщений могут быть RabbitMQ, Redis и Amazon SQS
Запуск задач по cron
Огромные возможности по контролю и жонглированию очередями и задачами
Установка приоритета выполнения задачи
В связке с Flower легко мониторить задачи и экспортировать метрики в Prometheus
Минусы
Внутри фоновой задачи нельзя создавать процессы и выполнять функции в ProcessPoolExecutor
-
Пока нет официальной поддержки асинхронного кода, поэтому могут быть некоторые издержки при использовании библиотеки
Какие ещё есть варианты работы с фоновыми задачами
-
Если знаете другие, напишите в комментариях
Мониторинг
Для FastAPI настроить мониторинг очень просто — при помощи библиотеки prometheus-fastapi-instrumentator.
Посмотреть код
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator
from app.api.router import router
app = FastAPI(title="FastAPI async tasks")
app.include_router(router)
# все метрики будут доступны по пути /metrics
Instrumentator().instrument(app).expose(app)
А вот отслеживать именно фоновые задачи — задача нетривиальная. Они работают в другом процессе или даже кластере и никак не связаны с работающим web-сервером.
Для их мониторинга воспользуемся библиотекой prometheus-async, которая позволит настроить сбор метрик и развернуть web-сервер, который по пути /metrics будет экспортировать эти метрики.
Для Сelery использование prometheus-async не требуется, потому что Flower имеет встроенную интеграцию с Prometheus.
Настраиваем декоратор export_task_metrics, который будем навешивать на интересующую нас функцию:
from prometheus_async.aio import count_exceptions, time
from prometheus_client import Counter, Histogram
EXECUTION_TIME = Histogram(
"task_execution_seconds",
"Task execution time",
labelnames=["task_name"], # чтобы в метриках видеть название фоновой задачи
)
TASKS_FAILED = Counter(
"tasks_failed",
"Tasks failed",
labelnames=["task_name"], # чтобы в метриках видеть название фоновой задачи
)
# декоратор для удобного сбора метрик
def export_task_metrics(func: Callable) -> Callable:
task_name = func.__name__
@count_exceptions(TASKS_FAILED.labels(task_name=task_name)) # подсчет количества ошибок
@time(EXECUTION_TIME.labels(task_name=task_name)) # подсчет времени выполнения
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
return func(*args, **kwargs)
return wrapper
Применяем декоратор:
@export_task_metrics
async def send_email(email: str, message: str) -> None:
...
При старте воркера запускаем web-сервер в отдельном треде, чтобы отдавать метрики по фоновым задачам (на примере ARQ):
from prometheus_async.aio.web import start_http_server_in_thread
async def main() -> None:
start_http_server_in_thread(port=settings.ARQ_EXPORTER_PORT)
async with store_lifespan() as store:
await store.worker.arq.run()
if __name__ == "__main__":
asyncio.run(main())
Обратите внимание, что в каждом воркере необходимо развернуть свой web-сервер.
Сейчас мы будем одновременно тестировать все 4 библиотеки, поэтому настроим Prometheus для сбора всех метрик:
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "fastapi"
static_configs:
- targets: ["fastapi:8000"]
- job_name: "arq"
static_configs:
- targets: ["arq:8001"]
- job_name: "saq"
static_configs:
- targets: ["saq:8002"]
- job_name: "faststream"
static_configs:
- targets: ["faststream:8003"]
- job_name: "async-celery"
static_configs:
- targets: ["async-celery-flower:5555"]
Соберём образ проекта:
docker build -t fastapi-async-tasks .
И запустим в docker compose. Все воркеры будут иметь одинаково небольшие ресурсы:
# docker-compose.yml
version: "3.8"
x-tasks-common: &tasks-common
image: fastapi-async-tasks
volumes:
- .:/app # для разработки
depends_on:
- fastapi
restart: on-failure
networks:
- default
deploy:
resources:
limits:
memory: 200M
cpus: "0.2"
services:
redis:
image: redis:7.2.4
ports:
- "6379:6379"
restart: on-failure
networks:
- default
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- .prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
command:
- --config.file=/etc/prometheus/prometheus.yml
restart: on-failure
networks:
- default
grafana:
image: grafana/grafana
ports:
- "3000:3000"
volumes:
- type: volume
source: grafana
target: /var/lib/grafana
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_INSTALL_PLUGINS=redis-app
user: "0"
restart: on-failure
networks:
- default
fastapi:
image: fastapi-async-tasks
ports:
- "8000:8000"
volumes:
- .:/app
depends_on:
- redis
- prometheus
- grafana
restart: on-failure
networks:
- default
deploy:
resources:
limits:
memory: 200M
cpus: "0.2"
command:
- uvicorn
- app.main:app
- --host=0.0.0.0
- --port=8000
- --reload
- --workers=${FASTAPI_WORKERS}
arq:
<<: *tasks-common
ports:
- "8001:8001" # metrics
command:
- python
- -m
- worker_arq
saq:
<<: *tasks-common
ports:
- "8082:8082" # web dashboard
- "8002:8002" # metrics
command:
- python
- -m
- worker_saq
faststream:
<<: *tasks-common
ports:
- "8003:8003" # metrics
command:
- faststream
- run
- app.worker.faststream.app:faststream_app
- --workers=${FASTSTREAM_CONCURRENCY}
async-celery:
<<: *tasks-common
command:
- celery
- -A
- app.worker.async_celery.app:async_celery_app
- worker
- --loglevel=info
- --concurrency=${ASYNC_CELERY_CONCURRENCY}
async-celery-flower:
image: fastapi-async-tasks
ports:
- "5555:5555"
volumes:
- .:/app
depends_on:
- async-celery
restart: on-failure
networks:
- default
command:
- celery
- -A
- app.worker.async_celery.app:async_celery_app
- flower
volumes:
grafana:
networks:
default:
driver: bridge
docker-compose up -d
Нагрузочное тестирование
Чтобы не деплоить слишком много, будем проверять корректность выполненной работы при помощи инкремента числа в Redis. Одна выполненная задача — один инкремент. Финальное значение посмотрим в Grafana, это послужит контролем того, что все фоновые задачи выполнены успешно.
Redis работает быстро, поэтому в сравнительном тестировании мы сможем проверить скорость работы именно самой библиотеки (поставить задачу в очередь, достать задачу из очереди).
Во всех тестах используется 1 реплика FastAPI.
Проверим 5 разных эндпойнтов, которые запускают свои фоновые задачи:
io/simple: асинхронный redis
io/sync: синхронный redis
io/thread: синхронный redis в отдельном треде
ThreadPoolExecutor(max_workers=3)
cpu/simple: числодробильная работа + асинхронный redis
cpu/process: числодробильная работа в отдельном процессе
ProcessPoolExecutor(max_workers=3)
+ асинхронный redis
Для нагрузочного тестирования воспользуемся библиотекой Locust.
Посмотреть код
# locust/master.conf
headless = true # управление через консоль
expect-workers = 10 # сначала дожидаемся старта 10 процессов
host = http://localhost:8000 # адрес FastAPI
# locust/background.py
from random import shuffle
from locust import between, HttpUser, task
# для тестирования Background Task
class TaskUser(HttpUser):
wait_time = between(0.001, 0.001)
@task
def execute(self):
endpoint = os.environ.get("endpoint")
endpoints = (
[endpoint]
if endpoint
else ["io/simple", "io/sync", "io/thread", "cpu/simple", "cpu/process"]
)
shuffle(endpoints)
for endpoint in endpoints:
self.client.post(f"/api/v1/incr/{endpoint}/background", json={"value": 1})
# locust/benchmark.py
from random import shuffle
from locust import between, HttpUser, task
# для тестирования остальных библиотек
class TaskUser(HttpUser):
wait_time = between(0.001, 0.001)
@task
def execute(self):
endpoint = os.environ.get("endpoint")
endpoints = (
[endpoint]
if endpoint
else ["io/simple", "io/sync", "io/thread", "cpu/simple", "cpu/process"]
)
worker = os.environ.get("worker")
workers = (
[worker]
if worker
else ["arq", "saq", "faststream", "async-celery"]
)
params = [(endpoint, worker) for endpoint in endpoints for worker in workers]
shuffle(params)
for endpoint, worker in params:
self.client.post(f"/api/v1/incr/{endpoint}/{worker}", json={"value": 1})
Параметры:
-i: количество итераций
-u: количество пользователей
Пример команды запуска:
env endpoint=io/simple locust --config locust/master.conf -f locust/background.py -i 10000 -u 100
Таблицы и графики
Как Background Task влияет на производительность FastAPI
# 200M memory, 0.2 cpus
Итерации (количество фоновых задач) |
Пользователи |
RPS |
Время запроса (медиана), мс |
Время запроса (95-й перцентиль), мс |
Время выполнения всех задач, сек |
Количество ошибок (%) |
|
io/simple |
50000 |
100 |
319 |
200 |
390 |
160 |
0.0 |
io/sync |
50000 |
100 |
290 |
280 |
410 |
180 |
0.0 |
io/thread |
50000 |
100 |
209 |
400 |
600 |
245 |
0.0 |
cpu/simple |
100 |
3 |
0.4 |
7300 |
9600 |
245 |
0.0 |
cpu/process |
1000 |
3 |
16 |
190 |
400 |
2460 |
0.0 |
# io/simple (асинхронный redis)
Фоновые задачи простые и быстрые, поэтому видим стабильно хорошие показатели RPS.
# io/sync (синхронный redis)
Сказывается синхронный код: значение RPS меньше, а время выполнения задач больше. Чем дольше будет выполняться синхронный код, тем хуже будут становиться показатели.
# io/thread (синхронный redis в отдельном треде)
Не стоит злоупотреблять этим вариантом, т.к. выполнение кода в другом треде требует накладных расходов, что мы и видим по таблице и на графиках: показатели заметно хуже, чем в синхронном варианте. При более сложных задачах может сильно проседать производительность.
# cpu/simple (числодробильная работа + асинхронный redis)
FastAPI был настолько загружен, что не всегда успевал отдавать метрики. Вывод очевиден: CPU-bound фоновые задачи необходимо выносить в другой сервис.
# cpu/process (числодробильная работа в отдельном процессе + асинхронный redis)
С точки зрения Locust было около 16 RPS, но для FastAPI запрос считается завершенным только после выполнения фоновой задачи. Именно поэтому в Grafana значение RPS находится около нуля, а значение Total requests
растёт вместе с Executed tasks
. Вывод: BackgroundTask можно использовать только при небольшом количестве таких фоновых задач.
Показатели FastAPI при отправке фоновых задач в другой сервис
Тестируем эндпойнт io/simple, 50000 итераций, 100 пользователей
Чтобы не ущемлять FastStream, для чистоты эксперимента ARQ, SAQ и Celery будут иметь concurrency=1.
# 200M memory, 0.2 cpus
RPS |
Время запроса (медиана), мс |
Время запроса (95-й перцентиль), мс |
|
ARQ |
227 |
380 |
590 |
SAQ* |
162* |
290* |
500* |
FastStream |
262 |
300 |
490 |
Celery |
151 |
590 |
880 |
ARQ
Хорошие показатели, стабильная работа. Разница между временем окончания нагрузки и последней выполненной задачей небольшая.
SAQ
Тут результат неожиданный.
Сначала задачи бодро ставились в очередь, скорость работы была чуть выше, чем у ARQ
Но вскоре сервис стал деградировать, а новые задачи не ставились в очередь
В итоге web-сервер перестал отвечать на запросы, тестирование пришлось остановить
Увеличение ресурсов в 2 раза решило проблему, но ненадолго: увеличив количество итераций в 2 раза, проблема вернулась. Пройти тест удалось лишь после увеличения времени ожидания между запросами.
FastStream
Лучшие показатели среди представленных библиотек. Фоновые задачи выполнялись практически сразу после постановки в очередь.
Celery
Показал самые скромные результаты. Последняя задача выполнилась сильно позже окончания нагрузки.
Сравнительное тестирование скорости выполнения задач
Сравнительная производительность после увеличения ресурсов
# 500M memory, 0.5 cpus
RPS |
Время запроса (медиана), мс |
Время запроса (95-й перцентиль), мс |
|
ARQ |
1114 |
18 |
34 |
SAQ |
1079 |
19 |
37 |
FastStream |
1178 |
15 |
31 |
Celery |
871 |
30 |
59 |
Как видим, увеличение ресурсов несколько сгладило картину для ARQ, SAQ и FastStream. У Celery результаты ожидаемо скромнее — сказывается синхронность библиотеки.
Обратите внимание, что в Celery внутри фоновой задачи нельзя создавать процессы и выполнять функции в ProcessPoolExecutor, получим ошибку AssertionError ('daemonic processes are not allowed to have children').
Итоги тестирования
BackgroundTasks можно использовать в небольших проектах либо для небольших I/O-bound задач. В остальных случаях лучше использовать специализированные библиотеки.
FastStream оказался самым скоростным решением и имеет очень интересный набор функционала.
Celery ожидаемо проигрывает в скорости, но за счёт широкого функционала может стать неплохим вариантом. Ожидаем шестую версию.
ARQ является простым решением, но ничем особым не выделился. Пару раз воркер переставал работать, хотя задачи ставились в очередь. Лечилось перезапуском воркера. Возможно, это поправили в новой версии.
SAQ является аналогом ARQ со своими плюсами и минусами. Зависает при длительной большой нагрузке.
Выводы
Если в коде много CPU-bound-операций, их обязательно нужно вынести в отдельный микросервис
-
Если нагрузка небольшая, а фоновые задачи — в основном I/O-bound операции, можно использовать встроенный функционал FastAPI Background Tasks. Но для стабильности всё равно лучше их вынести в отдельный микросервис
Другие наши статьи по бэкенду и асинхронному программированию для начинающих:
Цикл статей «Первые шаги в aiohttp»: пишем первое hello-world-приложение, подключаем базу данных, выкладываем проект в Интернет
Другие наши статьи по бэкенду и асинхронному программированию для продвинутого уровня:
Комментарии (15)
ThatSeemsLegit
24.05.2024 09:51Регулярно приходится сталкиваться с такими задачами и пришел к выводу, что лучше всего просто пульнуть таску в очередь, и оттуда уже сервисами их исполнять. Точно ничего не потеряется (nack на фейле) + легкий скейлинг и без спайков в нагрузке
Ryav
24.05.2024 09:51+2Спасибо за статью, неплохой обзор получился и открыл для себя FastStream.
Вопрос по воркерам FastStream: когда воркер забирает таску, разве он не должен сделать её недоступной для остальных, чтобы одну и ту же задачу не выполнили n раз?ipakeev Автор
24.05.2024 09:51+1Если у каждой библиотеки запустить по 5 воркеров, то результат будет таким: arq, saq и celery выполнят все 10к задач, а faststream - в 5 раз больше (потому что одна и та же задача будет выполняться на каждом воркере).
Думаю, у faststream такое поведение из-за того, что в первую очередь он создан для общения между микросервисами, а не для фоновых задач.
Ryav
24.05.2024 09:51Ну, собственно, я это и спрашиваю — почему все воркеры берут одну и ту же задачу? Либо её статус никак не меняется, либо не успевает поменяться (все 5 уже забрали до изменения статуса), либо что-то ещё.
evgenii_moriakhin
24.05.2024 09:51FastStream предполагает, что потребление только одним подписчиком нужно реализовать вручную: https://github.com/airtai/faststream/issues/693
И в целом модель применения FastStream отличается от arq/saq/celery - они нужны для фоновых\запланированных задач, а FastStream для управляемой событиями архитектуры сервисов
Но, конечно, никто не мешает вам начать применять их, как вам хочется
ipakeev Автор
24.05.2024 09:51+1При тестировании замечал, что иногда один из воркеров работает заметно медленнее остальных. Тем не менее, это не мешает ему обработать все 10к задач, даже если остальные воркеры завершили работу десятки секунд назад.
Как указал выше, faststream создан именно для общения между микросервисами. Это подразумевает, что все, кто подписан на топик, должны получить сообщение.
С другой стороны, для разных брокеров есть куча специфичных параметров, которые позволяют гранулированно настроить поведение публикации/обработки сообщений, в том числе отработку только один раз. Но этот момент не совсем очевидный и требует проверки, например встречал похожий issue для NATS.
lehshik
24.05.2024 09:51для каких задач применяются все эти технологии, для интернет магазинов это как будто избыточно, крупные проекты наверное испольуют самописные системы?
ipakeev Автор
24.05.2024 09:51+1Если у вас CRUD'овый интернет магазин, то избыточно. Но как только появляются оплаты, интеграции, квитанции, задачи по расписанию, то без фоновых задач никак.
Даже если эти задачи находятся в отдельных микросервисах (как у крупных проектов), тот же faststream значительно упростит реализацию.
evgenii_moriakhin
24.05.2024 09:51А что скажете про бенчмарки автора saq?
В бенчмарках автора 1000 итераций, на моей машине итераций saq спокойно переносит 500к и обгоняет arq со 100к.
ARQ enqueue 100000 18.29077935218811
ARQ process 100000 noop 501.143620967865
SAQ enqueue 500000 39.770418882369995
SAQ process 500000 noop 209.9531545639038
arq же в свою очередь не смог пройти 300к и заставил стаймаутить редис
подозреваю тут тоже играет роль этот плюс saq:
>> Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFYipakeev Автор
24.05.2024 09:51Бенчмарки действительно показывают, что saq работает значительно быстрее, чем arq. Локально цифры тоже подтверждаются.
Но чтобы это как-то отразилось в реальной практике, похоже, нужны бешеные нагрузки.
Почему при нагрузочном тестировании saq так странно себя ведет - загадка. Судя по логам, сервер периодически намертво зависает на 5-20 секунд в момент постановки задачи в очередь (в это время FastAPI не может обработать ни один запрос).
evgenii_moriakhin
24.05.2024 09:51+1К минусам arq так же можно отнести, что достаточно долго над ним не велась работа, поэтому и появилcя saq, как его переработанный форк
(и интересно, что saq выбрали ребята из Litestar в своих примерах - https://github.com/litestar-org/litestar-pg-redis-docker/blob/main/app/lib/worker.py)
но у автора arq (он же автор pydantic'a, к слову) в планах целый роадмап по переработке arq - https://github.com/samuelcolvin/arq/issues/437ipakeev Автор
24.05.2024 09:51+1Да, изначально одним из минусов было то, что последний релиз arq был в конце 2022 года, но внезапно 1 мая этого года выкатили новую версию. Поэтому пришлось убрать)
AikoASMR
24.05.2024 09:51Терпим и ждём celery asyncio pool и django async db driver. Слишком больно по времени терять среду Джанги и собирать велосипеды из кучи разрозненных либ.
Flosckow
Если мне память не изменяет, то фоновые задачи выполняются последовательно и если в какой-то из задач рейзится исключение - то следующие не выполнятся. Так что это можно тоже отметить как потенциальный минус
ipakeev Автор
Это про какую библиотеку?