Привет!

Меня зовут Петр Коробейников, я техлид команды DBaaS for Redis в #CloudMTS.
Некоторое время назад я озадачился созданием общего набора инструментов для наших команд разработки. Цель была проста: разработчик не тратит время на погружение в логику работы конкретного инструмента, берет готовую инструкцию и просто делает свое дело — пишет код. Типовое окружение поможет переходить ребятам из команды в команду и быстро адаптироваться, а новичку — проще приступить к работе.

Сегодня я хочу рассказать про один из элементов такого типового окружения, который позволяет быстро начать работу с брокерами сообщений. Даже если разработчик Kafka и прочие брокеры до этого в глаза не видел. Речь пойдет о шине данных или событий (EventBus) и про то, как мы настроили ее кодогенерацию.

Что собой представляет шина данных и зачем она нужна?

В своей работе мы имеем дело с микросервисной архитектурой. Для асинхронного общения между сервисами мы используем шину событий. В данный момент под ее капотом Kafka (про нее писали здесь), но на ее месте может быть NSQ, Pulsar или даже Pub/Sub в Redis. В этом и суть, что разработчику для настройки обмена сообщениями не будет важно, что там внутри.

В целом схема работает следующим образом:

  • Разработчик описывает сообщение/событие по строгой protobuf-спецификации. Он прописывает, что, откуда и кому должно быть отправлено. Если разработчик решит написать что-то не по форме, то код просто не скомпилируется.

  • На основе созданной спецификации разработчик запускает автоматическую кодогенерацию кода для продьюсеров и консьюмеров. 

  • Разработчик не ломает голову, как оформить сообщение, и не тратит время на ручное создание продьюсеров и косьюмеров. Профит!

Теперь поговорим обо всем подробнее.

Спецификация

Строгую спецификацию мы создали на основе Protocol Buffers (protobuf). Хотя с этой технологией мы работаем давно, в основном в контексте gRPC, мы все же решили посмотреть на другие варианты.

Кратко расскажу, почему не выбрали инструменты ниже:

AsyncAPI. Достойная альтернатива, которая позволяет использовать любые средства доставки. Но спецификацию пришлось бы писать в виде развесистого yaml-файла, очень близкую к спецификации OpenAPI (Swagger). Это очередная история про программирование на yaml, которая всегда вызывает определенную неприязнь у разработчиков. Существуют инструменты, аналогичные Swagger, которые умеют генерировать спецификацию уже по написанному коду. Но это тоже большая боль — любой сгенерированный, например, по аннотациям swagger-файл всегда неточен. Мы же используем принцип — specification first.

Помимо этого, по субъективным ощущениям, документация у AsyncAPI неполная и отстает от реальности, хотя сообщество разработчиков достаточно часто проводит открытые встречи. Их записи можно посмотреть на официальном канале.

Cloudevents. Также поддерживает множество способов передачи сообщений: AMQP (RabbitMQ), Kafka, MQTT, NATS  и прочее.

Основная проблема — это отсутствие схемы самого сообщения, схемы полезной нагрузки (payload): по умолчанию данные могут быть записаны в произвольном формате без возможности специфицировать структуру. В целом Cloudevents больше про то, кто продьюсер, кому нужно доставить сообщение и когда оно было отправлено. 

oneof data {

    // Binary data
    bytes binary_data = 2;

    // String data
    string text_data = 3;

    // Protobuf Message data
    google.protobuf.Any proto_data = 4;
}

Источник

Кроме того, показалось, что порог входа в Cloudevents достаточно высокий. А по опыту адаптации в прошлом стало понятно, что и тогда, и сейчас это оверкилл.

В итоге мы написали отдельный плагин на go для protoc (protoc-gen-go-eventbus).
Плагин поддерживает опцию, в которой мы задаем топик для событий, место, куда события будут публиковаться и откуда читатели будут вычитывать его.

Для формирования имени топика мы используем следующий формат: сначала идет namespace проекта, потом название проекта и сущность или действие, которое выполняется.

syntax = "proto3";


package eventbus;


option go_package = "./;eventbus";


import "cloud.mts.ru/protobuf/eventbus/annotation.proto";
import "dbaas-redis.redis-hub-gateway.types.proto";


message ApplianceCreateClaimedEvent {
 option (cloud.mts.ru.protobuf.eventbus.topic) = "dbaas-redis.redis-hub-gateway.appliance-create-claimed";


 message Resource {
   int32 total_cpu = 1;
   IntegerUnit total_ram = 2;
   IntegerUnit disk_size = 3;
   string disk_policy = 4;
 }


 message ApplianceVersion {
   string id = 1;
   string name = 2;
 }


 string appliance_id = 1;
 string project_id = 2;
 // …
}

Пример спецификации

И сам получившийся плагин protoc-gen-go-eventbus, и protoc у нас запечён в отдельный docker-образ. Так мы ушли от необходимости устанавливать protoc на компьютеры разработчиков и поддерживать актуальность его версии: актуальная используемая версия со всеми необходимыми плагинами и аннотациями есть в образе, доступном из внутреннего реджистри.

Hidden text

О подходе с запеканием инструментов в докер-образы вы можете почитать в моем блоге.

Автоматическая кодогенерация

Ориентируясь на данные из спецификации, автоматически создается сам экземпляр продьюсера. У всех у них есть метод Produce, который принимает первым аргументом ctx, и событие, которое было сгенерировано по protobuf с помощью спецификации выше. В таком виде событие отправляется от продьюсера в шину событий.

if err := s.applianceCreateEventProducer.Produce(ctx, applianceRequest); err != nil {
  logging.LoggerFromContext(ctx).
     With(zap.String("applianceID", applianceRequest.ApplianceId)).
     Error("create appliance failed", zap.Error(&errCreateAppliance{err: err}))


  return err
}

Пример вызова продьюсера

У консьюмера схожий с продьюсером API. При генерации инстанса консьюмера также создается и метод потребления. Первым аргументом принимается ctx, а вторым — функция, в которую будут подаваться полученные события.
На примере ниже обрабатываем событие создания кластера.

Если обработка прошла без ошибок, мы возвращаем nil.

grp.Go(func() error {
  return applianceCreateClaimedEventConsumer.Consume(ctx, func(ctx context.Context, event *eventbus.ApplianceCreateClaimedEvent) error {
     // do some work
     return err
  })
})

Пример вызова консьюмера

Если возвращаем с ошибкой, то это сообщение нужно будет вычитать еще раз.

Немного про мониторинг

Ни одно наше решение не работает без мониторинга. И шина событий — не исключение. Наиболее подходящим для нас паттерном снятия метрик в данной ситуации оказался RED (Rate, Errors, Duration). Все метрики снимаются автоматически внутри сгенерированного кода. Разработчику не нужно ничего писать самостоятельно.

А дальше снятые метрики проходят по всем известному механизму:

  1. Попадают в endpoint/metrics сервиса;

  2. Оттуда при очередном «скрейпе» попадают в VictoriaMetrics;

  3. И оказываются на дашборде в grafana.

Для работы с grafana у нас тоже есть набор инструментов, позволяющий отлаживать в локальной среде на собственном ноутбуке дашборды, описанные с помощью grafonnet. Внутри — общедоступные инструменты с открытым исходным кодом, ну а мы предоставляем к ним простой и понятный интерфейс командной строки. Про это обязательно расскажу в одной из будущих статей.

Что в итоге имеем

При относительной простоте (разработка самого инструмента заняла 4 дня, а на внедрение ушел один спринт) шина данных приносит много пользы в моменте и в перспективе.

  • Низкий порог входа для разработчиков. Разработчику не нужно уметь в Kafka или другой брокер сообщений. Он даже может и не знать, что там внутри. У нас есть шина событий, в которую понятно, как отправлять события автоматически, без регистраций и СМС. 

  • Универсальность. В этой схеме Kafka можно заменить на что угодно.

  • Защита от человеческой ошибки. Благодаря типизации схемы.

  • Упрощение модульных тестов. Типизированная спецификация позволяет нам воспроизводить различные сценарии (в том числе и с ошибками) с помощью моков. При этом мы не будем вызывать реальный код, а данные в шину не записываются.  

  • Простой API. Разработчику не нужно долго разбираться, как этим всем пользоваться: описал, сгенерировал и вперед.

На этом пока всё. Подключайтесь к конструктивной дискуссии в комментариях. May the source be with you ????

Комментарии (10)


  1. romanwb
    00.00.0000 00:00
    +1

    Интересная статья, спасибо! Подскажи пожалуйста, а предусмотрена ли возможность, чтобы сообщения читались в строгом порядке на косюмере, для одной сущности?
    Например в кафке это делается через partition key, в rabbitmq через x-modulus-hash.


    1. pkorobeinikov Автор
      00.00.0000 00:00
      +1

      Добрый день! Спасибо за вопрос!

      Мы специально не полагались на возможность доставки сообщений в определенном порядке. Саму возможность добавить можно. Но я бы старался ещё на этапе проектирования не полагаться на строгую последовательность.

      Ещё раз спасибо и надеюсь, получилось ответить на вопрос.


      1. romanwb
        00.00.0000 00:00

        Понял, спасибо:) Был случай, когда у нашей сущности было состояние и для стейт машины на другом микросервисе был важен порядок.


  1. itecn
    00.00.0000 00:00
    +3

    Добрый день! Спасибо за статью!

    Разве отсутствие знания природы шины не приводит к тому, что разработчик ничего не знает о ее гарантиях, что в свою очередь может породить различные проблемы обработки: дубли, потеря ивентов и тд?

    Также скрытие реализации шины может привести к тому, что разработчик не знает, как много сообщений консумер


    1. pkorobeinikov Автор
      00.00.0000 00:00
      +2

      Добрый день!

      Разве отсутствие знания природы шины не приводит к тому, что разработчик ничего не знает о ее гарантиях, что в свою очередь может породить различные проблемы обработки: дубли, потеря ивентов и тд?

      Давайте будем честны. Разработчик так или иначе знает, что под капотом в данный момент Kafka. И он так же знает про гарантии доставки. В статье об этом не сказано, но каждое сообщение снабжается ключом идемпотентности.

      Основная же цель такого подхода — дать возможность общаться строго типизированными сообщениями, описывая их на привычном языке (protobuf), и снять с разработчика рутинные операции по написанию шаблонного кода. Это очень ускоряет как сам процесс реализации задачи, так и процесс ревью — вы просто игнорируете шаблонный сгенерированный код.

      На всякий случай оставлю пару ссылок про идемпотентность для других наших читателей (они не имеют прямого отношения к шине событий, но дают хорошее представление о вопросе):

      Также скрытие реализации шины может привести к тому, что разработчик не знает, как много сообщений консумер (...)

      Ваше сообщение немного оборвалось, как мне показалось, но я отвечу Вам полностью, если дополните ниже.


    1. aytugan
      00.00.0000 00:00
      +1

      Спасибо за статью

      Вы рассматривали вариант использования стандартной (для Кафки) сериализации AVRO и schema registry ?

      Просто, если абстрагироваться от деталей в статье, мы имеем классическую ситуацию "14 competing standards" из xkcd.


      1. pkorobeinikov Автор
        00.00.0000 00:00
        +1

        Добрый день и спасибо за вопрос!

        Вы рассматривали вариант использования стандартной (для Кафки) сериализации AVRO и schema registry ?

        В какой-то момент я хотел положиться на schema registry, но если я не ошибаюсь, туда можно складывать только AVRO. Здесь я не возьмусь утверждать.

        Ещё одним аргументом за формат protobuf, конечно же, является хорошее знание и опыт применения всеми разработчиками gRPC. То есть мы не добавили ещё один формат/язык/инструмент, а остались в рамках тех, что умеем готовить хорошо и применяем каждый день.

        Просто, если абстрагироваться от деталей в статье, мы имеем классическую ситуацию "14 competing standards" из xkcd.

        Нет-нет, речи ещё об одном стандарте не идёт и идти не может. Я бы назвал это скетчем, если угодно. Тезисный рассказ о том, как можно было бы поступить, если вам нужно в сжатые сроки и хорошо известными инструментами сохранить порядок и предсказуемость в вашей шине.


        1. aytugan
          00.00.0000 00:00

          Понятно

          Получается статья о том, как перейти на очереди, сохранив корпоративный стандарт передачи данных (протобаф)

          Тогда написание корпоративного "адаптера" для Кафки имеет смысл.


  1. akurilov
    00.00.0000 00:00

    А зачем структура payload? Это не ответственность шины, ответственность шины - передать метаданные и байты сообщения с определёнными гарантиями.

    Вопрос гарантий доставки также не раскрыт. Что будет, если сообщение провалится ещё до условной кафки под капотом?


    1. pkorobeinikov Автор
      00.00.0000 00:00

      Добрый день!

      А зачем структура payload? Это не ответственность шины, ответственность шины - передать метаданные и байты сообщения с определёнными гарантиями.

      Если говорить на самом низком уровне, Вы совершенно правы, шина — это про гарантированную доставку.

      А если подняться на уровень продукта, то схема данных выходит уже на первый план. Вам же важно не только что-то отправить и получить, а ещё и на принимающей стороне суметь однозначно понять, что именно вам пришло. И автоматически получить заполненную структуру данных для дальнейшей работы. Неужели вы не попадали в ситуацию, когда новое поле или измененный тип данных в API ломал вам сериализацию/маршализацию?

      Вопрос гарантий доставки также не раскрыт. Что будет, если сообщение провалится ещё до условной кафки под капотом?

      Возможно, я не понял или понял Ваш вопрос не правильно. Слабо верится, что можно потерять сообщение до отправки в покрытой тестами и автоматически генерирующейся обертке. Если я не так понял, дайте, пожалуйста, больше контекста.