В Python при выборе библиотеки для работы с MQTT почти всегда приходишь к paho-mqtt. Это зрелый и самый популярный клиент, но его API построен на колбэках, а современное Python-приложение живёт в asyncio: FastAPI, фоновые воркеры, асинхронные клиенты и всё это в одном общем event loop.

В одном из IoT-проектов я столкнулся ровно с этим. Мне нужен был MQTT-клиент, который без сложной адаптации встраивается в асинхронное приложение и позволяет работать с подписками как с управляемыми объектами, а не через набор колбэков.

Похожий запрос давно был и в FastStream. Задачу на поддержку MQTT открыли ещё в ноябре 2023 года, но долго не закрывали: интерес был, а вот с базовым драйвером всё было не так очевидно. Заворачивать paho-mqtt в асинхронную обёртку можно, но тогда в основе интеграции FastStream всё равно остаётся callback/thread-модель. В итоге я написал отдельный драйвер — zmqtt, а затем и MQTT-интеграцию в FastStream.

Меня зовут Борис Алексеев, я senior разработчик и фичалид в Райффайзенбанке, один из мейнтейнеров FastStream и автор zmqtt. В этой статье покажу, как теперь выглядит MQTT-сервис на Python: асинхронные обработчики, управляемые подписки, тестирование без реального брокера и AsyncAPI-документация. А заодно кратко расскажу, зачем под это пришлось написать новый драйвер.

Почему пришлось начать с драйвера

paho-mqtt — зрелый проект и де-факто стандартный MQTT-клиент для Python. Он поддерживает все популярные версии протокола MQTT: 5.0, 3.1.1 и 3.1. Асинхронные обёртки над paho существуют, но для FastStream этого всё равно было недостаточно. Фреймворку нужно не просто дождаться следующего сообщения из брокера, а маршрутизировать поток сообщений по подписчикам, управлять их жизненным циклом и не превращать каждую подписку в отдельное MQTT-соединение. В случае с paho-mqtt остаются два варианта: писать сложный слой роутинга поверх чужой callback-модели или плодить соединения на каждую подписку. Первый вариант неприятно поддерживать, а второй недопустим из-за расточительного обращения с ресурсами.

Также хочется более удобный API: объект подписки вместо колбэка, готовая имплементация request/response, строго типизированные структуры данных в противовес универсальным контейнерам для MQTT 5.0 properties в paho.

Критерий

paho-mqtt

zmqtt

Модель выполнения

Callback API, threading

asyncio

Подписки

subscribe() + callbacks

Subscription — отдельный объект с жизненным циклом

Request/response

Нет готовой высокоуровневой абстракции

Есть request() для MQTT 5.0, который сам подписывается на reply topic и ждёт ответ

Интеграция с FastStream

Нужен отдельный слой роутинга и lifecycle

Драйвер сразу писался под асинхронную интеграцию с FastStream

Зрелость

Много лет в эксплуатации, самое популярное решение

Новая библиотека, меньше 100 звёзд

Зачем здесь FastStream

Концепцию FastStream в своей статье сформулировал автор библиотеки Никита Пастухов:

FastStream — это очень толстый клиент для брокеров, который позволяет вам писать меньше инфраструктурного кода и сконцентрироваться на бизнес-логике ваших приложений.

Zmqtt отвечает за протокол и сетевое взаимодействие: соединение с брокером, подписки, публикацию, request/response и всё, что относится к MQTT как к транспорту. FastStream стоит выше и берёт на себя прикладной уровень: маршрутизацию сообщений по обработчикам, сериализацию и десериализацию, Dependency Injection, observability и tracing, документацию AsyncAPI и утилиты для тестирования приложений.

Именно поэтому это две разные сущности, а не один большой MQTT-клиент. Если смешать транспортный слой и фреймворк внутри одной библиотеки, получится либо тяжёлый клиент со слишком широкой ответственностью, либо неудобная точка расширения для прикладного кода. Разделение на zmqtt и FastStream даёт более чистую границу: один компонент знает про MQTT-протокол, второй — про удобную разработку сервиса поверх него.

Установка:

pip install "faststream[mqtt]"

Чтобы воспроизвести следующие примеры локально, поднимите MQTT-брокер в отдельном терминале:

docker run --rm -p 1883:1883 eclipse-mosquitto:2 mosquitto -c /mosquitto-no-auth.conf

Простой сервис

Ниже пример, который можно положить в main.py и запустить. Приложение подписывается на температуру устройства, достаёт device_id из MQTT-топика, валидирует payload как float. После старта приложение публикует тестовое сообщение, чтобы показать вызов обработчика.

from typing import Annotated

from faststream import FastStream, Path
from faststream.mqtt import MQTTBroker


broker = MQTTBroker()
app = FastStream(broker)


@broker.subscriber("temperature-sensors/{device_id}/temperature")
async def handle_temperature(
    temperature: float,
    device_id: Annotated[str, Path()],
) -> None:
    print(f"{device_id}: {temperature}")


@app.after_startup
async def publish_test_message() -> None:
    await broker.publish(
        21.5,
        "temperature-sensors/thermostat-1/temperature",
    )

Запуск:

faststream run main:app

FastStream отправит MQTT SUBSCRIBE на topic filter temperature-sensors/+/temperature. Когда из топика temperature-sensors/thermostat-1/temperature придёт сообщение, обработчик получит device_id="thermostat-1" и temperature=21.5.

Runtime-возможности

Версии MQTT

MQTT 3.1.1 и MQTT 5.0. MQTT 3.1 пока не поддерживается в zmqtt, но есть в планах развития.

Параллельная обработка сообщений

По умолчанию subscriber обрабатывает сообщения последовательно. Если вы хотите обрабатывать несколько сообщений одновременно, укажите max_workers.

@broker.subscriber(
    "temperature-sensors/{device_id}/temperature",
    max_workers=5,
)
async def handle_temperature(
    temperature: float,
    device_id: Annotated[str, Path()],
) -> None:
    print(f"{device_id}: {temperature}")

Shared subscriptions

MQTT shared subscriptions позволяют нескольким клиентам подписаться на один топик таким образом, чтобы каждое сообщение получил только один клиент из группы. Это похоже на consumer group в Kafka.

@broker.subscriber("workers/jobs/#", shared="pool-a")
async def handle_job(job: dict) -> None:
    print(job)

В брокер уйдёт SUBSCRIBE на топик $share/pool-a/workers/jobs/#.

Acknowledgment

В MQTT QoS описывает доставку между брокером и клиентом, а не успешную обработку сообщения бизнес-кодом. Поэтому, если следовать MQTT идиоматически, протокольный ack обычно остаётся на уровне клиента: получил пакет, подтвердил получение, и дальше уже вызываешь логику.

Но FastStream всё равно позволяет управлять этим явно: отправлять Ack для QoS 1 или запускать протокольный обмен подтверждениями для QoS 2 после пользовательской обработки. Это скорее демонстрация контроля над жизненным циклом сообщения, чем рекомендация превращать MQTT в очередь задач с бизнес-ack.

from faststream import AckPolicy, FastStream
from faststream.mqtt import MQTTBroker, MQTTMessage, QoS


broker = MQTTBroker()
app = FastStream(broker)


@broker.subscriber(
    "jobs/run",
    qos=QoS.AT_LEAST_ONCE,
    ack_policy=AckPolicy.MANUAL,
)
async def work(payload: dict, msg: MQTTMessage) -> None:
    try:
        print(payload)
    finally:
        await msg.ack()

По умолчанию сообщение подтверждается при получении, и в большинстве MQTT-сценариев именно это поведение стоит оставлять.

Publish

Публиковать можно напрямую через брокер.

await broker.publish(
    {"value": 21.5, "unit": "celsius"},
    "devices/thermostat-1/rooms/kitchen/temperature",
    qos=QoS.AT_LEAST_ONCE,
    retain=True,
)

Если публикация является частью обработчика, удобнее описать publisher декларативно через декоратор.

@broker.subscriber("devices/{device_id}/commands")
@broker.publisher("devices/events")
async def handle_command(
    device_id: Annotated[str, Path()],
    command: dict,
) -> dict:
    print(f"Command for device {device_id}: {command}")
    return {"device_id": device_id, "command": command}

Request/response

В MQTT 5.0 сценарий request/response строится на стандартных свойствах PUBLISH: Response Topic и Correlation Data. Если correlation_id не передан явно, FastStream сгенерирует его сам. Основная логика для MQTT 5.0 реализована на уровне zmqtt: клиент создаёт служебный топик для ответа, подписывается на него, отправляет запрос и ждёт первое сообщение в ответ. Если ответ не приходит за заданное время, будет ошибка по таймауту.

broker = MQTTBroker(version="5.0")

reply = await broker.request(
    {"request_id": "req-42"},
    "devices/thermostat-1/status",
    timeout=5.0,
)

print(reply.body)

В MQTT 3.1.1 таких свойств нет, поэтому FastStream поддерживает request/response через явный reply_to-топик. Будет работать, если обработчик сообщения знает, куда отвечать на уровне договорённости:

reply = await broker.request(
    "ping",
    "devices/thermostat-1/status",
    reply_to="devices/replies/thermostat-1",
    timeout=5.0,
)

Брокер FastStream подпишется на reply_to, отправит запрос и дождётся первого ответа в этом топике. Если вызвать request() в MQTT 3.1.1 без reply_to, будет ошибка.

AsyncAPI

Вернёмся к изначальному примеру, установим uvicorn и создадим ASGI-приложение на основе FastStream-приложения:

pip install uvicorn
app = FastStream(broker).as_asgi(asyncapi_path="/docs")

Запуск:

faststream run main:app

Теперь app — полноценное ASGI-приложение, которое помимо работы с брокером может раздавать HTTP-эндпоинты. AsyncAPI-спецификация будет доступна по адресу http://localhost:8000/docs и выглядит так:

AsyncAPI-документация MQTT-сервиса
AsyncAPI-документация MQTT-сервиса

Помимо описания асинхронного интерфейса есть также кнопка Try It Out: можно либо сымитировать отправку сообщения в приложение, либо, выбрав соответствующий чекбокс, отправить сообщение напрямую в брокер. Это позволяет проверить обработчик без отдельного publisher-клиента — прямо как в Swagger UI.

Try It Out в AsyncAPI-документации
Try It Out в AsyncAPI-документации

Тестирование

Для тестирования приложений есть асинхронный контекстный менеджер TestMQTTBroker. При публикации он вызывает обработчики напрямую, без реального MQTT-брокера. Внутри контекст-менеджера объект handle_temperature.mock вызывается вместе с самим обработчиком, что очень удобно для тестов.

test_example.py:

import pytest

from faststream.mqtt import MQTTBroker, TestMQTTBroker


broker = MQTTBroker()


@broker.subscriber("devices/{device_name}/temperature")
async def handle_temperature(payload: dict) -> None:
    print(payload)


@pytest.mark.asyncio
async def test_collect_temperature() -> None:
    async with TestMQTTBroker(broker) as test_broker:
        await test_broker.publish(
            {"value": 21.5, "unit": "celsius"},
            topic="devices/thermostat-1/temperature",
        )
        handle_temperature.mock.assert_called_once_with({"value": 21.5, "unit": "celsius"})

Запуск:

pytest test_example.py

Заключение

Теперь MQTT-сервисы на Python можно писать в том же декларативном стиле, что и приложения на других брокерах в FastStream: обработчик описывает прикладную логику, а фреймворк берёт на себя подписки, валидацию, тестирование и интеграцию с инфраструктурой.

zmqtt при этом остаётся отдельной библиотекой. Если вам нужен небольшой asyncio MQTT-клиент для MQTT 3.1.1 или 5.0 без внешних зависимостей, его можно использовать и без FastStream.

Если в ваших Python-проектах есть MQTT, буду рад обратной связи: расскажите, в каких проектах он у вас применяется и с какими проблемами вы сталкиваетесь. Если работа показалась полезной — поставьте звезду zmqtt: это поможет проекту стать заметнее.

Комментарии (3)


  1. KrySeyt
    29.06.2026 09:02

    Хорошая статья. Наконец-то есть нормальный клиент для MQTT


  1. qckzzi
    29.06.2026 09:02

    спасибо за статью, как раз очень нужен был простой способ работать с mqtt


  1. Ilnurathome
    29.06.2026 09:02

    gmqtt рассматривали?