Привет, Хабр! Меня зовут Максим, я Go-разработчик из Wildberries. Свою дебютную статью я хочу посвятить довольно популярной теме, когда на проекте приходится уходить с одной технологии на другую. Данная статья будет полезна разработчикам, кто активно использует асинхронный способ передачи данных в своих проектах. Статья несет исключительно опыт автора.

В первую очередь заглянем поглубже во внутреннее устройство каждого брокера и особенности при работе с ним.

Nats

NATS — это высокопроизводительный брокер сообщений, написанный на Go. Он создан Дереком Коллисоном, за плечами которого более 20 лет работы над распределенными очередями сообщений. Для работы с сообщениями в NATS используется простой текстовый протокол в стиле публикации/подписки. Клиенты подключаются и взаимодействуют с сервером через обычный сокет TCP/IP, используя небольшой набор протокольных операций, которые заканчиваются новой строкой. В простой схеме это выглядит следующим образом.

Так как мы пишем на Go, нас не может не радовать та новость, что NATS написан на Go. Ко всему прочему в NATS в отличие от Kafka (о которой будет идти речь позже) поддерживает параллельную обработку сообщений "из коробки", но детали реализации и степень параллелизма зависят от выбранного режима подписки, настроек и логики приложения.

Клиентская библиотека NATS (например, NATS Go) не накладывает ограничений на то, как сообщения обрабатываются в коде. Вы можете запускать обработку сообщений в отдельных горутинах для повышения параллелизма.

Одна из важных конфигураций, которая позволяет быстро обрабатывать сообщений это MaxInflight. MaxInflight — это одна из настроек в библиотеке NATS Go, которая используется для управления потоковыми данными при подписке на темы в NATS (распределённой системе обмена сообщениями). Она задаёт максимальное количество неподтверждённых сообщений (то есть сообщений, полученных подписчиком, но ещё не подтверждённых), которые сервер может отправить подписчику.

Из этого вытекает следующая особенность натса, отличающая его от кафки. Acknowledgment system - система подтверждений в NATS используется для обеспечения надежной доставки сообщений. Это позволяет гарантировать, что каждое сообщение будет обработано, даже если подписчик временно недоступен или сбойнул. Таким образом нам нет необходимости сдвигать оффсеты как в Apache Kafka, что делает NATS удобным при обработки сообщений, нам не нужно задумываться, когда сдвигать offset, мы просто ack'аем его.##

Как работает система Ack?

  1. Доставка сообщения:

    • Сервер отправляет сообщение подписчику.

    • Если установлен AckWaitTimeout, сервер начинает отсчитывать таймер.

  2. Ожидание подтверждения:

    • Подписчик обрабатывает сообщение и подтверждает его вызовом msg.Ack().

  3. Обработка подтверждения:

    • Если подтверждение получено, сообщение удаляется из журнала (stream) или метки невыполненных сообщений.

  4. Повторная доставка:

    • Если подтверждение не получено, сервер повторно отправляет сообщение, либо подписчику, либо другому клиенту в группе подписки.

Kafka

Наконец, перейдем к технологии Apache Kafka. Kafka, изначально разработанная в 2011 году в компании LinkedIn, на сегодняшний день обеспечивает высокую надежность и масштабируемость, позволяя хранить огромные объемы данных. Kafka предоставляет высокопроизводительную шину сообщений, которая позволяет обрабатывать все проходящие через нее данные в реальном времени, даже при экстремально больших нагрузках.

Простейшая схема работы Kafka показана на картинке ниже.

В целом уже по картинке можно понять один нюанс работы с Kafka - очередность считывания сообщений. Для нас было важно, чтобы все полученные сообщения обрабатывались в том же порядке, в котором они были созданы, а Kafka это гарантирует только внутри одного брокера. Также немаловажной проблемой для нас стала невозможность реализации из коробки параллельного чтения, такое чтение ограничивалось количеством созданных партиций внутри Kafka.

Поставленная задача

После чтения из топика или сабджекта необходимо выполнить достаточно медленный сетевой запрос. При этом запрос не всегда проходит успешно и может возвращать ошибку 500, что связано с особенностями области, в которой я работаю.

При работе с NATS о смещениях (оффсетах) задумываться не пришлось. Мы просто настроили MaxInflight 200 и пошли выполнять операции в 200 горутини запустили обработку в 200 горутинах. Каждая из них самостоятельно решает, ack'ать или нет.

Работа с Kafka оказалась сложнее, чем ожидалось. Например, создать 200 партиций сразу оказалось невозможным. Кроме того, нельзя сместить оффсет, пока запрос не обработан корректно, иначе сообщение будет перезаписано и потеряно. Однако и слишком долго задерживаться на обработке сообщения нельзя, так как это может привести к затягиванию процесса или полной остановке, особенно если сообщение было отправлено с ошибкой.

Покопавшись в интернете я выделил два решения, которые больше всего для меня подходили.

Retry topic

Что происходит, если условия, необходимые для обработки события, недоступны, когда приложение пытается его обработать? Например, представьте приложение, которое обрабатывает запросы на покупку товаров. Цена товара может быть рассчитана другим приложением и может отсутствовать в момент получения запроса.

Добавление топика повторных попыток дает возможность обработать большинство событий сразу, а другие события откладываются до тех пор, пока не будут выполнены необходимые условия. В приведенном примере, если цена товара недоступна, событие направляется в тему повторных попыток. Это восстановимое условие не следует рассматривать как ошибку, а нужно периодически пытаться обработать событие до тех пор, пока условия не будут выполнены.

Это упрощенная схема взаимодействия с данной проблемой. Мы читаем сообщения, при получении ошибки, отправляем его в Retry Topic, из которого будет читать соответсвующий бизнес-сервис из сервисного слоя приложения.

Такое решение принимается, но все еще не решен вопрос упорядоченной обработки сообщений, также не совсем удобно отслеживать обработанные и необработанные сообщения, но здесь можно и обойтись логами.

Transactional Inbox

И вот решение, которое мне показалось идеальным в данной ситуации. Паттерн микросервисов Inbox. На первом шаге мы создаем таблицу, которая будет служить в качестве «входящего ящика» для наших сообщений. После того как новое сообщение поступает, мы не начинаем его обработку сразу, а лишь вставляем сообщение в таблицу и сдвигаем offset в Kafka. При правильной настройке таблицы, запросов в БД и обработки сообщений консьюмером, такая реализация с 99% вероятностью будет стабильно читать и записывать данные в нашу таблиц inbox. Затем фоновый процесс по мере удобства извлекает записи из инбокса и запускает их обработку. По завершении работы, если наше событие обработано успешно, то происходит commit и запись удаляется, в противном случае, сообщение будет помечено ошибкой и в таблице бд обновится запись.

Пропускную способность можно улучшить за счет увеличения параллелизма. Когда несколько воркеров одновременно читают инбокс, важно не забывать блокировать строки, которые уже были забраны другими воркерами, данную логику можно воспроизвести благодаря locked_until атрибуту.

Паттерн Inbox может быть очень полезен, когда важен порядок сообщений. Иногда порядок гарантируется системой обмена сообщениями (например, Kafka с включенной конфигурацией идемпотентности), но это не всегда так для каждого брокера. Точно так же HTTP-запросы могут перемешиваться, если клиент не отправляет их по порядку в одном потоке.

К счастью, если сообщения содержат монотонно увеличивающийся идентификатор, порядок можно восстановить в процессе обработки, когда воркер читает данные из инбокса. Он может обнаружить отсутствующие сообщения, подождать их поступления и затем обработать их в правильной последовательности.

Как я внедрял Inbox

Для начала определимся с сущностями. Сущность, с которой будет работать consumer и inbox worker назовем Event.

type Event struct {
  	ID        int64     `db:"id"`
	Payload   []byte    `db:"payload"`
	CreatedAt time.Time `db:"created_at"`
	Success   bool      `db:"-"`
	Err       string    `db:"err"`
}

Эта сущность будет обозначать событие, пришедшие к нам из брокера.

Далее консьюмеру необходимо обработать сообщение из Kafka и положить его в инбокс-таблицу. Для этого я написал следующий код.

type EntityHandler struct {
	logger  *slog.Logger
	service *api_service.ApiService
}

func (h *EntityHandler) Receive(ctx context.Context, events ...cloudevents.Event) error {
	entities, err := eventToEntity(events...)
	if err != nil {
		return err
	}

	err = h.service.SaveEntities(ctx, entities)
	if err != nil {
		return err
	}

	return nil
}

Переменные и структуры названы entity это дженерик название, чтобы для необходимых доменов можно было подменить имя entity на требующее название для бизнес-логики приложения. После записи сущностей в инбокс, начинается основной процесс их обработки.

Процесс обработки в общем случае будет выглядеть следующим образом.

type EventProcessor struct {
	logger  *slog.Logger
	name    string
	handler Handler
}

type Handler interface {
	Get(ctx context.Context) ([]Event, error)
	Process(ctx context.Context, events []Event) (processedEvents []Event, err error)
	Complete(ctx context.Context, events []Event) error
	CompleteFailed(ctx context.Context, events []Event) error
}

func (p EventProcessor) Start(ctx context.Context) error {
	events, err := p.handler.Get(ctx)
	if err != nil {
		return err
	}

	if len(events) == 0 {
		return ErrPause
	}

	events, err = p.handler.Process(ctx, events)
	if err != nil {
		p.logger.Error(
			fmt.Sprintf("[%s] failed to process event", p.name),
			slog.Any("err", err),
		)
	}

	processedEvents := make([]Event, 0)
	failedEvents := make([]Event, 0)

	for _, event := range events {
		if event.Success {
			processedEvents = append(processedEvents, event)

			continue
		}

		failedEvents = append(failedEvents, event)
	}

	if len(processedEvents) != 0 {
		err = p.handler.Complete(ctx, processedEvents)
		if err != nil {
			return err
		}
	}

	if len(failedEvents) != 0 {
		err = p.handler.CompleteFailed(ctx, failedEvents)
		if err != nil {
			return err
		}
	}

	return nil
}

Вся суть этого подхода заключается в том, что независимо от используемой технологии межсервисного взаимодействия, мы можем гибко добавлять воркеров в метод Process по мере необходимости, а также использовать систему ack для удобного коммита как успешно обработанных, так и ошибочных сообщений. Для дополнительного ускорения чтения из Kafka можно реализовать чтение сообщений батчами, что позволит снизить количество сетевых вызовов для получения данных.

Вывод

В процессе перехода с NATS на Kafka, я столкнулся с рядом трудностей, связанных с особенностями работы с разными брокерами, но, к счастью, решение не заставило себя долго ждать. Использование таблицы Inbox, с одной стороны, позволяет гибко обрабатывать сообщения в порядке их поступления, а с другой — предоставляет механизмы для гарантированной доставки и обработки сообщений (at least once) с учетом возможных ошибок или временных задержек.

Этот подход дает несколько преимуществ:

  1. Гибкость и масштабируемость: Мы можем настроить количество воркеров, параллельность обработки и условия для retry, что позволяет эффективно управлять нагрузкой.

  2. Управление порядком сообщений: В случае с важностью порядка сообщений, мы можем гарантировать правильную последовательность их обработки, несмотря на возможные сбои.

  3. Управление ошибками: Благодаря механизму ack и retry, система становится устойчивой к временным ошибкам, не теряя данных.

Комментарии (11)


  1. ababo
    26.12.2024 18:20

    А какова была мотивация/целеполагание в переходе с NATS на Kafka?


    1. mkorobovv Автор
      26.12.2024 18:20

      На самом деле, основная причина тому была частые сбои сервера NATS, что заставляло пользователей ждать, когда заново поднимут кластеры, с Kafka почему-то таких проблем не наблюдалось.


      1. savostin
        26.12.2024 18:20

        Странная мотивация если честно. Может кластер из 3+ решил все проблемы? Ну или таки посмотреть логи почему они падают…


        1. mkorobovv Автор
          26.12.2024 18:20

          Я бы с радостью остался на NATS, но, к сожалению, это была вынужденная мера, на которую бэкэнд разработчики никак повлиять не могли.


  1. dilmuradov
    26.12.2024 18:20

    А если у тебя сервис паралельно будет работать в двух инстансах. У тебя должно быть какая-то распредельенный блокер чтобы воркеры параллельно не обрабатывали одну и ту же данные.


    1. mkorobovv Автор
      26.12.2024 18:20

      Верно подмечено, в статье я указал на то, что в таблице inbox необходимо добавить поле locked_until, которое лочит наше сообщение до какого-то времени. Процесс чтения записей и лок происходит в одной транзакции, поэтому постгрес не даст достать одни и те же записи из разных инстансов.

      UPD: даже если такое как-то произойдет, замысел инбокса это обработка сообщений at least once. Таким образом, не страшно, если запись дважды обработается.


      1. sanzhi
        26.12.2024 18:20

        Есть способ для параллельного разгребания таблицы inbox несколькими инстансами без дополнительных полей. Каждый инстанс должен брать записи из таблицы с помощью select for update skip locked.


  1. alexanderfedyukov
    26.12.2024 18:20

    База данных не стала бутылочным горлышком после реализации Inbox? Если не стала, то может и NATS\Kafka не нужнs для вашей задачи и всю обработку можно организовать на уровне бд?


    1. mkorobovv Автор
      26.12.2024 18:20

      Не стала, в данной теме я раскрыл только реализацию самой задачи на уровне микросервиса. Если говорить про общую архитектуру, то на топик кафки, помимо рассматриваемого микросервиса, еще могут быть подписаны другие сервисы, поэтому удачным решением было бы здесь использовать брокера, чтобы не стучаться в каждый сервис по HTTP.


  1. fuCtor
    26.12.2024 18:20

    Судя по тексту, вы использовали не чистый NATS, а его компонент JetStream.

    Почему Kafka? Почему не отказались от JetStream и остались на Nats + DB?


    1. mkorobovv Автор
      26.12.2024 18:20

      На проекте использовался NATS Streaming. Как описал в комментарии на похожий вопрос выше, проблема была в частых сбоях кластеров NATS. Инициатива была не наша, поэтому полностью раскрыть причину не получится. В статье я лишь привел пример и сравнил работу с брокерами Kafka и NATS Streaming.