В Python при выборе библиотеки для работы с MQTT почти всегда приходишь к paho-mqtt. Это зрелый и самый популярный клиент, но его API построен на колбэках, а современное Python-приложение живёт в asyncio: FastAPI, фоновые воркеры, асинхронные клиенты и всё это в одном общем event loop.
В одном из IoT-проектов я столкнулся ровно с этим. Мне нужен был MQTT-клиент, который без сложной адаптации встраивается в асинхронное приложение и позволяет работать с подписками как с управляемыми объектами, а не через набор колбэков.
Похожий запрос давно был и в FastStream. Задачу на поддержку MQTT открыли ещё в ноябре 2023 года, но долго не закрывали: интерес был, а вот с базовым драйвером всё было не так очевидно. Заворачивать paho-mqtt в асинхронную обёртку можно, но тогда в основе интеграции FastStream всё равно остаётся callback/thread-модель. В итоге я написал отдельный драйвер — zmqtt, а затем и MQTT-интеграцию в FastStream.
Меня зовут Борис Алексеев, я senior разработчик и фичалид в Райффайзенбанке, один из мейнтейнеров FastStream и автор zmqtt. В этой статье покажу, как теперь выглядит MQTT-сервис на Python: асинхронные обработчики, управляемые подписки, тестирование без реального брокера и AsyncAPI-документация. А заодно кратко расскажу, зачем под это пришлось написать новый драйвер.
Почему пришлось начать с драйвера
paho-mqtt — зрелый проект и де-факто стандартный MQTT-клиент для Python. Он поддерживает все популярные версии протокола MQTT: 5.0, 3.1.1 и 3.1. Асинхронные обёртки над paho существуют, но для FastStream этого всё равно было недостаточно. Фреймворку нужно не просто дождаться следующего сообщения из брокера, а маршрутизировать поток сообщений по подписчикам, управлять их жизненным циклом и не превращать каждую подписку в отдельное MQTT-соединение. В случае с paho-mqtt остаются два варианта: писать сложный слой роутинга поверх чужой callback-модели или плодить соединения на каждую подписку. Первый вариант неприятно поддерживать, а второй недопустим из-за расточительного обращения с ресурсами.
Также хочется более удобный API: объект подписки вместо колбэка, готовая имплементация request/response, строго типизированные структуры данных в противовес универсальным контейнерам для MQTT 5.0 properties в paho.
Критерий |
paho-mqtt |
zmqtt |
|---|---|---|
Модель выполнения |
Callback API, threading |
|
Подписки |
|
|
Request/response |
Нет готовой высокоуровневой абстракции |
Есть |
Интеграция с FastStream |
Нужен отдельный слой роутинга и lifecycle |
Драйвер сразу писался под асинхронную интеграцию с FastStream |
Зрелость |
Много лет в эксплуатации, самое популярное решение |
Новая библиотека, меньше 100 звёзд |
Зачем здесь FastStream
Концепцию FastStream в своей статье сформулировал автор библиотеки Никита Пастухов:
FastStream — это очень толстый клиент для брокеров, который позволяет вам писать меньше инфраструктурного кода и сконцентрироваться на бизнес-логике ваших приложений.
Zmqtt отвечает за протокол и сетевое взаимодействие: соединение с брокером, подписки, публикацию, request/response и всё, что относится к MQTT как к транспорту. FastStream стоит выше и берёт на себя прикладной уровень: маршрутизацию сообщений по обработчикам, сериализацию и десериализацию, Dependency Injection, observability и tracing, документацию AsyncAPI и утилиты для тестирования приложений.
Именно поэтому это две разные сущности, а не один большой MQTT-клиент. Если смешать транспортный слой и фреймворк внутри одной библиотеки, получится либо тяжёлый клиент со слишком широкой ответственностью, либо неудобная точка расширения для прикладного кода. Разделение на zmqtt и FastStream даёт более чистую границу: один компонент знает про MQTT-протокол, второй — про удобную разработку сервиса поверх него.
Установка:
pip install "faststream[mqtt]"
Чтобы воспроизвести следующие примеры локально, поднимите MQTT-брокер в отдельном терминале:
docker run --rm -p 1883:1883 eclipse-mosquitto:2 mosquitto -c /mosquitto-no-auth.conf
Простой сервис
Ниже пример, который можно положить в main.py и запустить. Приложение подписывается на температуру устройства, достаёт device_id из MQTT-топика, валидирует payload как float. После старта приложение публикует тестовое сообщение, чтобы показать вызов обработчика.
from typing import Annotated from faststream import FastStream, Path from faststream.mqtt import MQTTBroker broker = MQTTBroker() app = FastStream(broker) @broker.subscriber("temperature-sensors/{device_id}/temperature") async def handle_temperature( temperature: float, device_id: Annotated[str, Path()], ) -> None: print(f"{device_id}: {temperature}") @app.after_startup async def publish_test_message() -> None: await broker.publish( 21.5, "temperature-sensors/thermostat-1/temperature", )
Запуск:
faststream run main:app
FastStream отправит MQTT SUBSCRIBE на topic filter temperature-sensors/+/temperature. Когда из топика temperature-sensors/thermostat-1/temperature придёт сообщение, обработчик получит device_id="thermostat-1" и temperature=21.5.
Runtime-возможности
Версии MQTT
MQTT 3.1.1 и MQTT 5.0. MQTT 3.1 пока не поддерживается в zmqtt, но есть в планах развития.
Параллельная обработка сообщений
По умолчанию subscriber обрабатывает сообщения последовательно. Если вы хотите обрабатывать несколько сообщений одновременно, укажите max_workers.
@broker.subscriber( "temperature-sensors/{device_id}/temperature", max_workers=5, ) async def handle_temperature( temperature: float, device_id: Annotated[str, Path()], ) -> None: print(f"{device_id}: {temperature}")
Shared subscriptions
MQTT shared subscriptions позволяют нескольким клиентам подписаться на один топик таким образом, чтобы каждое сообщение получил только один клиент из группы. Это похоже на consumer group в Kafka.
@broker.subscriber("workers/jobs/#", shared="pool-a") async def handle_job(job: dict) -> None: print(job)
В брокер уйдёт SUBSCRIBE на топик $share/pool-a/workers/jobs/#.
Acknowledgment
В MQTT QoS описывает доставку между брокером и клиентом, а не успешную обработку сообщения бизнес-кодом. Поэтому, если следовать MQTT идиоматически, протокольный ack обычно остаётся на уровне клиента: получил пакет, подтвердил получение, и дальше уже вызываешь логику.
Но FastStream всё равно позволяет управлять этим явно: отправлять Ack для QoS 1 или запускать протокольный обмен подтверждениями для QoS 2 после пользовательской обработки. Это скорее демонстрация контроля над жизненным циклом сообщения, чем рекомендация превращать MQTT в очередь задач с бизнес-ack.
from faststream import AckPolicy, FastStream from faststream.mqtt import MQTTBroker, MQTTMessage, QoS broker = MQTTBroker() app = FastStream(broker) @broker.subscriber( "jobs/run", qos=QoS.AT_LEAST_ONCE, ack_policy=AckPolicy.MANUAL, ) async def work(payload: dict, msg: MQTTMessage) -> None: try: print(payload) finally: await msg.ack()
По умолчанию сообщение подтверждается при получении, и в большинстве MQTT-сценариев именно это поведение стоит оставлять.
Publish
Публиковать можно напрямую через брокер.
await broker.publish( {"value": 21.5, "unit": "celsius"}, "devices/thermostat-1/rooms/kitchen/temperature", qos=QoS.AT_LEAST_ONCE, retain=True, )
Если публикация является частью обработчика, удобнее описать publisher декларативно через декоратор.
@broker.subscriber("devices/{device_id}/commands") @broker.publisher("devices/events") async def handle_command( device_id: Annotated[str, Path()], command: dict, ) -> dict: print(f"Command for device {device_id}: {command}") return {"device_id": device_id, "command": command}
Request/response
В MQTT 5.0 сценарий request/response строится на стандартных свойствах PUBLISH: Response Topic и Correlation Data. Если correlation_id не передан явно, FastStream сгенерирует его сам. Основная логика для MQTT 5.0 реализована на уровне zmqtt: клиент создаёт служебный топик для ответа, подписывается на него, отправляет запрос и ждёт первое сообщение в ответ. Если ответ не приходит за заданное время, будет ошибка по таймауту.
broker = MQTTBroker(version="5.0") reply = await broker.request( {"request_id": "req-42"}, "devices/thermostat-1/status", timeout=5.0, ) print(reply.body)
В MQTT 3.1.1 таких свойств нет, поэтому FastStream поддерживает request/response через явный reply_to-топик. Будет работать, если обработчик сообщения знает, куда отвечать на уровне договорённости:
reply = await broker.request( "ping", "devices/thermostat-1/status", reply_to="devices/replies/thermostat-1", timeout=5.0, )
Брокер FastStream подпишется на reply_to, отправит запрос и дождётся первого ответа в этом топике. Если вызвать request() в MQTT 3.1.1 без reply_to, будет ошибка.
AsyncAPI
Вернёмся к изначальному примеру, установим uvicorn и создадим ASGI-приложение на основе FastStream-приложения:
pip install uvicorn
app = FastStream(broker).as_asgi(asyncapi_path="/docs")
Запуск:
faststream run main:app
Теперь app — полноценное ASGI-приложение, которое помимо работы с брокером может раздавать HTTP-эндпоинты. AsyncAPI-спецификация будет доступна по адресу http://localhost:8000/docs и выглядит так:

Помимо описания асинхронного интерфейса есть также кнопка Try It Out: можно либо сымитировать отправку сообщения в приложение, либо, выбрав соответствующий чекбокс, отправить сообщение напрямую в брокер. Это позволяет проверить обработчик без отдельного publisher-клиента — прямо как в Swagger UI.

Тестирование
Для тестирования приложений есть асинхронный контекстный менеджер TestMQTTBroker. При публикации он вызывает обработчики напрямую, без реального MQTT-брокера. Внутри контекст-менеджера объект handle_temperature.mock вызывается вместе с самим обработчиком, что очень удобно для тестов.
test_example.py:
import pytest from faststream.mqtt import MQTTBroker, TestMQTTBroker broker = MQTTBroker() @broker.subscriber("devices/{device_name}/temperature") async def handle_temperature(payload: dict) -> None: print(payload) @pytest.mark.asyncio async def test_collect_temperature() -> None: async with TestMQTTBroker(broker) as test_broker: await test_broker.publish( {"value": 21.5, "unit": "celsius"}, topic="devices/thermostat-1/temperature", ) handle_temperature.mock.assert_called_once_with({"value": 21.5, "unit": "celsius"})
Запуск:
pytest test_example.py
Заключение
Теперь MQTT-сервисы на Python можно писать в том же декларативном стиле, что и приложения на других брокерах в FastStream: обработчик описывает прикладную логику, а фреймворк берёт на себя подписки, валидацию, тестирование и интеграцию с инфраструктурой.
zmqtt при этом остаётся отдельной библиотекой. Если вам нужен небольшой asyncio MQTT-клиент для MQTT 3.1.1 или 5.0 без внешних зависимостей, его можно использовать и без FastStream.
Если в ваших Python-проектах есть MQTT, буду рад обратной связи: расскажите, в каких проектах он у вас применяется и с какими проблемами вы сталкиваетесь. Если работа показалась полезной — поставьте звезду zmqtt: это поможет проекту стать заметнее.
KrySeyt
Хорошая статья. Наконец-то есть нормальный клиент для MQTT