Привет!
Меня зовут Петр Коробейников, я техлид команды DBaaS for Redis в #CloudMTS.
Некоторое время назад я озадачился созданием общего набора инструментов для наших команд разработки. Цель была проста: разработчик не тратит время на погружение в логику работы конкретного инструмента, берет готовую инструкцию и просто делает свое дело — пишет код. Типовое окружение поможет переходить ребятам из команды в команду и быстро адаптироваться, а новичку — проще приступить к работе.
Сегодня я хочу рассказать про один из элементов такого типового окружения, который позволяет быстро начать работу с брокерами сообщений. Даже если разработчик Kafka и прочие брокеры до этого в глаза не видел. Речь пойдет о шине данных или событий (EventBus) и про то, как мы настроили ее кодогенерацию.
Что собой представляет шина данных и зачем она нужна?
В своей работе мы имеем дело с микросервисной архитектурой. Для асинхронного общения между сервисами мы используем шину событий. В данный момент под ее капотом Kafka (про нее писали здесь), но на ее месте может быть NSQ, Pulsar или даже Pub/Sub в Redis. В этом и суть, что разработчику для настройки обмена сообщениями не будет важно, что там внутри.
В целом схема работает следующим образом:
Разработчик описывает сообщение/событие по строгой protobuf-спецификации. Он прописывает, что, откуда и кому должно быть отправлено. Если разработчик решит написать что-то не по форме, то код просто не скомпилируется.
На основе созданной спецификации разработчик запускает автоматическую кодогенерацию кода для продьюсеров и консьюмеров.
Разработчик не ломает голову, как оформить сообщение, и не тратит время на ручное создание продьюсеров и косьюмеров. Профит!
Теперь поговорим обо всем подробнее.
Спецификация
Строгую спецификацию мы создали на основе Protocol Buffers (protobuf). Хотя с этой технологией мы работаем давно, в основном в контексте gRPC, мы все же решили посмотреть на другие варианты.
Кратко расскажу, почему не выбрали инструменты ниже:
AsyncAPI. Достойная альтернатива, которая позволяет использовать любые средства доставки. Но спецификацию пришлось бы писать в виде развесистого yaml-файла, очень близкую к спецификации OpenAPI (Swagger). Это очередная история про программирование на yaml, которая всегда вызывает определенную неприязнь у разработчиков. Существуют инструменты, аналогичные Swagger, которые умеют генерировать спецификацию уже по написанному коду. Но это тоже большая боль — любой сгенерированный, например, по аннотациям swagger-файл всегда неточен. Мы же используем принцип — specification first.
Помимо этого, по субъективным ощущениям, документация у AsyncAPI неполная и отстает от реальности, хотя сообщество разработчиков достаточно часто проводит открытые встречи. Их записи можно посмотреть на официальном канале.
Cloudevents. Также поддерживает множество способов передачи сообщений: AMQP (RabbitMQ), Kafka, MQTT, NATS и прочее.
Основная проблема — это отсутствие схемы самого сообщения, схемы полезной нагрузки (payload): по умолчанию данные могут быть записаны в произвольном формате без возможности специфицировать структуру. В целом Cloudevents больше про то, кто продьюсер, кому нужно доставить сообщение и когда оно было отправлено.
oneof data {
// Binary data
bytes binary_data = 2;
// String data
string text_data = 3;
// Protobuf Message data
google.protobuf.Any proto_data = 4;
}
Кроме того, показалось, что порог входа в Cloudevents достаточно высокий. А по опыту адаптации в прошлом стало понятно, что и тогда, и сейчас это оверкилл.
В итоге мы написали отдельный плагин на go для protoc (protoc-gen-go-eventbus).
Плагин поддерживает опцию, в которой мы задаем топик для событий, место, куда события будут публиковаться и откуда читатели будут вычитывать его.
Для формирования имени топика мы используем следующий формат: сначала идет namespace проекта, потом название проекта и сущность или действие, которое выполняется.
syntax = "proto3";
package eventbus;
option go_package = "./;eventbus";
import "cloud.mts.ru/protobuf/eventbus/annotation.proto";
import "dbaas-redis.redis-hub-gateway.types.proto";
message ApplianceCreateClaimedEvent {
option (cloud.mts.ru.protobuf.eventbus.topic) = "dbaas-redis.redis-hub-gateway.appliance-create-claimed";
message Resource {
int32 total_cpu = 1;
IntegerUnit total_ram = 2;
IntegerUnit disk_size = 3;
string disk_policy = 4;
}
message ApplianceVersion {
string id = 1;
string name = 2;
}
string appliance_id = 1;
string project_id = 2;
// …
}
Пример спецификации
И сам получившийся плагин protoc-gen-go-eventbus, и protoc у нас запечён в отдельный docker-образ. Так мы ушли от необходимости устанавливать protoc на компьютеры разработчиков и поддерживать актуальность его версии: актуальная используемая версия со всеми необходимыми плагинами и аннотациями есть в образе, доступном из внутреннего реджистри.
Hidden text
О подходе с запеканием инструментов в докер-образы вы можете почитать в моем блоге.
Автоматическая кодогенерация
Ориентируясь на данные из спецификации, автоматически создается сам экземпляр продьюсера. У всех у них есть метод Produce, который принимает первым аргументом ctx, и событие, которое было сгенерировано по protobuf с помощью спецификации выше. В таком виде событие отправляется от продьюсера в шину событий.
if err := s.applianceCreateEventProducer.Produce(ctx, applianceRequest); err != nil {
logging.LoggerFromContext(ctx).
With(zap.String("applianceID", applianceRequest.ApplianceId)).
Error("create appliance failed", zap.Error(&errCreateAppliance{err: err}))
return err
}
Пример вызова продьюсера
У консьюмера схожий с продьюсером API. При генерации инстанса консьюмера также создается и метод потребления. Первым аргументом принимается ctx, а вторым — функция, в которую будут подаваться полученные события.
На примере ниже обрабатываем событие создания кластера.
Если обработка прошла без ошибок, мы возвращаем nil.
grp.Go(func() error {
return applianceCreateClaimedEventConsumer.Consume(ctx, func(ctx context.Context, event *eventbus.ApplianceCreateClaimedEvent) error {
// do some work
return err
})
})
Пример вызова консьюмера
Если возвращаем с ошибкой, то это сообщение нужно будет вычитать еще раз.
Немного про мониторинг
Ни одно наше решение не работает без мониторинга. И шина событий — не исключение. Наиболее подходящим для нас паттерном снятия метрик в данной ситуации оказался RED (Rate, Errors, Duration). Все метрики снимаются автоматически внутри сгенерированного кода. Разработчику не нужно ничего писать самостоятельно.
А дальше снятые метрики проходят по всем известному механизму:
Попадают в endpoint/metrics сервиса;
Оттуда при очередном «скрейпе» попадают в VictoriaMetrics;
И оказываются на дашборде в grafana.
Для работы с grafana у нас тоже есть набор инструментов, позволяющий отлаживать в локальной среде на собственном ноутбуке дашборды, описанные с помощью grafonnet. Внутри — общедоступные инструменты с открытым исходным кодом, ну а мы предоставляем к ним простой и понятный интерфейс командной строки. Про это обязательно расскажу в одной из будущих статей.
Что в итоге имеем
При относительной простоте (разработка самого инструмента заняла 4 дня, а на внедрение ушел один спринт) шина данных приносит много пользы в моменте и в перспективе.
Низкий порог входа для разработчиков. Разработчику не нужно уметь в Kafka или другой брокер сообщений. Он даже может и не знать, что там внутри. У нас есть шина событий, в которую понятно, как отправлять события автоматически, без регистраций и СМС.
Универсальность. В этой схеме Kafka можно заменить на что угодно.
Защита от человеческой ошибки. Благодаря типизации схемы.
Упрощение модульных тестов. Типизированная спецификация позволяет нам воспроизводить различные сценарии (в том числе и с ошибками) с помощью моков. При этом мы не будем вызывать реальный код, а данные в шину не записываются.
Простой API. Разработчику не нужно долго разбираться, как этим всем пользоваться: описал, сгенерировал и вперед.
На этом пока всё. Подключайтесь к конструктивной дискуссии в комментариях. May the source be with you ????
Комментарии (10)
itecn
00.00.0000 00:00+3Добрый день! Спасибо за статью!
Разве отсутствие знания природы шины не приводит к тому, что разработчик ничего не знает о ее гарантиях, что в свою очередь может породить различные проблемы обработки: дубли, потеря ивентов и тд?
Также скрытие реализации шины может привести к тому, что разработчик не знает, как много сообщений консумер
pkorobeinikov Автор
00.00.0000 00:00+2Добрый день!
Разве отсутствие знания природы шины не приводит к тому, что разработчик ничего не знает о ее гарантиях, что в свою очередь может породить различные проблемы обработки: дубли, потеря ивентов и тд?
Давайте будем честны. Разработчик так или иначе знает, что под капотом в данный момент Kafka. И он так же знает про гарантии доставки. В статье об этом не сказано, но каждое сообщение снабжается ключом идемпотентности.
Основная же цель такого подхода — дать возможность общаться строго типизированными сообщениями, описывая их на привычном языке (protobuf), и снять с разработчика рутинные операции по написанию шаблонного кода. Это очень ускоряет как сам процесс реализации задачи, так и процесс ревью — вы просто игнорируете шаблонный сгенерированный код.
На всякий случай оставлю пару ссылок про идемпотентность для других наших читателей (они не имеют прямого отношения к шине событий, но дают хорошее представление о вопросе):
Также скрытие реализации шины может привести к тому, что разработчик не знает, как много сообщений консумер (...)
Ваше сообщение немного оборвалось, как мне показалось, но я отвечу Вам полностью, если дополните ниже.
aytugan
00.00.0000 00:00+1Спасибо за статью
Вы рассматривали вариант использования стандартной (для Кафки) сериализации AVRO и schema registry ?
Просто, если абстрагироваться от деталей в статье, мы имеем классическую ситуацию "14 competing standards" из xkcd.
pkorobeinikov Автор
00.00.0000 00:00+1Добрый день и спасибо за вопрос!
Вы рассматривали вариант использования стандартной (для Кафки) сериализации AVRO и schema registry ?
В какой-то момент я хотел положиться на schema registry, но если я не ошибаюсь, туда можно складывать только AVRO. Здесь я не возьмусь утверждать.
Ещё одним аргументом за формат protobuf, конечно же, является хорошее знание и опыт применения всеми разработчиками gRPC. То есть мы не добавили ещё один формат/язык/инструмент, а остались в рамках тех, что умеем готовить хорошо и применяем каждый день.
Просто, если абстрагироваться от деталей в статье, мы имеем классическую ситуацию "14 competing standards" из xkcd.
Нет-нет, речи ещё об одном стандарте не идёт и идти не может. Я бы назвал это скетчем, если угодно. Тезисный рассказ о том, как можно было бы поступить, если вам нужно в сжатые сроки и хорошо известными инструментами сохранить порядок и предсказуемость в вашей шине.
aytugan
00.00.0000 00:00Понятно
Получается статья о том, как перейти на очереди, сохранив корпоративный стандарт передачи данных (протобаф)
Тогда написание корпоративного "адаптера" для Кафки имеет смысл.
akurilov
00.00.0000 00:00А зачем структура payload? Это не ответственность шины, ответственность шины - передать метаданные и байты сообщения с определёнными гарантиями.
Вопрос гарантий доставки также не раскрыт. Что будет, если сообщение провалится ещё до условной кафки под капотом?
pkorobeinikov Автор
00.00.0000 00:00Добрый день!
А зачем структура payload? Это не ответственность шины, ответственность шины - передать метаданные и байты сообщения с определёнными гарантиями.
Если говорить на самом низком уровне, Вы совершенно правы, шина — это про гарантированную доставку.
А если подняться на уровень продукта, то схема данных выходит уже на первый план. Вам же важно не только что-то отправить и получить, а ещё и на принимающей стороне суметь однозначно понять, что именно вам пришло. И автоматически получить заполненную структуру данных для дальнейшей работы. Неужели вы не попадали в ситуацию, когда новое поле или измененный тип данных в API ломал вам сериализацию/маршализацию?
Вопрос гарантий доставки также не раскрыт. Что будет, если сообщение провалится ещё до условной кафки под капотом?
Возможно, я не понял или понял Ваш вопрос не правильно. Слабо верится, что можно потерять сообщение до отправки в покрытой тестами и автоматически генерирующейся обертке. Если я не так понял, дайте, пожалуйста, больше контекста.
romanwb
Интересная статья, спасибо! Подскажи пожалуйста, а предусмотрена ли возможность, чтобы сообщения читались в строгом порядке на косюмере, для одной сущности?
Например в кафке это делается через partition key, в rabbitmq через x-modulus-hash.
pkorobeinikov Автор
Добрый день! Спасибо за вопрос!
Мы специально не полагались на возможность доставки сообщений в определенном порядке. Саму возможность добавить можно. Но я бы старался ещё на этапе проектирования не полагаться на строгую последовательность.
Ещё раз спасибо и надеюсь, получилось ответить на вопрос.
romanwb
Понял, спасибо:) Был случай, когда у нашей сущности было состояние и для стейт машины на другом микросервисе был важен порядок.