
Привет, Хабр! Меня зовут Алексей Николаевский, и мы с командой делаем СУБД Яндекса. С 2013 года в Яндексе использовали Kafka для потоковой передачи данных. Но Kafka со временем перестала справляться с растущими объёмами, и в 2017 году мы перешли на своё решение.
Брокер сообщений YDB Topics во многом вдохновлялся Kafka: в нём также есть топики, партиции и аналогичные способы работы с данными. Но есть и существенные отличия, о которых в конце прошлого года я рассказал на московской конференции HighLoad. Под катом — адаптированная для Хабра статья по мотивам этого доклада: про архитектуру транзакций в обеих системах и интересные для разработчиков детали и нюансы, которые мы обсуждали на конференции.
Модель данных топиков
С помощью брокеров сообщений, таких как Kafka и YDB Topics, пользователи надёжно сохраняют и читают данные в виде упорядоченных сообщений. Сообщения сгруппированы в топики, а каждый топик разделён на партиции для масштабирования.
Клиенты (SDK) подключаются к кластеру, дописывают сообщения в конец партиций и читают сообщения из партиций с разными смещениями. Чтобы масштабирование с помощью партиций работало корректно, брокеры сообщений объединяют клиентов в группы. Для Kafka такие группы называются consumer groups, а в YDB Topics просто consumers.

Чтобы клиенты могли отказоустойчиво обрабатывать новые сообщения и не читать уже обработанные, брокеры надёжно сохраняют смещения чтения. Для каждой партиции и каждой группы клиентов Kafka и YDB Topics хранят смещения, начиная с которых группа клиентов ещё не обработала сообщения.
Каждый клиент из группы может запросить такое смещение и приступить к чтению сообщения, начиная с него. Прочитав и обработав некоторое количество сообщений, клиенты сдвигают (коммитят) смещение, чтобы в случае перезагрузки или ребалансировки группа клиентов не обрабатывала сообщения повторно.
Когда появляется необходимость в транзакциях
Каким образом в реальных проектах используется надёжная среда передачи и обработки данных, которую предоставляют брокеры сообщений? Например, для работы с логами, которые нельзя терять.
В большой поисковой системе десятки тысяч серверов обрабатывают запросы пользователей, а для обработки логов нужно гораздо меньше серверов. Использование брокера сообщений позволяет надёжно хранить логи, пока они ещё не обработаны. А если в каждую партицию записывать логи от диапазона пользователей, то сервера смогут держать в памяти кеш по тем пользователям, логи от которых они обрабатывают, и вычитывать много сообщений за раз.
Но у архитектуры с шардированием партиций по диапазону пользователей есть два важных недостатка:
Сервера обрабатывают запросы от всего диапазона пользователей. Поэтому каждый сервер будет записывать логи во все партиции. Для 50 тысяч серверов и тысячи партиций получается 50 миллионов активных TCP-подключений к брокеру сообщений, что даёт высокую нагрузку на инфраструктуру.
Обработав поисковый запрос, каждый из 50 тысяч серверов должен выбрать партицию по идентификатору пользователя и записать туда лог. Партиций много, и сервера не могут накапливать в памяти, группировать и эффективно записывать сообщения группами (батчевать), что увеличивает нагрузку на брокер сообщений ещё больше.
Оба эти недостатка хорошо известны и устраняются с помощью «решардирования» — добавления вспомогательного топика и небольшого количества промежуточных серверов для перекладывания сообщений между двумя топиками.
В улучшенной архитектуре каждый из 50 тысяч поисковых серверов пишет в новый вспомогательный топик, при этом партиция выбирается не на основании идентификатора пользователя, а на основании идентификатора сервера. Решаются обе проблемы на стороне пишущих серверов: каждый сервер устанавливает сетевое соединение только с одним топиком и может группировать вместе сообщения для быстрой записи.
Новый вспомогательный топик адаптирован под пишущие сервера, логи в нём шардированы по идентификатору сервера. А основной топик — под сервера обработки, логи в нём шардированы по идентификатору пользователя.

Дополнительные сервера промежуточного слоя берут на себя задачу перекладывания логов из вспомогательного топика в основной, «решардирования». Таких серверов может быть ещё меньше, чем серверов обработки, например, 500. Прочитав сообщение из вспомогательного топика, промежуточный сервер получает идентификатор пользователя из сообщения, на основании этого идентификатора выбирает партицию основного топика и записывает сообщение туда.
Каждый промежуточный сервер будет писать сообщения в каждую партицию основного топика. Но так как серверов промежуточного слоя гораздо меньше, чем поисковых серверов, то решардирование значительно снижает нагрузку на очередь.
В общем виде задача решардирования формулируется как «переложить данные из топика А в топик Б с соблюдением нескольких требований»:
Локальность данных. Например, «маленькие буквы в первую партицию, большие буквы во вторую», как на иллюстрации ниже.
Сохранение порядка сообщений из исходной партиции. Будет странно встретить в логе сначала запрос про адрес магазина, а потом запрос «где купить утюг?».

Переложить данные нужно ровно один раз (обычно такое требование называется exactly once). Мы хотим отличать настойчивого пользователя, который ищет одно и то же по нескольку раз, от ошибок при обработке.
Если перекладывать сообщения между вспомогательным и основным топиками самым простым способом, то алгоритм работы промежуточных серверов будет выглядеть вот так:
Читаем сообщение из вспомогательного топика.
Записываем это сообщение в основной топик.
Дожидаемся подтверждения о записи сообщения.
Сдвигаем закоммиченное смещение чтения во вспомогательном топике.
Когда смещение чтения сдвинуто — переходим к следующей итерации цикла.
У такого простого алгоритма есть серьёзный недостаток: если во время его выполнения сервер перезагрузится, то состояние исходного топика ещё не успеет поменяться, и закоммиченное смещение чтения не будет сдвинуто.
А вот в целевом топике очередное сообщение может быть как уже записано, так и ещё нет. И если оно записано, то итерация цикла после перезапуска сервера приведёт к дублированию сообщений, что недопустимо — свойство exactly once не будет обеспечено.

Эта проблема решается с помощью атомарных транзакций: сдвиг смещения чтения и запись сообщения в целевой топик происходит атомарно. Если смещение не сдвинуто, то сообщение не может оказаться в целевом топике.
Свойства транзакций: ACID
Разработчики СУБД сформулировали несколько требований к транзакциям, выполнение которых позволяет избежать типовых проблем. Эти требования собраны в акроним ACID.
Атомарность (Atomicity), A в ACID
Все записи в транзакции должны применяться (коммититься) вместе либо отменяться. Брокеры сообщений позволяют сделать два типа записи: запись сообщения в топик и сдвиг смещения чтения.
Брокер сообщений должен гарантировать, что если клиент успешно закоммитил транзакцию, то все изменения в такой транзакции применились и видимы для других клиентов. То есть невозможна ситуация, когда пользователь записал сообщения в два топика, успешно закоммитил транзакцию, но один из серверов перезагрузился и сохранилось только одно сообщение.
Согласованность (Consistency), C в ACID
Это требование обозначает, что после применения транзакции система должна переходить из одного корректного состояния в другое.
На практике «корректность» часто интерпретируется на уровне приложения, а брокер сообщений лишь предоставляет инструменты для его соблюдения. Например, если программист перекладывает сообщения из исходного топика в целевой и сдвигает смещение чтения, то корректным состоянием будет появление в целевом топике тех сообщений, которые были в исходном между сдвигами позиции чтения. А некорректным, соответственно, будет дублирование или потеря этих сообщений.
Изоляция (Isolation), I в ACID
Все записи транзакции должны стать видимыми для других транзакций только после её коммита.
Долговечность (Durability), D в ACID
Закоммиченные данные не должны быть потеряны (в рамках отказоустойчивости СУБД).
Согласованность транзакций
Если нужно обеспечить изоляцию для брокеров сообщений, достаточно сделать так, чтобы данные не были видны пользователям (коду, который использует клиентский SDK), пока не выполнен коммит. С долговечностью тоже нет проблем: если хранить данные с избыточностью, то выход из строя какого-то количества серверов не приведёт к потере данных.
Что может произойти, если не предусмотреть механизма для выполнения требования согласованности? Представьте такую ситуацию. Сервер выполняет транзакцию: атомарно, изолированно, долговечно. Получает смещение чтения из исходного топика, записывает данные в целевой топик. И зависает.
Если решение спроектировано как отказоустойчивое, то через некоторое время топик начнёт читать второй сервер. А так как первый сервер выполнял транзакцию изолированно, то второй сервер ничего не знает об уже записанных в целевой топик данных. Он получает то же смещение чтения из исходного топика, записывает данные в целевой топик, сдвигает смещение чтения и завершает транзакцию. После чего «отвисает» первый сервер.
Так как транзакции изолированы друг от друга, а первый сервер всё ещё выполняет свою транзакцию, то он ничего не знает о том, что второй сервер уже записал данные в целевой топик и сдвинул смещение чтения. Поэтому первый сервер, который раньше второго записал данные в целевой топик, сдвигает смещение чтения и завершает свою транзакцию.
В результате данные оказываются записаны в целевой топик два раза. Чтобы такого не произошло, сервер должен выполнять требование согласованности: если две транзакции пытаются переложить между топиками одни и те же данные — то одна из транзакций должна быть отменена.
Транзакции в Apache Kafka
Для организации транзакций Kafka использует партиции и два координатора — consumer coordinator и transaction coordinator:
Координаторы — это два служебных топика. У каждой партиции топика есть лидер, и на этих лидерах обоих топиков Kafka запускает отдельные процессы, которые надёжно сохраняют нужную для работы транзакций информацию.
Consumer coordinator хранит для каждого клиента информацию, нужную для координации и балансировки партиций: список закоммиченных смещений чтения по партициям и список ещё незакоммиченных транзакционных изменений.
Transaction coordinator хранит метаинформацию для каждой транзакции: её статус, инкрементальное поколение, список партиций и список задействованных в транзакции клиентов.
Consumer coordinator использует имена клиентов в качестве ключей сообщений, а Transaction Coordinator — идентификаторы транзакций. Оба топика компактифицированные: старые записи по ключам удаляются, так что в топике содержатся только последнее состояние для клиентов и транзакций.
Оба координатора являются реплицированными конечными автоматами (replicated state machines). Kafka гарантирует, что координаторы будут запущены рано или поздно вместе с лидером партиции и что будет не более одного координатора, способного писать в партицию. Координаторы — отказоустойчивые, консистентные и высокодоступные компоненты, которые надёжно сохраняют всю информацию о транзакциях.
Теперь, когда вы знаете о принципах работы обоих координаторов, можно описать алгоритм транзакций в Kafka:
Пользователь создаёт транзакцию.
Transaction coordinator надёжно сохраняет в своём топике участников транзакции: партиции для записи и клиентов для сдвига смещения чтения.
Пользователь выполняет операции внутри транзакции, а Kafka надёжно сохраняет изменения в партиции топика и consumer coordinator. Но делает это так, что изменения пока недоступны другим пользователям.
Пользователь коммитит транзакцию.
Transaction coordinator начинает рассылать партициям и consumer coordinator, участвующим в транзакции, нотификацию о том, что изменения надо сделать доступными другим пользователям, «опубликовать».
Участники транзакции публикуют изменения.
Как Kafka надёжно сохраняет изменения данных, чтобы они были недоступны пользователю (коду, который использует клиентский SDK)? Для этого данные записываются в партицию как обычные сообщения, но в метаданных для них указывается transaction ID. Вся магия по недоступности таких данных до публикации реализована на стороне клиентского SDK:

Чтобы опубликовать изменения, transaction coordinator записывает в каждую партицию специальное сообщение «commit marker», которое тоже сразу же доступно клиентам (SDK) и сигнализирует о том, что опубликованные ранее сообщения можно читать:

Как транзакции в Kafka меняют чтение? Предположим, клиент (SDK) прочитал сообщения 6, 7 и 8 на иллюстрации ниже. Но перед тем, как отдать их пользователю, SDK помещает прочитанные сообщения в буфер, который держит в памяти. Так как сообщение 6 было записано вне транзакции, то оно отдаётся пользователю. А 7 и 8 будут находиться в буфере, пока SDK не прочитает commit marker. Таким образом, транзакционно записанные данные будут отданы пользователю только после коммита транзакции:

Consumer coordinator хранит не только закоммиченное смещение чтения, но и новое смещение, которое ещё не опубликовано. А при публикации обновляет закоммиченное смещение, делая новое значение доступным для пользователей.
Kafka использует два механизма для борьбы с конфликтами в транзакциях. Первый механизм, который был придуман для защиты от «зомби-процессов» пользователя, — поколение транзакций. Если процесс завис и пользователь запустил новый с тем же самым идентификатором транзакции, то у нового процесса поколение увеличится, и транзакция от предыдущего процесса закоммичена никогда не будет.
Для борьбы с конфликтами смещения чтения Kafka проверяет «актуальность» процессов чтения. В показанном ранее примере первый сервер, начавший транзакцию, зависает, и ту же транзакцию начинает выполнять второй сервер. Consumer coordinator различает клиентов (SDK) и надёжно сохраняет информацию о том, какой именно клиент читает какую партицию. Так что если первый сервер, который читал партицию ранее, «отвиснет» и попробует закоммитить транзакцию, — то Kafka отменит его транзакцию с ошибкой.
Несмотря на то что транзакции в Kafka появились достаточно давно, полноценная защита от конфликтов появилась только в версии 3.7, релиз которой состоялся в начале 2024 года.
Начальный дизайн Kafka был сделан без учёта транзакций: это выражается в том, что транзакции внесены в топики инородно, с размазыванием логики между записью, чтением и клиентом (SDK). Но такой дизайн позволил сделать Replicated State Machines — техническое решение, с помощью которого реализованы consumer coordinator и transaction coordinator.
Архитектура транзакций YDB
В СУБД Яндекса за каждую партицию топика отвечает «таблетка». В архитектуре есть аналог transaction coordinator, а компонент proxy обрабатывает запросы от клиента.

Таблетки в YDB тоже являются replicated state machines — принцип построения другой, но гарантии надёжности хранения состояния те же самые, что и у координаторов Kafka. Для хранения своего состояния таблетка использует набор пар «ключ — значение». По ключу может храниться ссылка на блоб данных или какое-то значение.
На иллюстрации ниже показано, как может выглядеть состояние таблетки для партиции. Семь сообщений от пользователя сериализованы в четыре блоба, а закоммиченное смещение чтения хранится как значение ключа c_my-consumer (где «my-consumer» — заданное пользователем имя, а «c_» — используемый YDB префикс):

Изменение состояния каждой таблетки само по себе транзакционно. Кроме пользовательских транзакций, в YDB есть то, что мы называем «детерминистические транзакции», которые построены поверх изменения состояний индивидуальных таблеток.
Это точечные распределённые транзакции: они глобально упорядочены по виртуальному времени, и в каждой из них участвуют несколько таблеток. О возможности выполнить транзакцию таблетки договариваются в момент выполнения, и если какая-то таблетка считает, что возник конфликт, то такая транзакция будет отменена.
За детерминистические транзакции отвечает координатор: это тоже таблетка, которая запоминает участников таких транзакций, назначает виртуальное время транзакции и сообщает всем участникам транзакции очередное время и какие транзакции выполнить.
Пользовательская транзакция в YDB
Алгоритм пользовательской транзакции в YDB состоит из следующих шагов:
Пользователь стартует транзакцию, после чего может в любом порядке писать сообщения в топик, читать из топика, выполнять SELECT и INSERT-операции над таблицей, пока не закоммитит транзакцию. Некоторые шаги пользовательской транзакции, например работа с таблицей и коммит транзакции, — это детерминистические транзакции. Во время транзакции сообщения пользователя надёжно сохраняются в партиции и, так же как в Kafka, недоступны другим пользователям.
Когда пользователь начинает коммит транзакции, то YDB использует детерминистическую транзакцию, чтобы сохранить все нужные данные. Детерминистическая транзакция надёжно сохраняет во всех участниках транзакции идентификатор этой транзакции и метаданные, которые, в свою очередь, включают обработанные смещения чтения и информацию о том, какие записанные во время транзакции сообщения нужно опубликовать.
В координаторе надёжно сохраняется намерение закоммитить именно эту детерминистическую транзакцию со списком партиций-участников.
Координатор назначает время и будет рассылать команду на выполнение транзакции, пока она не выполнится.
Участники договариваются между собой и, если нет конфликтов, выполняют транзакцию и публикуют изменения.
Как запись и надёжное сохранение сообщений без их публикации (фиксация) устроены внутри таблеток? Поскольку состояние таблетки — это набор пар «ключ — значение», то для фиксации транзакционных сообщений достаточно записывать их по специальным ключам, имена которых начинаются с «t_».
Такие сообщения не будут доступны пользователю (коду, который использует клиентский SDK) до завершения транзакции. А для публикации достаточно будет переименовать ключи, поменяв префикс:

Изменение смещения чтения устроено следующим образом. Клиент (SDK) запоминает все вычитанные смещения в виде отрезков смещений. Когда клиент коммитит транзакцию, то передаёт на сервер все смещения, а не одно число, как в Kafka. Получив смещение от клиента, сервер может определить, есть ли конфликт: если закоммиченное смещение пересекается с переданными, значит, его передвинул другой клиент и транзакцию надо отменить.
Когда мы разрабатывали YDB Topics, то в YDB уже были транзакции между таблицами, и топики органично вписались в существующую архитектуру, дав возможность использовать топики и таблицы в одной транзакции. Используя YDB, можно транзакционно перекладывать данные между топиками и таблицами с соблюдением ACID транзакций и используя простой код.
Сравнение транзакций в Kafka и YDB

Алгоритмы транзакции в Kafka и YDB очень похожи. У YDB есть дополнительный шаг, но он обусловлен тем, что компонент proxy хранит данные в оперативной памяти, а не сразу в координаторе.
Пользователям транзакционных систем важно время выполнения транзакций. Основной вклад в него вносят дисковые операции.
Алгоритм транзакции в Kafka использует пять последовательных сохранений на диск. У YDB их шесть: последний шаг занимает три сохранения, и мы работаем над тем, чтобы это исправить.
А вот по возможностям системы различаются:
У YDB есть транзакции между топиками и таблицами с сохранением всех гарантий ACID. В Kafka есть только топики, поэтому такие транзакции невозможны. Отдельно стоит заметить, что в Kafka есть compacted топики — но это не полноценные таблицы.
Транзакции в Kafka пессимистичные: нельзя начать новую транзакцию, пока старая не закоммичена или не отменена. В YDB транзакции оптимистичные: в случае конфликта одна из транзакций будет отменена в момент попытки коммита. Нельзя сказать, что оптимистичные транзакции лучше пессимистичных или наоборот. В зависимости от решаемой задачи могут быть лучше транзакции как одного типа, так и другого.
У Kafka способ хранения сообщений в транзакции без публикации усложняет чтение и замедляет восстановление после сбоев. Например, если у вас коммитится только 1% транзакций, то клиенты будут читать гигабайты данных, которые на самом деле не нужны.
Благодаря использованию ключей для маркировки данных транзакции, в YDB данные попадают в конец партиции, в то время как в Kafka они оказываются размазанными по партиции. Также, в отличие от Kafka, транзакции в YDB не меняют логику чтения данных.
Производительность транзакций в Kafka и YDB
Мы сравнили производительность систем на одинаковом железе. В первом тесте мы писали одним процессом 100 Мбайт/с в 100 партиций и коммитили транзакции раз в полсекунды. Как видно на иллюстрации ниже, задержка от момента формирования сообщения до подтверждения записи у YDB в низких перцентилях лучше, а в высоких перцентилях хуже, чем у Kafka. Это явно бутылочное горлышко текущей реализации, и мы это исправим.
А вот end-to-end задержка от создания сообщения на клиенте до его получения после коммита транзакции у систем сравнимо. В Kafka задержка будет меньше, так как клиент сразу вычитывает сообщения и хранит их у себя в буфере, а в YDB данные будут прочитаны только после коммита транзакции.

По потреблению ресурсов Kafka тоже выигрывает. Тот же тест, только пишем в 8 раз больше: 800 Мбайт/с. Мы сравнивали работу систем как с т��анзакциями, так и без них. По результатам теста видно: и Kafka, и YDB немного деградируют при появлении транзакций, увеличивается потребление CPU. Также YDB несколько деградирует по сети и нагрузке на диски.

Узким местом обычно становится пропускная способность диска, а здесь системы сравнимы. Мы планируем оптимизировать использование сети и деградацию по CPU при выполнении транзакций.
Выводы
И Kafka, и YDB архитектурно шли к транзакциям с разных сторон, но в результате архитектура получилась похожей. У Kafka транзакций не было, авторы поняли, что они нужны, и добавили эту фичу. В YDB же транзакции для таблиц были сразу, их расширяемая архитектура позволила при появлении топиков органично добавить транзакции к ним.
Если у вас есть задача, которую вы решаете перекладыванием из Kafka в PostgreSQL, — то YDB позволяет решать такую задачу с помощью одной системы вместо двух. В Kafka таблиц нет.
Обе архитектуры можно продолжать улучшать. YDB можно оптимизировать, уменьшать количество операций ввода-вывода. Но и Kafka могла бы хранить некоторые шаги в оперативной памяти, это вполне возможное изменение архитектуры.
В планах — сравнить обе системы в случае отказов. Рестартовать брокеры, клиенты и смотреть, как это влияет на пользовательские транзакции. И, несомненно, мы будем продолжать улучшать транзакции в YDB!
На прошедшей конференции Yandex Scale мы анонсировали версию YDB 25.2.1, в которой расширили поддержку Kafka API и добавили возможность использовать транзакции Kafka, которые эмулируются с помощью описанных в статье транзакций YDB.
База данных YDB и её компонент YDB Topics доступна как Open Source проект и как коммерческая сборка с открытым ядром. Вы можете запустить её у себя или воспользоваться нашим managed-решением в Yandex Cloud.
Мы общаемся с нашими пользователями в Telegram и на Хабре, ценим мнение коллег по индустрии и готовы обсуждать в комментариях и на предстоящей конференции HighLoad всё, что связано с обработкой данных.