Всем привет. Меня зовут Женя, я работаю, как это сейчас модно говорить, DevOps-инженером в компании Garage Eight.
В этой статье я бы хотел поделится опытом внедрения и эксплуатации брокера сообщений NATS в нашей инфраструктуре — настолько понравилась эта технология.
Давным-давно, когда вышел последний сезон сериала «Игра престолов», перед нами в компании Garage Eight встала задача максимально быстрой доставки сообщений из пункта A в пункт B. Условия были таковы:
перед тем как сообщение достигнет пункта назначения, оно проходит ряд сервисов;
каждый сервис выполняет определенный набор действий над этим сообщением;
время прохождения сообщений через весь путь должно быть минимальным;
брокер сообщений должен работать стабильно 24\7.
Также желательными требованиями были простота написания кода для работы с брокером сообщений и простота обслуживания брокера.
Поиски решения
Изначально в качестве шины данных хотели взять RabbitMQ, поскольку мы уже с ним работаем. Однако не раз и не два случалось так, что RabbitMQ по своим внутренним причинам (в частности, из-за переполнения очередей) переставал работать. Это сильно влияло как на сервисы, которые с ним работают, так и на людей, которые работают с этими сервисами (думаю, что все, кто работает с RabbitMQ в проде, сталкивались с подобным). Это одна из основных причин, почему мы отказались от использования «кролика». Также были сомнения относительно скорости доставки, что стало еще одним аргументом в пользу поиска другого брокера сообщений (тут должна быть статья про бенчмарки различных брокеров сообщений).
Мы смотрели в сторону брокера Kafka, и это вроде как хорошая штука, но по итогу мы отказались тащить решение, написанное на Java.
Походив по интернету, наткнулись на брокер сообщений NATS. В нем нас подкупил написанный на Go бинарный файл, который можно достаточно легко развернуть в инфраструктуре и писать сервисы для него на очень большом количестве языков (есть даже реализация для Arduino :) ). Но поскольку нашим основным ЯП является Go, а NATS написан на Go, это тоже вошло в копилку аргументов за то, чтобы попробовать NATS. Более того, NATS позиционировался как высокопроизводительный брокер сообщений, и все эти факторы привели к тому, что мы решили его попробовать.
Немного про NATS
NATS — это высокопроизводительный брокер сообщений, написанный на Go. Он создан Дереком Коллисоном, за плечами которого более 20 лет работы над распределенными очередями сообщений. NATS используется широким списком крупных компаний в своей инфраструктуре.
Для работы с сообщениями в NATS используется простой текстовый протокол в стиле публикации/подписки. Клиенты подключаются и взаимодействуют с сервером через обычный сокет TCP/IP, используя небольшой набор протокольных операций, которые заканчиваются новой строкой.
Для роутинга сообщений используются Subject-ы (мы называем их «топики»). Это что-то вроде связки exchange-queue в RabbitMQ. По сути сабжект — это просто строка, которую используют и тот, кто пишет сообщения с одной стороны, и тот, кто читает сообщения с другой стороны. Сабжекты могут иметь иерархическую структуру, которая разделяется символом “.”. Читатели при этом могут читать из многих сабжектов, используя символы “” или “>”.
“” означает читать из всех текущих сабжектов в иерархии, а “>” — из текущего и всех последующих.
Все это отлично описано в документации.
Как мы приготовили Nats
Поскольку в то время, когда мы внедряли NATS, кластерная версия была немного сыровата, мы решили готовить standalone вариант без подтверждения доставки (в целом допускалась потеря 1-3 % сообщений).
Чтобы внедрить NATS, была написана простейшая роль, которая наливает Docker-контейнер с брокером и монтирует туда конфиг.
Для написания сервисов мы пользовались примерами, которые описаны в официальной документации, и вот что у нас получилось:
Пример кода для чтения сообщений:
<code>
func StartReader(ctx context.Context) error {
var err error
bufferSize := 64
//вычитываем в нескольких горутинах
concurrentExecution := 5
//Nats name будет потом отображаться в json на страничке мониторинге
nc, err := nats.Connect("natsURI", nats.Name("natReaderConnectName"))
if err != nil {
return fmt.Errorf("could not connect to nats: %v", err)
}
natsChan := make(chan *nats.Msg, bufferSize)
defer nc.Close()
sub, err := nc.ChanSubscribe("natsReaderSubject", natsChan)
if err != nil {
return fmt.Errorf("could not subscribe to nats: %v", err)
}
defer func() {
_ = sub.Unsubscribe()
close(natsChan)
}()
errorChan := make(chan error, 1)
//Добавляем хэндлеры, которые будут отрабатывать в случае ошибки вычитки, дисконнектов, или закрытого коннекта
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
fmt.Printf("reader error handler %s\n", err.Error())
errorChan <- err
})
nc.SetDisconnectErrHandler(func(_ *nats.Conn, err error) {
fmt.Printf("reader disconnect error handler: %v", zap.Error(err))
})
nc.SetClosedHandler(func(_ *nats.Conn) {
fmt.Printf("reader close handler")
})
//вычитываем из созданного канала и делаем там свою логику (в этом примере просто выводим на экран)
for i := 0; i < concurrentExecution; i++ {
go func() {
for msg := range natsChan {
var data interface{}
err := json.Unmarshal(msg.Data, &data)
fmt.Println(data)
}
if err != nil {
errorChan <- err
}
}()
}
}
</code>
Пример кода, который пишет, и того проще:
<code>
func NatsWriter() error{
nc, err := nats.Connect("natsURI", nats.Name("NatsConnectName"))
var someMsg struct{
msg string
}
//топик, в который будем писать
subject := "nats_subject"
b, err := json.Marshal(someMsg)
if err != nil {
return err
}
err = nc.Publish(subject, b)
return err
}
</code>
Реализовав логику, попробовали запуститься, и у нас получилась следующая схема.
Есть сервис, который пишет в NATS, в топик awesomeService. Есть второй сервис, который читает из этого топика и делает над сообщением свою магию, затем пишет обратно в NATS, но уже в топик serviceA. Из этого топика уже вычитывают сообщения конечные потребители.
Для обеспечения отказоустойчивости мы просто запустили второй NATS и начали писать в него те же сообщения. Сейчас кластерная версия и NATS JetStream уже нормально работают, и их можно использовать в продакшене.
Запутистились, стартанули, заработало.
Постепенно схема разрасталась, сервисов, которые пишут и обрабатывают сообщения становилось больше, читалей, которые считывают сообщения, тоже становилось больше. При этом, помимо Go, используются C++ и иногда Python сервисы.
Сегодня схема выглядит следующим образом (фактически в нашей схеме намного больше сервисов, через которые проходят сообщения, но смысл сохраняется).
Разумеется, количество сообщений, передаваемых через NATS, тоже увеличилось
Стоит отметить, что достаточно большое количество сервисов, которые занимаются обработкой сообщений, находятся на том же сервере, где запущен сам NATS.
Время прохождения сообщений, один из наиболее критичных для нас факторов, составляет, в 99 перцентилей от 500 микросекунд до 2 миллисекунд. Весьма хороший показатель при наших потребностях.
В плане стабильности NATS тоже показал себя с хорошей стороны, но с одной, довольно болезненной проблемой, нам все-таки пришлось столкнуться. В том числе именно из-за этой проблемы у нас стала теряться некоторая часть сообщений, но об этом ниже.
Относительно потребляемых ресурсов за все время эксплуатации не было каких-то неожиданных факапов, из-за которых пришлось бы срочно что-то тюнить. Все в рамках прогнозируемых изменений.
При необходимости в NATS можно добавить авторизацию, как на уровне всего сервиса, так и на уровне отдельных топиков\пользователей. Мы в свое время это сделали, чтобы ограничить работу с топиками
Отдельным пунктом хотелось бы отметить мониторинг. По дефолту он доступен по HTTP в NATS на порту 8222.
В каждой из вкладок — статистика по выбранному пункту.
В standalone-версии, которую мы используем, чаще всего заходим во вкладку Сonnections, где показаны основные текущие коннекты, количество сообщений\байт, когда запущен и так далее.
{
"num_connections": 1,
"total": 1,
"offset": 0,
"limit": 1024,
"connections": [
{
"cid": 4,
"kind": "Client",
"type": "nats",
"ip": "192.168.1.1",
"port": 8085,
"start": "1970-01-01T06:32:17.920532298Z",
"last_activity": "1970-01-01T15:52:17.218785543Z",
"rtt": "78µs",
"uptime": "0d0h1m59s",
"idle": "0s",
"pending_bytes": 0,
"in_msgs": 146288056,
"out_msgs": 0,
"in_bytes": 14592708062,
"out_bytes": 0,
"subscriptions": 0,
"name": "some-awesome-client",
"lang": "go",
}
Но все же хочется мониторинг смотреть не в отдельном веб-интерфейсе, а в системе, с плюшками в виде алертов и графиков. Поскольку для наших сервисов мы используем прометей, нашелся telegraf exporter, с помощью которого снимаем метрики. Правда, этот экспортер снимает только общую информацию касательно количества текущих коннектов, slow_consumers(об этом немного ниже) и количества входящих\исходящих сообщений\байт (основные показатели, которые доступны на вкладке general).
Ложка дегтя
Это, наверное, не столько ложка дегтя, сколько архитектурная особенность NATS, но в свое время мы с ней немного потрепались. В чем суть.
Архитектура NATS направлена, в первую очередь, на стабильность работы самого сервера, а не отдельных его клиентов. Соответственно, если кто-то из читателей по каким-то причинам начинает читать медленнее, чем в сабжект пишутся данные, то такой клиент помечается как Slow consumer, а коннект дропается. Чтобы определять, через какой период определять клиент как slow consumer, в NATS используется параметр write_deadline (подробнее в документации). Я, кстати, так и нашел возможность конфигурирования этого параметра через размер буфера, а не через время «протухания» сообщения.
Нам такая реализация и была нужна в брокере, поскольку, конечно, не очень хорошо бы было, если какой-то из читателей отвалится, но куда критичнее было бы падение всего сервера NATS.
Неприятной же особенностью при такой архитектуре стало то, что если из очереди читают несколько клиентов и один из них начинает тормозить в вычитке (из-за сети, по причине «внутренних противоречий» или из-за чего-нибудь еще), то остальные клиенты, которые читают из этого сабжекта, тоже начинают тормозить при вычитке.
Почему это происходит? Для читателей в NATS реализован цикл, по которому он последовательно проходит и пушит сообщения вместо обработки каждого читателя в отдельной горутине. Подробности этого кода можно посмотреть в исходниках.
В результате мы просто разнесли по разным Subjects стабильных и нестабильных клиентов, чтобы они не аффектили друг друга. Пока что полет нормальный :)
Выводы
Как оказалось, иметь дело с NATS достаточно приятно. Неприхотливый, быстрый, удобный, весьма стабильный, этот брокер оказался хорошим решением для нашей задачи. За все время эксплуатации NATS (+- 3 года) он работает очень стабильно и предсказуемо (за исключением той ложки дегтя, которая была описана выше).
Мы, к сожалению, еще не пробовали работать с кластерной версией или с JetStream, который позволяет сохранять данные на диск. Возможно, там могут выстрелить свои баги\фичи, но это, наверное, будет уже совсем другая история.
Комментарии (25)
nano_e_t_4 Автор
06.01.2023 12:45+1да, спасибо что обратили внимание ) минимальным конечно же должно быть время. поправил в статье
ewolf
06.01.2023 13:38+1А какую топологию кластера натса вы в итоге используете?
Мы в своих внутренних исследованиях обнаружили нестабильность работы NATS при кластером сетапе в 5 нод и использовании Jetstream (один баг даже зарепортили). В частности это проявляется в нестабильном создании стримов, "отключении" консьюмеров.
Также нас не очень устроила скорость работы Jetstream: при 300-400k RPM латенси записи начинает очень сильно уступать кафке. Ну и с масштабированием стримов тоже есть вопросы.
Общий наш вывод в том, что nats без Jetstream хорош, но с ним - пока слишком молод и нестабилен.
nano_e_t_4 Автор
07.01.2023 00:51+1мы изначально стали standalone использовать, ну и оно до сих пор хорошо летит, что перестраивать на кластер или добавлять надежную доставку нет необходимости. Хотя поиграть с этими штуками хочется. Надеюсь что получится найти время, сделаю бенчмарки по nats\rabbit\kafka в разных режимах, сделаю еще одну статью с графаной и цифрами )
ну да, про jetstream хоть у них в доке написано что jetstream стильно модно молодежно, технология вроде как достаточно новая, багов еще не собравшая. но думаю через пару лет будет пушка ))
ewolf
07.01.2023 12:32Кластер же позволяет вам проводить работы на nats, пересетапливать сервера без простоя и т.д.
Как сейчас вы обеспечиваете бесперебойную работу?
pananton
07.01.2023 13:09Не автор, но вклинюсь) Если использовать натс просто как шину, без всякой перзистентности, то перезапуск тривиален, т.к. все сервера кластера равноправны. Естественно, клиенты должны быть сконфигурирована на реконнект автоматический, а в остальном - главное не прибейте все ноды кластера одновременно.
В случае с JetStream все скорее всего посложнее, так как там уже есть мастер нода и всякий Raft для консенсуса. Тут не подскажу.
Ну и есть один момент, по которому я создавал issue в их репозитории, но не помню, исправили они что-то или нет. Смотрите, клиенты обычно скофигурированы так, чтобы рандомно выбирать ноду кластер, куда коннектиться. Получается, что во время работы кластера у нас примерно по N клиентов на каждой ноде. Потом вы начинаете рестарт кластера. Останавливаете одну ноду, и все ее N клиентов переходят на 2 оставшиеся (предположим, что 3 ноды в кластере). Потом нода поднимается, на ней 0 клиентов, на других 1.5N. Думаю, идея понятна, когда вы закончите, у вас на одной ноде будет 0 клиентов, на другой 1/3, и еще на одной 2/3 (посчитайте сами). Но повторюсь, возможно, с этим уже что-то сделано.
nano_e_t_4 Автор
07.01.2023 18:54ну, мы дублируем просто как натс, так и сервисы которые с ним работают. грубо говоря дублируем весь узел системы. это позволяет не только с натс работы проводить но и с сервисами которые с ним взаимодействуют, что иногда нам тоже необходимо
ewolf
06.01.2023 13:42Также, как я понимаю, вам не требуется персистивность сообщений после публикации продюсером?
nano_e_t_4 Автор
07.01.2023 00:54нет, мы в целом были готовы мириться с потерей 3-5% сообщений (хотя думаю в итоге если бы такое было, то в итоге искали бы варианты решения), но поскольку nats показал себя хорошо и стабильно, то ничего добавлять не стали
ewolf
07.01.2023 12:33Но тут дело ведь не только в потере сообщений. Консьюмеры не получат ничего, что было опубликовано в момент, когда они были отключены: например, в следствии бага, а момент редеплоя или каких-либо других работ.
Это не является проблемой?
nano_e_t_4 Автор
07.01.2023 19:05есть критичные консумеры, которые если отвалятся, и будут теряться сообщений, то будет весьма и весьма плохо. но за все время они эксплуатации натс у нас они не падали или очень быстро переподключались) а в случае работ используем как раз дублирующий натс\сервисы
miksir
06.01.2023 16:55Интересно, т.е. любой пик сообщений выше запланированного приводит к отстрелу всех консьюмеров, ибо они переполняются и начинают тормозить?
nano_e_t_4 Автор
07.01.2023 01:01не всех, а только тех кто начинает тормозить. остальные как читали, так и продолжают читать.
Но нюанс заключается в том, если например у вас 5 топиков, и из каждого топика читают по 5 клиентов, один из которых начинает тормозить, то его натс в итог отстрелит. Но при этом этот, которые тормозит может влияет на вычитку остальных 4 клиентов, которые читают из того же топика. Это особенность реализации, мы им писали об этом, даже вроде реквест зарелизили (не помню уже если честно, давно было) но в итоге просто свою схему потюнили немного и оно норм работает
pananton
06.01.2023 17:09Только написал статью, где говорил про то, что про NATS ничего нет на хабре и вуаля) Спасибо)
pananton
06.01.2023 17:46Что касается натса, тоже выбирал его за его низкую latency, и тоже доволен. У нас используется суперкластер: один кластер из 3х нод в амазоне, один такой же в овх. Проблем особых не было, один раз сертификат какой-то только забыли обновить)
Мы пишем микросервисы в основном на плюсах, и с этим есть проблемы. У них есть только си либа, плюсовые только сторонние и скорее мертвы. Я писал свою билиотеку, мне важна была полная асинхронность, и с этим были проблемы. Их сишная либа дает асинхронность либо через libuv, либо через libevent. Обертку-то плюсовую я в итоге написал, добавил туда примитивы для удобной работы с асинхронными операциями, но это была боль, потому что их либа все равно внутри какие-то операции (например, flush) делает синхронно, а выясняется это только опытным путем, когда дедлок ловишь где-то. К счастью, это некоторым образом компенсируется тем, что натсовские разработчики хорошо идут на контакт, с их помощью эти проблемы я либо фиксил, либо они вносили изменения в либу.
nano_e_t_4 Автор
07.01.2023 01:19да, плюсовой любы у них почему то нету (( у нас есть микросервисы на плюсах, но в них видимо как раз сишная либа используется. До вашего коммента я был уверен что ее можно легко с плюсовый код встроить, но щас начал сомневаться ) поизучаю этот момент
pananton
07.01.2023 06:08Если вас устроят синхронные механизмы самой либы, ну или некий вариант полу-асинхронности, когда они просто новые сообщения в пул потоков кидают, то все будет ок. Мне важна была асинхронность на уровне сокетов, а для этого нужно ивент луп подключить. Ну и там были проблемы.
nano_e_t_4 Автор
07.01.2023 01:03ну, я тоже когда искал про nats в рунете в целом и на хабре в часности находил только по касательной, полноценной статьи не видел. Решил исправить сей недостаток, потому как технология как по мне очень интересная
CrzyDocTI
07.01.2023 03:36было бы неплохо описать особенности архитектуры, в том числе и в сравнении.
несколько лет назад смотрел в его сторону, но вот это:
"Поскольку NATS не записывает сообщения на диск, сервисы-получатели должны аккуратно завершать свою работу — вначале отписываться от новых сообщений, затем обрабатывать полученные ранее и только потом останавливать процесс. "не дало шанса попробовать из-за особенностей производства.
pananton
07.01.2023 06:18Да там особенностей изначально-то особо и не было) Он фундаментально очень прост: пришло сообщение, если кто-то слушает, отправил дальше. Если нет, сразу отбросил. Причем года полтора назад, они и никакой ошибки, что никто не слушает, не отправляли. Было проблематично организовывать request-reply, потому что если принимающая сторона отсутствовала, request по таймауту только завершался.
Сейчас есть у них движок jetstream, который превращает натс в а-ля кафку. Про него не расскажу, не использовал.
barloc
07.01.2023 18:16Очень специфичный сервис, по сути это замена пул модели на пуш для отгрузки данных из сервиса.
С кафкой даже рядом не сравнить. А про жаву и кафку - есть редпанда, 2 бинаря: клиент на плюсах и конфиг утилита на го.
pananton
07.01.2023 19:37А вот в этой статье сравнили, и там действительно натс и кафка рядом не валялись. Я думаю все-таки для определенных задач одно, для других другое)
barloc
07.01.2023 21:35Все верно, у каждого своя ниша. А сравнивать сервис, который не сохраняет данные на диск с распределенным логом - дичь какая-то :)
18741878
Тема - отличная. Но текст надо вычитывать. Прямо в начале статьи "скорость прохождения сообщений...должна быть минимальной". Шедевр, в копилку глупостей, одназначно. Либо время прохождения должно быть минимальным, либо, если уж речь о скорости, то она должна быть максимальной.
Я понимаю, праздники еще не закончились и нет повода не выпить :) Но это не оправдание такой невнимательности