Привет, Habr! Возможно, кто-то меня даже помнит: совсем недавно я создал Python Framework для асинхронных приложений - Propan.

Два месяц назад я делился с вами прогрессом и говорил, что работа над PropanV2 активно ведется. И вот, она завершена. Вот только вместо PropanV2 свет увидел немного другой фреймворк - FastStream, разработкой которого занимался уже не только я, а целая команда AirtAI.

В рамках статьи я расскажу, чем проекты отличаются, как переехать с одного на другой, ну и конечно, как так вообще получилось. Если вам интересно, как я "продал" opensource проект, и как от этого все остались в выигрыше (особенно пользователи) - добро пожаловать под кат!

Очень длинное предисловие

Что такое Propan?

Небольшой экскурс в недалекую историю...

Когда-то давно, когда я активно работал с RabbitMQ, я постоянно удивлялся, почему такой мощный инструмент не имеет удобного инструментария в экосистеме Python. Для HTTP-сервисов у нас есть все, а для асинхронных - лишь "голые" клиенты. Так родилась идея создания удобного фреймворка для работы с RabbitMQ.

Эта идея быстра нашла воплощение в небольшой приватной библиотеке для внутренних нужд, которая позволяла быстро поднимать однотипные сервисы вокруг RabbitMQ.

Однако, полгода назад при работе с Kafka (по великой нужде), я понял, что проблема стоит не вокруг RabbitMQ, а вокруг любых брокеров сообщений/очередей/стримов и тд. И тогда я начал работать над проектом всерьез: было принято решение сделать на его базе поддержку ЛЮБЫХ брокеров (а где-то и brokerless pub/sub) и вывести это все в Open Source.

Так, в мае этого года вышел первый открытый релиз Propan, который позволяет вам в достаточно декларативном синтаксисе объявлять ваших асинхронных потребителей:

from propan import PropanApp, RabbitBroker

broker = RabbitBroker()
app = PropanApp(broker)

@broker.handle("test-queue")
async def handler(user: str, user_id: int):
    print(f"User {user} - {user_id} created")

Откуда взялись AirtAI

Естественно, пока я занимался проектом, я мониторил, какие существуют конкуренты. На момент начала работы над приватным проектом - их не было. На момент начала работы над opensource - были, но в основном - такие же "поделки" энтузиастов в альфа версии.

Из крупного - только FastKafka. Это фреймворк от AirtAI с функционалом крайне близким к тому, что хотел получить я, однако только для Kafka. Но, в их бэклоге лежала задача на добавление поддержки других брокеров. И на тот момент у них уже было 400+ звезд на GitHub.

Так почему я не стал ждать добавления поддержки, а решил делать свое?

На то был ряд причин:

  • AirtAI - компания датасаентистов про датасаенс. Хотя функционал их проекта и был близок к тому, что я хотел, но интерфейсы оставляли желать лучшего

  • Сам пакет собирался из Jupiter блокнотов. Это "прикольно", но очень сильно затрудняет правки "от сообщества"

  • Их кодовая база не была готова к внедрению поддержки других брокеров

  • История коммитов говорила о том, что люди застряли на багфиксах и новый функционал ждать придется очень нескоро

  • Хотя проект имел много звезд и поддержку от разных разработчиков, но фактические скачивания были минимальны

В целом, наличие такого проекта (и некоторых остальных) говорило о том, что потребность в инструменте есть, но сам инструментарий - в зачаточном состоянии. Надо делать!

Почему мы объединились

С момента выхода первого релиза Propan, я достаточно быстро смог привлечь к нему хоть какое-то внимание разработчиков. Посыпались звезды на гит, скачивания, и главное - фидбек.

И достаточно быстро я сообразил, что OSS - это здорово и весело, но тянуть поддержку проекта таких амбиций, наращивать функционал, писать документацию, заниматься продвижением, взаимодействовать с комьюнити и тд, в одного - это слишком.

Поэтому уже в начале августа, когда проект набрал некоторое "весомое" количество звезд на GitHub, а по скачиваниям в несколько раз обошел FastKafka, я решил написать AirtAI и предложить им не конкурировать, но объединить наши усилия.

Согласие на сотрудничество пришло мне через 5 минут.

Как это работает

После объединения с AirtAI мы взялись за Documentation Driven Development: несколько недель мы спорили и составляли спецификацию проекта, который должен получиться в итоге. Что-то пришло из Propan, что-то из FastKafka (оба deprecated сейчас). В целом, после литров пролитой в спорах крови, все остались довольны.

Новый фреймворк FastStream выходит под эгидой команды AirtAI, а я все еще остаюсь ведущим разработчиком на проекте. От оффера я был вынужден отказаться, поэтому у нас сложилось очень интересный формат сотрудничества, в детали которого я не хотел бы углубляться. Однако, это абсолютно win-2-win-2-win ситуация.

WIN: Я доволен, т.к. проект теперь продвигается так, как я не смог бы: релиз на Infobip Shift, презентация на PyCon Praga, новостные пресс-релизы и тп, а также дополнительные "руки", которые работают вместе со мной над тем, что я люблю.

WIN: AirtAI получили сильный продукт для своего технического PR.

WIN: Пользователи получают целую компанию в поддержку инструмента, что является страховкой от моего выгорания и ухода из проекта.

А что, собственно, новенького?

А много чего! Но в первую очередь, я должен сказать, что FastStream построен на базе Propan с дополнительными возможностями из FastKafka. Поэтому почти весь Propan код (после переименования и корректировки импортов) корректно работает и в FastStream (гайд по миграции)

Основные нововведения - добавление publisher'ов и расширенный функционал для тестирования.

Publisher - абсолютно новая сущность, которая позволяет вам удобно строить конвейеры обработки данных: вы просто объявляете, откуда получать данные и куда отправлять результат, ваш код занимается только их обработкой.

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def process_data(msg: str) -> str:
    return f"{msg=} processed"

Testability - очень важный аспект в FastStream. In-Memory тестирование уже было в Propan, однако в FastStream этот концепт существенно расширили, добавив инструменты для end-2-end тестирования. Теперь вы можете валидировать как входящие, так и исходящие сообщения, а также переключать ваши тесты на использование реального брокера всего одним параметром.

import pytest
from faststream.kafka import TestKafkaBroker

@pytest.mark.asyncio
async def test_process_data():
    async with TestKafkaBroker(broker, with_real=False) as br:
        await br.publish("Hello!", topic="in-topic")

        process_data.mock.assert_called_once_with("Hello!")

Кроме этого добавлена куча новых мелочей: Middlewares, более мощные Router'ы, перегрузка subscriber'ов, улучшенная кастомизация логирования и некоторые другие вещи, положительно сказывающиеся на опыте использования.

Куда дальше?

Проект сейчас активно развивается: мы тесно взаимодействуем с компаниями и людьми, которые уже приняли на вооружение FastStream и обрабатываем фидбек. На текущий момент FastStream поддерживает работу с RabbitMQ, Kafka и NATS. В ближайшее время ожидается переезд поддержки Redis Pub/Sub и SQS из Propan, а также поддержка Redis Streams, о которой нас очень просят.

Поскольку у проекта сейчас два визионера, то все изменения проходят через согласования с обеих сторон, что позволяет нам существенно улучшить качество продукта, отполировав каждую идею еще до ее реализации (Documentation Driven Development, помните?). Все эти обсуждения ведутся прямо на GitHub, поэтому любой человек, заинтересованный в Open Source также может принять участие.

Например, сейчас мы активно спорим вокруг того, как все-таки "правильно" именовать сущности в AsyncAPI спецификации, которая генерируется автоматически.

Если быть совсем откровенным, то FastStream - очень продвинутый, удобный и унифицированный клиент для различных брокеров сообщений. Но это всего лишь клиент, на базе которого можно построить отличные решения еще более высокого уровня (например, очередного убийцу Celery или платформу распределенной обработки данных, сервисы в которой генерируются из конфига). Те же AirtAI разработали CLI утилиту поверх ChatGPT, которая генерирует готовые FastStream сервисы по вашему описанию. Посмотрим, каким еще новым проектам откроет путь FastStream.

Вместо вывода

FastStream - пример того, какие уникальные отношения могут строиться в мире OpenSource. Я просто хотел поделиться с вами этим кейсом, и надеюсь, он вдохновит кого-то заняться своим Pet-проектом и развить его в нечто большее (мой лежал на полке несколько лет).

Комментарии (0)