Рано или поздно в нагруженном проекте возникает потребность в какой-то специализированной базе данных, кэше или ином хранилище. Причина такой потребности, как правило, погоня за производительностью, низким временем отклика или эффективностью хранения данных.


В своем докладе я расскажу о нашем опыте разработки и эксплуатации специализированной timeseries БД, в основе которой лежит Apache Kafka.




Представлюсь. У меня большой опыт в IT. Он был до какой-то поры админский, потом менеджерский, а сейчас и то и другое, да еще и разработческий.



Я хотел вам рассказать про Kafka, но говорить про Kafka в вакууме просто тупо, поэтому я вам буду рассказывать про то, что мы с ней делали и зачем.


  • Мы будем говорить про нашу Timeseries базу данных, о том, как у нас их было много, и мы захотели еще одну.
  • Так как мы изобретаем велосипед, надо ознакомиться с велосипедостроением.
  • И поговорим, как база данных делается.
  • Расскажу, почему для этого приделали Kafka.
  • И поделюсь нашими впечатлениями о Kafka. Она у нас год, определенные впечатления есть. И расскажу, что мы приобрели попутно.


Для того чтобы в контент погрузить, расскажу об Okmeter.


Okmeter – это штука, у которой есть цель найти проблему.


Для того чтобы найти проблему, нужен контент. Это контент метрики. Метрики собирает агент. Он их засовывает в какую-то платформу. И по этим метрикам считает алерты, т. е. считает сценарии проблемы, показывает, где сломалось.


И мы только за контентную часть берем деньги. А платформа вне зоны наших интересов. Мы ее поддерживаем только потому, что ничего готового взять не можем, к сожалению. И это вынужденная история.


Timeseries, точки и т. д. – это все очень весело, но с точки зрения бизнеса это вообще не интересно.



Платформа состоит у нас из двух кусков:


  • Это метаданные, описание того, какие метрики у нас есть, какие у них есть теги, когда мы их в последний раз видели, чтобы иметь возможность поискать, потом по этим тегам погруппировать.
  • Это metric store. Это то, где живут ключ метрики + timestamp + значение.


Лет 5 назад у нас была самая простая база данных на Cassandra, куда мы вставляли точки.


Вот метрики, вот Timestamp, вот значение.


Многие считают, что мы до сих пор так делаем. На самом деле это не работает.


Это просто, это дикая производительность с точки зрения записи, но это очень дорого в хранении, потому что Cassandra хранит на каждую запись еще и timestamp разрешение конфликтов. И это очень медленное чтение. Если нам нужно для одного графика прочитать 5 000 метрик, нам нужно сделать range-запросов в Cassandra. А это для нее очень тяжело. Cassandra – это write only storage.



  • Мы решили, что будем накапливать метрики. И только потом писать в Cassandra, т. е. делать это большими пачками. Мы сделали штуку, которая накапливает в памяти. Она называется chunked. Это одна метрика, в которой 240 точек с минутным разрешением.
  • Мы знаем интервал, нам уже не нужно хранить timestamp. В 240 по 8 байт floats.
  • И мы это можем даже пожать. Это жмется.
  • И можем записать это в Cassandra. Мы сделали так, что при каждой точке мы перезаписываем целиком весь chunk.


Так как хранить в памяти это не надежно, мы сделали такую штуку, чтобы не читать чанк при записи. Т. е. если у тебя этого чанка в памяти нет, то сделай новую версию, добавь туда точку и запиши. Он будет пустой, а при чтении мы все версии берем и мержим.



  • Получили мы из коробки от такого простого хода десятикратное сжатие по месту.
  • Мы ускорили чтение в несколько раз.
  • При этом мы увеличили нагрузку на запись в Cassandra. Т. е. все чанки были синхронизированы и начинались в одно время. Соответственно, в начале этот чанк пустой, он хорошо зажат, потом, когда начинает перезаписываться, у него растет размер, даже пожатого. И мы получаем нагрузку на диск, которую генерирует Cassandra.
  • Можно было бы уповать, что в Cassandra апдейты бесплатные, потому что они происходят в памяти, но на самом деле эти чанки пишутся еще и в CommitLog. CommitLog – это cassandra’ский WAL. И checkpoint происходит не только по размеру, но еще и потому что 8 гигабайтов CommitLog кончились, пора флашиться, вне зависимости оттого, что там все хорошо.
  • Соответственно, понадобилась память для того, чтобы эти чанки накапливать.
  • Мы тратим эту память, но из нее читать не можем, потому что пресловутые версии. Для того чтобы прочитать чанк, нужно достать все его версии, а их может быть в памяти ни на каком узле не быть. И мы пока оставили в таком виде.


  • Дальше реинкарнация всего этого. Мы просто укрупнили чанк, т. е. в один чанк мы начали класть не одну метрику, а много.
  • Критерием того, что это все можно положить в один большой blob является то, что эти метрики генерируются из одного места. Т. е. это requests из одного лога, посчитанные, но у них, допустим, разные статусы или гистограммы.
  • И они читаются рядышком все вместе, поэтому укрупнив всю эту историю, мы получили еще большее сжатие, потому что у нас большой массив данных из-за того, что там будут похожие байты.
  • И мы в несколько раз ускорили чтение, потому что, когда нам нужно прочитать 5 000 метрик, как правило, это означает, что только 1 000 таких чанков нужно за раз достать.
  • Иногда мы, увеличив что-то, точность выборки потеряли. Иногда мы читаем лишнее, но это небольшая проблема.

****


Мы решили, что мы хотим это исправить и хотим последние данные нескольких часов уметь хранить в памяти, при этом надежно.


Что подразумевается под «надежно»? Подразумевается, что это записано на диск, а лучше, на несколько дисков и то, что мы можем поднять и одновременно держать более-менее консистентные копии для того, чтобы переживать отказ железа и т. д.


Так как мы хотим попутно убрать наши версии, мы хотим оттуда читать. Мы хотим читать не из Cassandra все, как мы всегда читали, а хотим читать хвост из памяти. Более исторические в кавычках данные мы хотим читать из Cassandra.


И можем тогда убрать перезапись в Cassandra.



Соответственно, мы начали изобретать свой велосипед. И для того чтобы правильно изобрести велосипед, нужно понимать, как база данных работает.


В любой базе данных есть WAL. Он может называться REDO log, он может называться WAL в чистом виде, но условно это log, в который записываются все изменения для последующего проигрывания.


Попутно эти данные, когда происходит у нас изменения данных, изменяются в памяти базы данных. В MySQL это buffer pool, в Postgres это shared buffers, но смысл один и тот же, т. е. у вас в памяти есть снапшот самых актуальных данных, а также они есть в WAL. А в datafiles их пока еще нет, потому что их очень дорого писать туда синхронно, поэтому мы их запишем потом когда-нибудь.



И для того чтобы переживать crash, когда у нас в памяти данные есть, но мы их еще на datafiles не записали и нужен WAL. Соответственно, мы стартуем, мы понимаем до какой точки у нас все данные в datafile. И с этой точки мы знаем отметку в offset WAL, т. е. откуда нам данные надо проиграть, чтобы все восстановилось. И вся эта история в базе называется checkpoint.



В Postgres в чистом виде на WAL построена репликация. Мы весь поток изменений передаем на реплику. И реплика его у себя применяет. Получается, что у нее есть копия всех изменений. В MySQL это через отдельный log сделано.



Какая в базе самая простая схема, если говорить про распределенность? Есть некий primary. Мастером его теперь нельзя называть. И есть реплики, которые читают поток изменений и могут обслуживать чтение.


Очень важно, что между ними есть задержка. Это все происходит не синхронно. В какой-то момент мы клиента отпустили, успешно приняли его транзакцию, он на commit получил «Ok» и все хорошо, а на реплику эти данные еще не доехали. Это если мы говорим про асинхронную репликацию.


Соответственно, пишем мы только в primary. Есть процедура промоушена любой реплики в primary.


Читать можем с реплик, если не пугает lag. А если пугает, то читаем с primary.



Представим, что мы делаем нечто подобное, но у нас primary не умеет обслуживать чтение. Читать можно только с реплик. И там есть lag. И мы его измеряем, и с ним миримся.


Из этой концепции мы и будем строить нашу TSDB.



Мы вместо WAL берем Kafka. И считаем, что запись прошла, если она прошла на primary, т. е. в Kafka.


Все остальные изменения – это реплики. И это уже их проблема из Kafka посинкаться.



Многие говорят, что Kafka – это то, это се.


  • Для данной задачи я буду считать, что Kafka – это лог. Он, конечно, является логом. Это не брокер очередь. Очереди – это штука, на мой взгляд, несколько посложней.
  • Это надежный лог. Это штука, которая как-то реплицирована на несколько нод.
  • Продюсер – это тот, кто делает в этот лог запись.
  • Consumer из этого лога может читать. Consumer интерфейсно может читать:

  1. Либо с какой-то конкретной позиции, т. е. с offset 1, 2, 3 и до конца, и сидеть на потоке;
  2. Либо в Kafka есть механизм запоминать, где вы закончили читать. Это consumer groups и offset commit.


Лог в Kafka называется partition. И эти партиции объединены в некую логическую штуку, в топик, но топик нам практически никогда не интересен.


Как это работает надежно? Для каждого лога есть один primary, который обслуживает и чтение, и запись. И есть какое-то количество синхронных реплик.


Для того чтобы получить какие-то гарантии по отказоустойчивости, можем при записи в этот лог требовать: «Пожалуйста, отдай мне успешный ответ, если записалось на primary + 2 реплики», т. е. все гарантированно это записали.


Что еще есть в Kafka? Это шардинг. Вы можете создавать несколько партиций. Несколько consumers могут обрабатывать по одной партиции за раз. Соответственно, вы свою задачу распараллеливаете. Мы и все это применяют для того, чтобы не блочиться на одном потоке, а все это обрабатывать.


Шардинг влияет не только на гранулярность хранения, но еще и на масштабирование записи. Допустим, у вас есть лидер. А у каждой партиции свой лидер. Таким образом весь поток данных вы можете разбить на N кусков, и N количество лидеров будут его обслуживать.


Так и с точки зрения consumer. Если мы говорим о consumer group, то одну партицию будет читать один consumer или несколько. Вы можете запустить столько consumers, сколько у вас партиций.



Чтобы понимать, что внутри Kafka бегает в этом логе, я структуру из гошного клиента скопировал. Я выкинул оттуда все неинтересное.


Соответственно, есть топик. Это просто string. Есть ключ у каждого сообщения – это набор байт. И есть value – набор байт. Kafka особо не вникает, что вы в message гоняете.


Есть partition. Это номер партиции, в какой из логов в пределах топика вы записали.


Есть offset. Это смещение в конкретном логе, которое присваивается брокерам после того, как вы записываете сообщение.


И есть timestamp, по-моему, с версии 10 с чем-то. Брокер присваивает каждому логу время, в которое оно было записано, т. е. это брокерное время. У нас с этим есть небольшие проблемы, потому что в наших метриках есть 3 разных времени. Это брокерное время, агентское время и настоящее время по мнению consumer.



Как мы пишем? С записью все достаточно просто. Вы можете определить в какую конкретную партицию будет записано то или иное сообщение.


И мы делаем некий статический шардинг, т. е. у нас есть явно заданная функция, которая определяет, что вот эта пачка метрик с таким-то ключом будет записана в партицию номер 2. И все WAL этой метрики будут попадать ровно в партицию номер 2. Нам это важно для чтения, для delta locality, чтобы одна и та же метрика была в одном логе.


И в Kafka нужно всегда использовать watch write. Это сильно ускоряет запись. И тут ничего нет необычного.



Есть low level интерфейс, когда вы ничего не знаете ни про consumer groups, ни про offset. Вы можете сказать, что я хочу читать такую-то партицию в таком-то топике с offset № X.


Соответственно, есть опция читать с конца. Говорим, что дай мне с последнего, т. е. t – f без указания смещения. Или говорим, что я хочу читать с начала.


Но так как у нас в сообщениях есть timestamp, то Kafka позволяет вам запросить offset сообщения с таким-то timestamp. Она бисектом по своему логу ищет вам смещение, которое относится к now + 4 часа. И это важно, мы именно это используем.


Но если вы хотите использовать ее как очереди и не париться про все эти consumers, есть высокоуровневый интерфейс consumer group. Вы говорите, что я хочу читать такой-то топик. И много consumers делают ровно то же самое, и Kafka сама распределяет, кто и что читает, а также сама разрулит ситуацию, когда один из consumers умрет. Она его работу отдаст другому.


И вы можете периодически сказать, что я эту работу сделал, сохрани offset. И в следующий раз, когда вы подниметесь, она начнет вам отдавать сообщение ровно с того места, с которого вы закоммитили. В общем, все просто.



Возвращаемся к нашим метрикам. Мы используем low level интерфейс, т. е. мы в нашем, так называемом, in-memory storage эта штука будет наполняться из этого лога.


Она стартует пустая. Она знает, сколько там партиций. У Kafka можно спросить, сколько у нее партиций. Потом можно наплодить partition worker. Это workers, которые обрабатывают одну партицию, т. е. каждый instance memory storage обрабатывает одну партицию.


И все данные, которые он получил из одной партиции, он хранит в отдельной области памяти. Условно, это набор кольцевых буферов.



Когда стартует вот эта наша штука, она говорит Kafka: «Дай мне, пожалуйста, смещение для того, чтобы получить все метрики за 4 часа». Она дает смещение. У нее есть пустые буферы. И она начинает весь поток изменений проигрывать, пока не достигнет конца.


Соответственно, данные которые ей априори не интересны, т. е. те, у которых в message в Kafka timestamp нормальный, а внутри она понимает, что это какие-то старые метрики, которые агенты дослали, она их просто выкидывает, они ей не интересны.


Важный критерий в том, что не понятно насколько я в конце. Вот это надо сформулировать. Насколько я в конце можно определять двумя критериями:


  • У Kafka можно спросить HighWatermark для этой партиции, т. е. какой номер offset на момент моего запроса будет выдан следующему сообщению. Можно спросить и посмотреть последний обработанный offset. Таким образом мы поймем, сколько примерно в штуках сообщений до конца осталось, т. е. мы на момент запроса будем знать, что мы сейчас отстаем от хвоста на 1 000 штук.
  • И так как у нас есть timestamp, мы можем сравнивать брокерный timestamp. Получили очередное сообщение и смотрим, что оно датировано четырьмя часами назад, значит мы где-то в четырех часах от конца. И мы в нашей штуке умеем это все программно вычислять имеем ли мы право отвечать на этот запрос, достаточно ли мы сейчас синхронизированы. Если по аналогии с Postgres, то: «Какой у меня сейчас лаг относительно мастера, могу ли я эти запросы обрабатывать?».

И все эти штуки мы вывели на health check. Т. е. health check Kubernetes опрашивает ноду, а она ему говорит: «Нет, я еще не готова, я еще не налилась». Это значит, что соседние еще деплоить нельзя, гасить нельзя. Все эти пробы более осмыслены.



И так как мы апеллируем потоком, нам нужно понимать, где мы в этом потоке, т. е. то, о чем я говорил.


Две картинки приложил. На левой – это момент старта нашего приложения. Оно запустилось, оно понимает, что ему до конца надо читать чуть больше 75 000 000 сообщений. Жрет этот лаг. И за 15 минут до нуля сжирает.


На второй – это скорость обработки сообщений из Kafka. Мы в то же время запустили приложение. Оно запустилось. Начало фигачить 130 000 сообщений в секунду. А потом дочитало до конца и сидит на потоке, т. е. оно вышло на плато и на этом плато работает.



И из этой штуки после того, как оно начало REDO, мы имеем право обслуживать чтение.


На чтение мы отдаем ошибку, если у нас большой лаг, т. е. мы не на хвосте находимся, значит у нас данные не консистентные.


И если просят диапазон больше, чем у нас есть, то мы не вправе отвечать на такой запрос, мы отдаем ошибку. Например, у нас memory storage 4 часа, а у нас просят 8 часов. И тот клиент, который запрашивает, он теоретически знает, кто и сколько часов хранит. У нас есть разные по размеру storages.


Соответственно, приходит запрос. И, как правило, это 1 000 ключей за раз. Он бежит и по каждому ключу вычисляет в какой он партиции. Все эти ключи шардятся на партиции, т. к. мы знаем функцию хэширования, мы можем понять, кто и в каких партициях. И вычитываем.


Вот что у нас получилось. 99 перцентиль у нас меньше 20 миллисекунд. Это в среднем обслуживание запросов чтения, в котором 3 500 метрик запрашивается. Если мы запрашиваем чуть меньше, т. е. 95 перцентиль – это где-то 600 ключей за раз запрашивается и это 3 миллисекунды.



  • Получили быстрое чтение. Мы таких штук можем поставить столько, сколько захотим.
  • Это делается легко. Нам не нужно базу данных переконфигурировать. Нам нужно просто запустить instance.
  • Мы ничего не писали про репликацию, но при этом мы имеем консистентное состояние, т. е. распределенное, консистентное, но мы ничего не писали про репликацию. Мы просто написали consumer, а все остальное дала нам Kafka. А еще мы расшардины, т. е. у нас несколько сегментов памяти. У нас 18 партиций. И война за локи в 18 раз менее значима с точки зрения задержки.
  • Но сидение на полном потоке метрик, которые у нас есть сейчас, это занимает 1,5 ядра. У нас там есть компрессия. Для того чтобы сообщение обработать, ему нужно read buffer раскомпрессить, вставить точку и обратно закомпрессить, иначе у нас в память не лезло, а так у нас в несколько раз все жмется.
  • И есть лаг на то, что у нас нет никаких снапшотов пока. И мы каждый запуск зачитываем сначала. Это долго и по ресурсам достаточно накладно.


Дальше мы переходим к LTS. В нашем случае эта штука называется chenker.


  • Она также абсолютно независима от тех компонентов, которые в памяти хранят данные. Она сидит на потоке всех изменений. И формирует blobs, о которых я говорил.
  • Она сама запоминает, откуда ей надо читать. Она должна помнить до куда она все обработала и до куда она все эти чанки сформировала. Она для каждой пачки метрик хранит свои offsets, хранит в Kafka. И в Kafka есть такая замечательная штука, как компактный топик. Про нее не буду подробно говорить, в документации все есть.
  • Она понимает, что 4 часа закончились, берет и формирует blob, и кладет его Kafka, просто в другой топик.
  • А если она понимает, что этот blob был сформирован, но пришла точка из прошлого в blob, она в этот же топик кладет другой тип message, что эта точка из прошлого.


И есть другая штука, которая сидит на этом отдельном топике. И если прилетает чанк, то она берет его и перекладывает в Cassandra. Мы сделали это специально через Kafka для того, чтобы иметь возможность потом экспериментировать с long term storage, т. е. вместо Cassandra взять и на потоке этих blobs зачитать во что-нибудь другое, например, в файле, в MySQL.


Если прилетает точка из прошлого, то она из Cassandra берет этот чанк, распаковывает, вставляет эту точку, запаковывает и кладет обратно. Так как эти штуки находятся в одной партиции, т. е. и чанк, и запоздалая точка приходят последовательно, то никакой проблемы с данными у нас не будет.


И для этих штук мы используем ConsumerGroup.



Что удалось добиться?


  • Удалось в 200 раз снизить количество записей в Cassandra, т. е. у нас было 30 000 writes в секунду, а стало 150, потому что мы blobs пишем один раз.
  • И Cassandra мы свою сдули. У нас Cassandra обслуживала хвост. И у нас было 12 нод и хвост был на SSD, а теперь нам это все не нужно стало, и мы поставили 3 ноды с большим количеством SATA-дисков. И по latency нас все устраивает и SSD нам больше не нужен.


И немножко поговорим о паттерных штуках. Что делать в случае ошибки? У нас есть некий лог и мы встретили в середине обработки какое-то плохое сообщение. Надо понять, что с ним делать. А отметить, что мы его выполнили, мы можем только offset’ом, т. е. только передвинуть свой маркер.



Встроенного механизма Kafka не дает. И я выделил 3 принципиальных момента, как это делать:


  • Если вы понимаете, что вы во что бы то ни стало должны это сообщение обработать, то вы должны вблочиться и до успеха повторять ту операцию, которую вы должны с этим сообщением выполнить, понижая частоту попыток и т. д.
  • Вы можете перекладывать плохое сообщение в отдельный топик. И потом отдельным worker’ом это все обрабатывать.
  • Или вы можете плохое сообщение перекладывать в начало этого топика, а обработать его потом. И помечать, что эту версию этого сообщения я типа обработал.

Нет никакого готового решения, что нужно делать так и никак иначе. Мы используем это в разных случаях.



Если провести аналогию с http, то в http есть хорошая классификация ошибок:


  • http-400 – это ошибки запроса, т. е. ошибки клиента. Вам послали неполный запрос, вам послали невалидный JSON. Это все ошибки клиента.
  • http-503 – ошибка сервиса. Например, тот storage, в который вы хотите записать, лежит или там еще какая-то непонятная ошибка.

Мы их как-то классифицировали и для разных типов мы понимаем, что стратегия действия совершенно разная.


Если у нас лежит тот storage, в который мы хотим записать, то откидывать сообщение в соседний топик бессмысленно, потому что мы создадим себе большую нагрузку на Kafka. Нам проще заблокироваться и дождаться, когда этот storage будет готов обслуживать наши запросы.


Бывает приходят битые штуки или штуки, которые упираются в лимиты, которых мы не ожидали. В Cassandra есть максимальный размер того, что вы можете туда положить. И очевидно, что этот максимальный размер нужно еще и на клиенте детектить и даже не пытаться его туда сделать, потому что вы таким образом не дифференцируете уперлись ли вы в лимит или просто Cassandra таймаутится, или отдает вам ошибку.


После того как вы свои ошибки классифицировали, работать с ними более-менее уже можно.



Поговорим про production-инсталляцию. По большому счету про Kafka особо и нечего рассказывать. Это лог, он работает. Если вы настроили нормальный уровень, гарантии записи туда и понимаете, как работать с consumers, то про это больше нечего рассказать.


В production на данный момент 6 брокеров. Мы находимся на версии 1.0.


Для этих брокеров мы не выделяем отдельные машины. На машине с Kubernetes мы отрезаем для них кусок.


Что значит отрезаем? На машине есть 2 диска, 2 SSD. Мы отдаем им целиком 2 SATA-диска. Мы в system d мы ограничиваем Kafka и говорим, что тебе не больше 4 ядер доступно и не больше 10 гигабайтов. А в Kubernetes мы говорим, что этих ресурсов у тебя нет. Т. е. мы зарезервировали 4 ядра и 10 гигабайтов под Kafka, поэтому Kubernetes туда ничего не шедулит. И получается все по-честному.



  • Сырые точки, которые являются WAL, мы храним 5 дней. Это занимает с учетом репликации в трех копиях 5 терабайт.
  • Чанки, вот эти blobs, мы храним 2 месяца. И это занимает 3 терабайта. Сами посмотрите, насколько это все сжимается и был ли смысл. Если был смысл, то это очень хорошо. Но при этом мы храним 2 месяца только в Kafka. В Cassandra мы ничего не удаляем, там все лежит намертво.
  • Имеем сейчас около 20 000 событий записей в Kafka в секунду на этих 6 нодах.
  • И суммарно эти 6 нод с учетом consuming, producing, тратят 10 ядер. Это суммарно 6 брокеров. И 45 гигабайтов памяти это тоже суммарно. Т. е. по большому счету она ресурсы не жрет.


Сейчас еще немножко расскажу про наш prod и будем заканчивать.


  • Rolling upgrade мы делали. Мы обновлялись через одну версию даже.
  • Делали все по инструкции. Все хорошо.
  • А потом она сдохла. Но сдохла она не по тому, что rolling не прошел, а потому что сдохла новая версия.


  • Был в Kafka чудесный баг. Это memory leak. То, что это memory leak понятно было сразу, потому что JVM доходит до верхней границы heap и не коллектит ничего.
  • В Kafka можно еще и пожать средствами Kafka. Если продюсер говорит, что мои сообщения зажаты в lz4, то consumer понимает, что это lz4 зажатые сообщения и начинает распаковывать. И у Kafka из-за этого типа компрессии что-то не так срабатывало. И когда мы запускали consumer, который читает зажатый топик, она начала течь.
  • Мы дали heap больше, чтобы она реже повисала, т. е. чтобы было время подумать. Потом поняли, что это lz4, т. е. нашли тикет, поняли, что это происходит именно в этот момент. Нашли корреляцию с тем, что мы делаем и с тем, как чувствует себя Kafka.
  • И решили consumer выключить, потому что нам повезло и у нас не было в prod lz4 зажатых топиков. Все, что нам надо зажать, мы сжимаем в payload сами руками. Мы все это отключили, решили не делать downgrade и вернули heap обратно, потому что большой heap – это зло.


Далее история про то, как Kafka масштабировать и как добавлять, удалять ноды.


Когда мы добавляем в кластер брокер, т. е. мы запускаем еще одну Kafka, которая смотрит на те же ZooKeeper, то говорим, что твой брокер ID N+1. Она запускается, ее все видят, все хорошо, но на нее ничего само не копируется. И вам нужно запускать некий тулинг, который сделает это руками.


В Kafka есть такой тулинг. Он называется kafka-reassign-partitions. Там есть стадия generation. Вы говорите, что вот у меня топик, вот такие-то партиции на таких-то брокерах находятся, а я хочу, чтобы этот набор партиций был размазан по нодам таким-то +1 или 2. И она вам генерирует новую раскладку.


Она для партиции, в которой есть изменения, прибавит нужное количество дополнительных реплик. Дождется пока они все станут синхронизированными. И реплики, которых в новой редакции нет, прибьет. Если в процессе этого лидер был, а это быть не должно, она лидера переизберет.



  • В общем, все логично, все классно, но эта штука на стадии generate заботится о том, чтобы по вашему списку брокеров партиции в штуках размазались равномерно.
  • Она ничего не знает про размеры, она ничего не знает про то, что должно быть меньше копирований. Она не учитывает то, что по факту будет происходить, т. е. был у тебя хороший список, а я тебе сгенерирую еще лучше, пусть там все переедет 10 раз в процессе, меня это не волнует. Т. е. если вы хотите, чтобы было оптимально, если вам большой поток данных надо перетаскивать, то generate или есть tools, которые принимает во внимание количество копирований, которые Kafka придется сделать.
  • И самый важный момент. Когда вы запускаете reassign apply и если вы не поставите лимит на скорость, а Kafka достаточно производительная, она умеет хорошо читать, хорошо все это пересылать без лишнего копирования в сеть, то она убьет и сеть, и диски, и чуть процессора.


И моя вторая история про то, как мы этот лимит не задали.


  • Мы не задали лимит.
  • Kafka нам все прибила. Она по ресурсам все выжрала.
  • И если лимит не задан, то задать его по постфактум нельзя для тех операций копирования, которые уже в полете, т. е. которые сейчас идут. Мы пробовали tooling, мы наспех почитали код, мы что-то в ZooKeeper поправили, но ничего не применяется.
  • И сделать ничего было нельзя, поэтому мы просто дождались, пока оно закончится и больше так не делаем.


Без лимита это точно запускать нельзя. Но если у вас нет там данных, то запускайте ради бога. Если вы запустили с лимитом, то конкретное значение лимита поменять можно в онлайне. Оно копируется, и вы понимаете, что что-то медленно. Ресурсы есть, но решили добавить еще парочку десятков мегабит в секунду. Это делается, Kafka с этим справляется. Не понимаю до сих пор, что там было с тем, когда лимит был задан и с тем, когда лимит не задан, но что-то не сработало.


Мы также не запускаем много переносов. Мы стараемся сделать так, чтобы в одной итерации этого reassign с одного лидера было только одно копирование. Т. е. с брокера ID № 5, если он лидер для трех партиций, то не стоит копировать все три партиции, потому что-то там у нее переклинивает и становится не очень хорошо.



И буквально еще пара историй.


Так как Kafka на уровне брокеров умеет данные распределять между разными хостами, то зачем ей совать REDO? И в начальных доках LinkedIn писал: «Ребята, ставь Kafka на REDO. Все классно, если у вас несколько дисков. Это нормально». А потом появился функционал, который позволяет Kafka сказать: «Вот тебе 5 дисков, используй их все».


Но они, видимо, так же, как нормальные инженеры, идут от простого к сложному. Поэтому в нормальной ситуации, когда она говорит, что у меня Kafka запущена, брокер с двумя дисками, я начинаю по ним размазывать партиции, т. е. я reassign на нее назначаю, и она будет распределять между дисками. Она апеллирует штуками партиций. Т. е. если вы запустите 2 топика: в одном 386 партиций размером по 5 килобайт, а в другом 20 партиций и каждая по 100 гигабайтов, то для нее это одни и те же партиции. Соответственно, перекос балансировки будет.


И сказать ей перебалансируй в пределах хоста, я так понимаю, нельзя. Были какие-то зачатки, что это будет, но на тот момент, когда мы это делали, их не было. И надо понимать, что вся балансировка данных происходит на момент создания партиции. Дальше оно как пишется в одну партицию, так и пишется. Если у вас партиция разрослась и не влезает в диск, то вы с этим ничего не сделаете. Надо руками вмешиваться.


Мы, когда хотим новый брокер подключить, мы самые тяжеловесные берем партиции и делаем reassign только на них. И то не сразу, потому что предыдущий пункт.



Kafka у нас больше года в prod.


К ней надо привыкнуть, она достаточно специфичная, но наработает.


И я из этого сделал выводы, что, когда мы обновлялись в Cassandra, там у них с качеством было все хорошо. Релиз вышел, мы подождали 5 дней. Критичного ничего нет, можно катиться. В Kafka оказалось похуже. Нулевые релизы я бы ставить больше не стал. Подождал бы долго. В message появились заголовки. Мы за ними погнались. Но после того, как у нас все это упало, мы их даже юзать не стали. Лучше бы подождали.


И все maintenance-операции там с подковыркой. Когда она начинает что-то копировать, надо ее ограничивать, чтобы ничего не убило. И несмотря на то, что confluent говорит, что мы выпустили классный оператор, который в Kubernetes вам все сделает, потому что он там имеет ребалансировку, то зная, как мы там бились с reassign, то что-то мне пока не хочется Kafka втаскивать в Kuber.



Бонусы, которые мы получили, очень классные:


  • Это легкость экспериментов, когда у нас есть источник правды из сырых данных, мы можем делать экспериментальные штуки. Мы делаем новый сервис с другой компрессией.
  • Можно ставить на этот же поток рядом с production. Это ни на что не повлияет. В клиенте мы можем сделать shadow-запросы. Т. е. у нас есть запрос в основной storage, делаем запрос в экспериментальный. Сравниваем результат, сравниваем latency. Мы это делали и это круто. Т. е. когда вам нужно на prod под нагрузкой это все разрабатывать, это реально круто.
  • И так как это secretion read, secretion write, вы можете для этого использовать дешевые диски. Вам не нужно SSD. И вы можете хранить много. Если вам хочется получить данные за 3 месяца назад для ваших экспериментов, то вы реально можете это сделать.


И второй момент, о котором я хотел бы сказать. Так как Kafka сама надежная и распределенная, вы можете переходить от кластера Cassandra к набору независимых баз данных. И это прекрасно, потому что вам не нужно заботиться о репликации.


Если вы хотите попробовать Mongo для своих задач, то посадите ее на поток событий и посмотрите, как она справится с этим размером данных. Вы можете посмотреть, насколько у Postgres большой write amplification по сравнению с MySQL и т. д. Мы это тоже используем и это реально круто.


И если вы определитесь с лагом, т. е. когда оно консистентно, а когда оно не консистентно, вы можете рассчитывать, что эта штука надежно реплицируема. При этом репликацию в своем коде вы не писали, да и не нужно.



Подводим итог. Почему велосипедостроение?


  • Потому что по большому счету то, что мы сделали, это очень специализировано для нас. Оно никому не нужно и это не наша фокусная деятельность. Я бы искренне не хотел этим заниматься. Но так как нам это было нужно, мы это сделали.
  • Если вы понимаете, что для ваших задач, для вашего бизнеса это нужно:

  1. Подумайте: «Можете ли вы это не делать?».
  2. Если вы все-таки хотите сделать свою базу данных, вы должны изначально разобраться, как работают большие базы данных с хорошей историей, с умными разработчиками.
  3. И тогда можно сделать что-то наподобие. Репликацию отсюда, WAL отсюда. И Kafka в качестве WAL, на мой взгляд, оправдала себя. Она сильно позволила нам упросить то, как мы делали свою распределенную и надежную базу данных. И часть задач она сняла. Мы очень немного ресурсов потратили на нее. Она не много в эксплуатации требует от нас.
  4. И берите за основу готовые компоненты, дописывайте недостающие куски кода. И будет вам счастье.

Это все, что я хотел рассказать. Спасибо!



Добрый день! Меня зовут Геннадий, Московская биржа. Спасибо большое за доклад! Очень интересно! Очень актуально! Kafka как WAL – это, действительно, очень крутая идея. Мы на нее очень долго смотрим. Может быть, наконец-то, начнем это делать. Где-то полгода назад на JPoint Гамов рассказывал, что они в Kafka 1 втянули транзакции для обработки сложных events, чтобы было проще работать. Вы пробовали использовать транзакции, в частности, для обработки ошибок? Вы в начале упомянули, что с этим были сложности.


Нам не нужны транзакции там, где мы сейчас используем Kafka, потому что все изменения, которые у нас есть, они идемпотентны. Мы можем их проигрывать много раз. И даже если в Kafka что-то появится на этот счет, усложнять это все мы не хотим. Нет профита для нас использовать транзакции. И мы это не делаем.


Здравствуйте! Спасибо за доклад! Вы могли бы рассказать про выбор функции хэширования по определению номера партиции? Там должны быть все партиции одинаково сбалансированы с одной стороны, а с другой стороны, функция должна быть достаточно быстрой.


Мы взяли самую простую, что была в Kafka. Но взяли и переопределили ее сами для того, чтобы, если Kafka-клиент обновится, чтобы он был, а у нас зафиксирован. У нас в штуках событий очень много, поэтому любое распределение у нас получилось равномерное. Нам с дисбалансом партиций и нагрузки не пришлось ничего делать. Выборка ключей пачек метрик – миллионы. И они примерно в штуках распределены.


Я показывал график, там примерно одинаковое количество. Мы просто по ключу взяли некоторую функцию от ключа без знания о том, сколько там данных, сколько штук. И остаток их деления, условно, хэша на количество партиций, дало нам распределение.


Привет! Спасибо за доклад! Тюнили ли вы bunch size на продюсере под размер сообщения или с дефолтным bunch size у вас все нормально работало?


У нас bunch в том плане, что у нас приходит за раз, условно, 1 000 метрик. Мы не каждую пачку пишем, мы в пределах запроса делаем все bunch’ем одним, т. е. между запросами Kafka-клиент ничего не накапливает, поэтому у нас особого тюнинга на этот счет нет.


А когда вы писали в Cassandra или в memory storage, вы просто в consumer свой писали?


Да, у нас есть consumer и memory там встроенная. И есть consumer, который выгребает эти чанки и складывает. Т. е. несколько consumers под разные задачи.


И еще один вопрос. Prometheus и Graphite как базы для метрик вам не подошли?


Какие базы нам не подошли? Prometheus, Influx и, по-моему, что-то еще тестировали. Не подошло. Какие-то вещи мы тестировали-тестировали, а какие-то вещи мы понимали, что они нам по дизайну не подойдут.


Есть вопрос по поводу того, что вы сказали, что не нужно использовать под Kafka REDO. У вас не уходят брокеры целиком, когда ломается один из дисков?


Я не скажу по поводу отказа дисков. Там что-то сделали. По-моему, это конфигурируемое поведение. В Cassandra есть конфигурируемое поведение, когда есть jbot, т. е. либо не обслуживать запись на этом диске, либо целиком ноду фейлить. В Kafka – я затрудняюсь. Может быть, она уйдет целиком, но это не беда.


Спасибо за доклад! В чем профит отрезать ресурсы на тех же тачках, где у вас Kuber, чтобы дать это Kafka? Это просто утилизация или потому что можете?


Мы используем большие и жирные железки. Мы не используем виртуалки, поэтому давать Kafka жирную железку мы не хотим. Мы хотим, чтобы Kafka нагрузила 2 диска и взяла немножко CPU. А остальные CPU и память мы хотим использовать для дела. И ровно поэтому мы это делаем.


Привет! Спасибо за доклад! Я, к сожалению, присутствовал не сначала, поэтому, может быть, я спрошу то, что ты уже освящал. Как вы мониторите брокеры Kafka, что они не отвалились, что они живы и работают?


У нас есть мониторинг. Мы делаем мониторинг и у нас есть мониторинг мониторинга. Он умеет Kafka мониторить. И он нам рассказывает про все бяки, что с Kafka происходит.


Можете про это чуть-чуть конкретней рассказать?


У тебя живы какое-то количество брокеров. Это значит, что что-то будет работать. Мы смотрим есть ли у нас недосинхронизированные партиции. Мы следим за тем, чтобы распределение лидеров партиций было равномерным по брокерам. Мы следим за тем, чтобы диски не упирались в полку. Мы следим за тем, чтобы Kafka не упиралась в лимит CPU. Мы следим за тем, чтобы ZooKeepers были живы и у них не было вылетевших нод и т. д. Это базовые вещи. Есть софт, его надо мониторить.


Николай, спасибо за доклад! Очень интересный опыт использования Kafka. У меня один вопрос и одно замечание. Вы описали ваше шардирование. Оно выглядит так, как будто одна метрика попадает в одну конкретную партицию всегда. Может быть ли такая ситуация, когда одна метрика станет очень жирной? Одну партицию обслуживает один брокер, т. е. может ли случится такое, что он начнет не справляться?


Нет, у нас есть лимиты на размер метрики. Это должна быть очень-очень жирная метрика. И мы ее не примем. Мы будем разбираться, почему она такая. И после того, как мы ее зарежектили, мы постараемся понять, как сделать так, чтобы метрика пришла в нескольких кусках. С этим сейчас нет проблем и не предвидится.


Т. е. это не зависит от объема трафика?


Одна метрика – это какое-то количество float за какой-то timestamp, т. е. не может быть гигабайта и т. д. Это конкретный замер за вот этот timestamp.


Я понял. Второй вопрос. Вы говорили про возможность Kafka использовать несколько дисков и то, что она не балансирует и может складывать на один диск жирный объем данных, а на другой не очень. У Kafka есть понятие «сегмент», т. е. она режет партицию на сегменты. И если партиция за много дней, то это терабайты данных. Но она не создаст многотерабайтный файл, а порежет на несколько. И в этом случае идет раскладка на файловом уровне. Сегменты раскладываются в разные директории, которые в конфиге.


У меня было другое представление. Если по факту так, как вы говорите, то это классно. Но мы столкнулись с тем, что была именно единица позиционирования на диск, т. е. именно партиция целиком. Но с другой стороны, когда Kafka это делает, она не знает, сколько там будет данных, поэтому, может быть, в версиях они переделали, что единица балансинга – это сегмент. Это было бы логично. И если они это переделали, то круто. Значит я сказал неправильно.


Она действительно не смотрит на размеры. Она просто рандомно размазывает сегменты.


Здравствуйте, Николай! Спасибо за доклад! Чем вас не устроили существующие TSDB?*


Prometheus не умеет много копий надежно раскладывать по нодам. Он не умеет хранить LTS. У него есть с этим проблемы. И те запросы, которые мы отправляем на чтение, содержащие 10 000 метрик на запрос, Prometheus под этим складывался.


У нас чуть другие требования к TSDB, исходя из нашего workload. У нас есть пользователи, которые смотрят графики, но большинство нагрузки создают наши триггеры, которые постоянно эти данные дергают и читают. Мы их еще не переделали на поток. И это еще одна причина, почему Kafka – это наш осознанный выбор, потому что триггеры будем проверять на потоке изменений. Мы будем следить на потоке и не будем читать из этого storage.