Добрый день, уважаемые ИТ-ники. На связи Михаил Голованов. Продолжаем увлекательные эксперименты с tiered storage в Apache Kafka. Первая часть «Присматриваемся к Kafka tiered storage». Во второй части статьи починим, что не смогли починить ранее (сборка реализации AivenOpen) и поэкспериментируем с реализаций для Minio S3 в качестве бекэнда. Как и в прошлый раз, теория будет перемежаться практикой.

Оглавление

  1. Kafka в ВТБ

  2. Краткие итоги предыдущей и содержание текущей статьи

  3. Чиним e2e тесты в Aiven Open

  4. Remote Storage Manager глазами программиста
    Что такое CustomMetadata
    Класс исключительной ситуации RemoteStorageException и его подтипы
    Класс RemoteSegmentMetadata
    Метод copyLogSegmentData
    Методы fetchLogSegment
    Метод fetchIndex
    Метод deleteLogSegmentData

  5. Копаемся в реализации Aiven Open
    Анализ требований
    Ключевые технические решения
    Выделение слоя работы с различными провайдерами хранилища данных S3 (provider backends)
    Разделение данных на цепочки (chunking)
    Сжатие данных средствами Remote Storage Manager
    Шифрование
    S3 Multipart Upload
    Локальный кэш данных
    Структура исходного кода
    Осматриваемся на местности
    Ищем вход
    Код чтения данных сегмента
    Код чтения индексов
    Код удаления данных из remote storage
    Вывод

  6. Запускаем реализацию AivenOpen
    Настраиваем и запускаем Minio
    Собираем из исходников дистрибутив AivenOpen
    Копируем библиотеки AivenOpen в инсталляцию Apache Kafka
    Настраиваем и запускаем кластер Кафка
    Создаем топик с поддержкой remote storage
    Проверяем работоспособность

  7. Осмысливаем результат

Kafka в ВТБ

Работая в банке ВТБ в качестве архитектора стрима «Интеграция», я занимаюсь развитием продуктов и сервисов, позволяющих информационным системам обмениваться информацией быстро, надежно и дешево (выберите любые два пункта).

Кафка активно используется в банке уже более трех лет для обмена по шаблонам Publish-Subscribe и Streaming. Сейчас в промышленном контуре развернуто уже около 250 кластеров. Любая команда разработчиков банка может нажатием пары клавиш за 10 минут развернуть кластер Кафка в конфигурации с высокой доступностью и георезервированием, подключенным к сервисам мониторинга, аудита и т.д. В ближайшее время мы сделаем и централизованную инсталляцию — «коммунальную» Кафку.

Дружной командой инженеров мы стараемся предоставить Кафку в удобном для других разработчиков виде. При этом сами с удовольствием погружаемся в тонкости работы этого замечательного программного обеспечения и делимся своими знаниями.

Скучать некогда.

Кодирование и отладка сменяются жаркими техническими дискуссиями. Что приятно, к нам нередко приходят в гости другие программисты с идеями и вопросами («сделайте вжух и готово»). Сделать вжух непросто, но оно того стоит с профессиональной точки зрения.

Кафка является частью унифицированной интеграционной платформы ВТБ. Также мы активно используем Istio ServiceMesh, Apache ActiveMQ, RabbitMQ, средства файлового обмена.

Про унифицированную интеграционную платформу постараюсь рассказать в другой раз. А пока сосредоточимся на теме статьи.

Краткие итоги предыдущей и содержание текущей статьи

  • Описаны принципы работы и структура подсистемы Kafka tiered remote storage;

  • Успешно запущена реализация Remote Storage Manager из состава Apache Kafka;

  • Сборка дистрибутива Remote Storage Manager от Aiven Open частично успешна. E2e тесты завершались ошибкой и были отключены;

  • ==> Мы тут;

  • Починка e2e тестов дистрибутива Aiven Open;

  • Remote Storage Manager глазами программиста;

  • Рассматриваем структуру кода и настройки Aiven Open;

  • Запускаем Remote Storage Manager от Aiven Open;

  • Намечаем дальнейшие планы.

Чиним e2e тесты в Aiven Open

Все оказалось элементарно.

В прошлый раз собрать дистрибутив я пробовал командой ./gradlew clean build. E2e тесты при этом завершались с ошибкой и были отключены мной в конфигурации сборки.

Оказалось, что собирать нужно командой make. Пара минут и все собралось, хотя, конечно, сборка java кода командой make несколько непривычна.

Интересная особенность сборки нужна версия 17 JDK. Сборка с версией JDK 21 заканчивается ошибкой.

Remote Storage Manager глазами программиста

Реализации Remote Storage Manager должны имплементировать интерфейс: org.apache.kafka.server.log.remote.storage.RemoteStorageManager.

Интерфейс хорошо задокументирован в исходном коде Apache Kafka и содержит пять методов: copyLogSegmentData, два метода fetchLogSegment (отличаются одним входным параметром), fetchIndex и deleteLogSegmentData.

Декларация интерфейса приведена ниже:

Optional<CustomMetadata> copyLogSegmentData(
  RemoteLogSegmentMetadata remoteLogSegmentMetadata,
  LogSegmentData logSegmentData) 
  throws RemoteStorageException;

InputStream fetchLogSegment(
  RemoteLogSegmentMetadata remoteLogSegmentMetadata,
  int startPosition) 
  throws RemoteStorageException;

InputStream fetchIndex(
  RemoteLogSegmentMetadata remoteLogSegmentMetadata, 
  IndexType indexType) 
  throws RemoteStorageException;

void deleteLogSegmentData(
  RemoteLogSegmentMetadata remoteLogSegmentMetadata) 
  throws RemoteStorageException;

Метод copyLogSegmentData загружает данные файла сегмента партиции из локального хранилища в удаленное. Должны быть скопированы сами данные и индексы.

Первый метод fetchLogSegment извлекает из удаленного хранилища запрошенные данные файла сегмента партиции, начиная с заданного смещения. Во втором методе, кроме начального смещения указывается и конечное смещение (range).

Метод fetchIndex извлекает из удаленного хранилища индекс указанного типа.

Метод deleteLogSegmentData удаляет данные из хранилища.

Фактически это CRUD интерфейс к удаленному хранилищу.

Что такое CustomMetadata

В возвращаемом значении метода copyLogSegmentData присутствует тип CustomMetadata. Это дополнительные данные, которые разработчик может сформировать и передать при записи данных в удаленное хранилище, а затем при операциях извлечения и удаления данных получить назад.

CustomMetadata фактически обертка над массивом байт. Семантика хранящихся в массиве байт отдается на откуп разработчику, реализующему интерфейс RemoteStorageManager.

Как же обеспечено хранение и передача CustomMetadata между вызовами copy/fetch/delete, относящимися к одному сегменту? Это обязанность Remote Metadata Manager. При этом максимальный размер сохраняемого экземпляра CustomMetadata ограничен настройкой remote.log.metadata.custom.metadata.max.bytes.

Класс исключительной ситуации RemoteStorageException и его подтипы

Исключительная ситуация RemoteStorageException сигнализирует об ошибках работы с удаленным хранилищем.

Имеет одного наследника RemoteResourceNotFoundException для исключительных ситуаций отсутствия в удаленном хранилище запрошенного ресурса (файла или информации в другой форме).

Класс RemoteSegmentMetadata

Все методы интерфейса имеют входной параметр типа RemoteSegmentMetadata. Тип содержит описание метаданных сегмента в удаленном хранилище. Класс RemoteSegmentMetadata является наследником класса RemoteLogMetadata. Классы являются контейнерами данных.

Ниже приведена таблица с описанием полей классов RemoteLogMetadata и RemoteSegmentMetadata.

Поле

Тип данных

Описание

Класс RemoteLogMetadata

brokerId

int

Идентификатор брокера Кафка

eventTimestampMs

long

Время эпохи событий в миллисекундах

topicIdPartition

TopicIdPartition

Идентификатор партиции топика

Класс RemoteSegmentMetadata

remoteLogSegmentId

RemoteLogSegmentId

Идентификатор сегмента в удаленном хранилище

startOffset

long

Смещение (offset) первого сообщения в сегменте

endOffset

long

Смещение (offset) последнего сообщения в сегменте

maxTimestampMs

long

Максимальное значение метки времени сообщений сегмента

segmentLeaderEpochs

NavigableMap <Integer, Long>

Сопоставление значения эпохи лидера со смещением сообщений

segmentSizeInBytes

long

Размер файла сегмента в байтах

customMetadata

Optional <CustomMetadata>

Дополнительные данные, сформированные разработчиком реализации RemoteStorage Manager

state

RemoteLogSegmentState

Состояние сегмента

Класс TopicIdPartition

topicId

Uuid

Уникальный идентификатор топика

topicPartition

TopicPartition

Описание партиции топика

Класс TopicPartition

partition

int

Порядковый номер партиции

topic

String

Имя топика

RemoteLogSegmentState

enum
COPY_SEGMENT_STARTED((byte) 0)
COPY_SEGMENT_FINISHED((byte) 1)
DELETE_SEGMENT_STARTED((byte) 2)
DELETE_SEGMENT_FINISHED((byte) 3)

Состояние сегмента

Класс RemoteLogSegmentId

topicIdPartition

TopicIdPartition

Идентификатор партиции топика

id

Uuid

Уникальный идентификатор удаленной партиции топика

Структура хранения информации в локальном и удаленном хранилищах может быть разной, то есть имена и пути к файлам в бакетах S3 не обязательно должны совпадать с именами и путями файлов в локальном файловом хранилище Кафки. Наличие такой подробной информации о сегменте партиции позволяет строить гибкие схемы сопоставления структуры локального и удаленного хранилищ.

Метод copyLogSegmentData

Метод предназначен для копирования данных и индексов сегмента из локального хранилища брокера Кафка в удаленное хранилище S3.

Метод возвращает Optional<CustomMetadata>. С CustomMetadata разобрались в ранее. Разработчик может явно указать, что не формировал CustomMetadata, вернув Optional.empty(). Или вернуть сформированные метаданные для дальнейшего использования.

Метод может создать исключительную ситуацию типа RemoteStorageException.

Метод получает на вход два параметра:

  • remoteLogSegmentMetadata — метаданные для копирования в удаленное хранилище;

  • logSegmentData — помещаемые в удаленное хранилище данные.

Класс LogSegmentData является контейнером для описания данных сегмента в локальном хранилище и имеет в своем составе поля:

logSegment – actual log segment file
offsetIndex – offset index file
timeIndex – time index file
transactionIndex – transaction index file, which can be null
producerSnapshotIndex – producer snapshot until this segment
leaderEpochIndex – leader-epoch-index until this segment

Класс хранит не сами данные, а ссылки на них в виде путей.

Из описания видно, что данные и индексы, кроме transactionIndex обязательно присутствуют в локальном хранилище.

Еще одно полезное практическое наблюдение. В классе LogSegmentData представлен не как путь к файлу, а как ByteBuffer. Между операциями сохранения данных файл leaderEpochIndex может меняться. Поэтому для обеспечения консистентности передается актуальный, на момент старта операции сохранения, слепок. Кроме того, для всех сегментов партиции файл leaderEpochIndex присутствует в единственном экземпляре, постепенно меняя содержимое. От реализации Remote Storage Manager ожидается, что она сохранив слепок leaderEpochIndex, вернет именно его при запросах данных и индексов сегментов.

С учетом вышеизложенных рассуждений алгоритм работы метода может состоять из следующих крупных блоков (вопросы оптимизации алгоритма пока отложим в сторону):

  1. Проверить входные параметры;

  2. Получить данные локального хранилища из файлов:

    — сегмента лога (расширение файла log);
    — индекса по смещению (расширение файла index);
    — индекса по времени (расширение файла timeindex);
    — индекса producerSnapshot (расширение файла snapshot);

  3. Проверить наличие файла индекса транзакций () в локальном хранилище. При наличии файла получить из него данные;

  4. Получить данные leaderEpochIndex из байтового буфера;

  5. По заранее спроектированной схеме вычислить имена и пути файлов в удаленном хранилище;

  6. При необходимости по заранее спроектированной схеме трансформировать данные, полученные из локального хранилища и слепка leaderEpochIndex в форматы данных удаленного хранилища;

  7. Записать данные в удаленное хранилище;

  8. При необходимости запомнить какие-либо метаданные для последующего использования в методах fetch* и deleteLogSegment сформировать и вернуть как результат работы метода CustomMetadata;

  9. При возникновении ошибок поднять RemoteStorageException.

Таким образом при проектировании/анализе готовой реализации метода возникают подзадачи:

  • определить структуру и форматы хранения информации в удаленном хранилище;

  • определить алгоритмы маппинга структуры файлов локального хранилища на структуру файлов удаленного хранилища;

  • определить алгоритмы маппинга форматов данных локального и удаленного хранилищ;

  • при необходимости определить состав и формат данных, которые должны передаваться в CustomMetadata.

Методы fetchLogSegment

Методы отвечают за извлечение из удаленного хранилища данных сегмента. Сигнатуры методов отличаются лишь одним параметром int endPosition.

Казалось бы, что имея в RemoteLogSegmentMetadata информацию о размере сегмента данных, можно было бы ограничиться одним методом. Либо это недочет проектирования, либо конечное смещение — байт от начала файла сегмента не всегда известно.

Отмечу, что startPosition и endPosition это смещения в байтах, а не смещения сообщений в сегменте.

Формат хранение сообщений в сегменте известен. При попытке чтения сообщений смещения преобразуются ядром Кафки в смещения в байтах. Это позволяет Remote Storage Manager вернуть массив ранее сохраненных байт с данными сегмента, не задумываясь об их семантике и границах сохраненных в сегменте сообщений.

При успешной работе методы должны вернуть InputStream с данными.

При ошибках общего характера создать исключение RemoteStorageException. Если ошибка в том, что не удалось обнаружить в удаленном хранилище данные создать RemoteResourceNotFoundException.

Метод fetchIndex

Метод по полученным в качестве аргументов метаинформации и типу запрашиваемого индекса должен вернуть InputStream с данным индекса. В случае ошибок общего характера создать исключение RemoteStorageException. В случае, если не удалось в удаленном хранилище обнаружить данные запрошенного индекса, создать исключение RemoteResourceNotFoundException.

Вспомним, что индекса транзакций может и не быть в локальном хранилище. Это значит, что и в удаленном хранилище его не будет. При этом по-хорошему не должно быть вызова fetchIndex с параметром indexType равным TRANSACTION. Либо при таком вызове нужно сформировать исключение RemoteResourceNotFoundException. В реализации «в лоб» при вызове удаленного S3 хранилища произойдет ошибка 404 notFound. Однако, есть идея небольшой оптимизации. В методе копирования данных сохранить в CustomMetadata виде битовых флагов виды сохраняемых индексов. В методе fetchIndex анализировать флаги и если в метаинформации указано, что индекс не сохранялся в ходе выполнения метода copyLogSegmentData, то формировать исключение без вызова удаленного хранилища.

Кроме того, напомню, что leader epoch index имеет один экземпляр на партицию. При сохранении передается слепок текущего состояния. Соответственно, возвращать необходимо слепок, соответствующий сегменту.

Метод deleteLogSegmentData

Метод удаляет данные и индексы сегмента из удаленного хранилища.

Метод имеет один аргумент RemoteLogSegmentMetadata remoteLogSegmentMetadata, описывающий удаляемый сегмент.

При ошибках удаления метод должен создавать исключительную ситуацию типа: RemoteStorageException.

При реализации метода следует учитывать, что он может быть вызван несколько раз и результаты предыдущих вызовов (частичное или полное удаление данных) не должны вызывать ошибок последующих вызовов.

Копаемся в реализации Aiven Open

Анализ требований

Прежде чем приступить к изучению технических решений и кода всегда полезно понять требования и ограничения реализации. Технические решения есть способ удовлетворения требований.

Открываем и читаем раздел файла README.md в github Aiven Open.

  1. Обеспечить поддержку AWS S3, Google Cloud Storage и Azure Blob Storage (популярные в мире облачные провайдеры S3);

  2. Необходимо реализовать сжатие данных. Сжатие должно быть опционально и включаться/выключаться настройками. Необходимо избежать дублирования сжатия средствами клиента Кафки и remote storage;

  3. Необходимо реализовать шифрование данных. Тоже как опцию;

  4. Следует избегать излишнего обращения к удаленному хранилищу за данными.

Первое требование понятно. Но:

  • Эксперименты и первичные тесты проще проводить на локальной машине;

  • Популярные облачные провайдеры Amazon итд сейчас труднодоступны для клиентов из РФ;

  • Размещение в публичном облаке требует финансовых затрат.

Второе требование, на мой взгляд, слегка избыточно. При необходимости клиент Кафка может на лету сжимать данные и при вычитке разжимать. Многие реализации провайдеров S3 хранилища поддерживают сжатие данных при записи на диск. Добавлять еще и сжатие данных на уровне алгоритмов работы Remote Storage Manager хороший способ усложнения при скромном улучшении пользовательских характеристик продукта.

Третье требование — нетривиальны в реализации с точки зрения требований государственных регуляторов к использованию средствам криптографии.

Четвертое требование можно трактовать двояко.
Первая трактовка — оптимальность трафика к удаленному хранилищу. Прекрасное требование.
Вторая трактовка — минимизировать обращение к тарифицируемым (платным вызовам) S3 хранилища облачного провайдера. Разумно, но в on-premise среде не актуально.

Вывод — реализация Aiven Open предназначена для использования в публичных облаках широким кругом пользователей вне РФ. Требует дополнительного анализа применимости в сложных Enterprise ИТ ландшафтах с высокими регуляторно-правовыми требованиями.

Для «позапускать на ноутбуке» в свободное время не самый оптимальный вариант. Зато с открытым исходным кодом, хорошо реализовано (это я забегаю вперед) и бесплатно.

Однако, пора уже поизучать код.

Ключевые технические решения

И снова спасибо разработчикам Aiven Open за описание принятых технических решений.

Выделение слоя работы с различными провайдерами хранилища данных S3 (provider backends)

Различные реализации S3 имеют свои особенности. Как правило, работа с S3 идет через предоставляемых производителем SDK, а не на уровне самостоятельного формирования http вызовов разработчиком клиентского приложения.

SDK удобная абстракция, скрывающая ненужные детали работы с http. Но каждый облачный провайдер и разработчик реализации S3 стремится предоставить собственный SDK. Выделение слоя работы с различными реализациями S3 позволяет не складывать в одну кучу все используемые SDK, а выбирать при сборке необходимые.

Вывод. Если требуется поддержка нескольких реализаций S3 выделение слоя provider backends хорошая идея.

Разделение данных на цепочки (chunking)

Данные в Кафке хранятся в файлах сегментов партиций. Размер файла сегмента настраивается при создании топика. Сегменты могут иметь весьма большие размеры — единицы и десятки гигабайт. Клиенты при чтении запрашивают данные с брокеров Кафка пачками (диапазон номеров сообщений).

При получении сообщений по диапазону из local storage брокер Кафка выполняет локальный вызов и имеет возможность извлечь из файла сегмента партиции только необходимые данные без загрузки всего файла в память брокера.

При выборке по диапазону сообщений из S3 хранилища брокер Кафка выполняет удаленный сетевой http вызов. При этом при реализации «в лоб» из S3 хранилища по сети будет передаваться все содержимое файла сегмента партиции даже, если требуется получить лишь малую часть данных.

Замечательно, что большая часть клиентов S3 умеют по запросу передавать не весь файл, а указанный диапазон байт. Хотя это и не избавляет от удаленного сетевого http вызова.

Казалось бы задача решена. Но есть нюанс.

Описанная оптимизация с запросом диапазона байт из хранилища S3 работает только если файлы сегментов записаны без изменений. При включении шифрования или сжатия данных средствами RemoteStorage исходные набор байт трансформируется. Чтобы извлечь из зашифрованного или сжатого файла диапазон байт, указанных для исходного файла, нужно сначала расшифровать или разжать данные. При больших размерах данных операции расшифровки и разжатия могут быть весьма затратны.

Что предлагают разработчики Aiven Open для оптимизации? При сохранении данных файла сегмента партиции в S3 хранилище, разбить их на более мелкие части (chunks). Каждую часть отдельно зашифровать/сжать. Затем все части снова соединить в один файл и построить индекс соответствия между диапазонами байт исходного файла и частями полученного файла. При этом в индексе нужно учитывать, что части одного сообщения Кафка могут после манипуляций быть размещены в соседних chunk-ах.

Вот так требования наличия шифрования и сжатия средствами Remote Storage Manager приводят к необходимости реализации непростой подсистемы chunks с индексами.

При отключенном шифровании и сжатии подсистема разбиения на части (chunks) не нужна. А она вообще отключается принудительно в настройках? Проверю позже. В любом случае наличие подсистемы chunks не упрощает понимание базовых концепций Remote Storage Manager на уровне исходного кода, но позволяет реализовать эффективные алгоритмы.

Сжатие данных средствами Remote Storage Manager

Используется алгоритм Zstandard. Сжатие может быть выключено или включено явно. Также заявлен режим эвристики использования сжатия клиентом Кафка. Функциональность весьма крутая, но опять же исходный код от наличия эвристик проще не станет.

Шифрование

Для каждого файла сегмента генерируется по алгоритму AES-256 ключ для шифрации. Этим первым ключом шифруются данные сегмента. В то же время сам ключ шифруется публичной частью второго ключа (по алгоритму RSA). Первый зашифрованный ключ прикрепляется к зашифрованным им данным файла сегмента партиции. Таким образом обеспечивается быстрая ротация ключей шифрования данных (первый ключ), что не позволяет взломав первый ключ, расшифровать данные всех файлов сегментов. Второй ключ является «отмычкой» для всех первых ключей. Вторые ключи хранятся в защищенной связке ключей (key ring), периодически ротируются. Хранение в связке ключей позволяет не использовать ключ для шифрации новых данных, но расшифровать ранее зашифрованные ключом данные.

Хороший подход. Но для шифрования передаваемых данных можно использовать TLS. А шифрование сохраняемых файлов отдать на откуп реализации S3 хранилища.

S3 Multipart Upload

S3 провайдеры поддерживают режим загрузки «частями» файлов большого размера. Вполне хорошая идея.

Локальный кэш данных

При получении chunk можно его закэшировать, надеясь, что при последовательном чтении данных клиентом Кафка следующая пачка требуемых сообщений опять окажется в том же chunk. Можно даже в фоне запросить следующий chunk и сохранить его в кэш (prefetching). Кэширование повысит скорость получения данных клиентами Кафка за счет оптимизации количества удаленных сетевых вызовов.

Разработчиками заявлена реализация кэша с хранением данных в памяти брокера Кафка и с хранением в файловой системе брокера.

Идея годная даже если бы не было chunk, а файлы сегметнов передавались целиком.

Структура исходного кода

Осматриваемся на местности

Концепции хорошо, а их реализация в исходном коде — еще лучше. Следуя этому принципу, разберем структуру исходного кода Remote Storage Manager Aiven Open.

Клонируем репозиторий исходного кода с github к себе на компьютер. Код представляет собой многокомпонентный gradle проект с вкраплениями скриптов сборки make.

Назначение папок кода весьма прозрачно.

Часть из них не содержат исходного кода на Java и выполняют вспомогательную роль.

Подпапка checkstyle содержит конфигурационные файлы проверки качества Java кода. Инструмент checkstyle хорошо известен, часто используется в мире Java для автоматизированного контроля качества исходного кода и поиска наиболее часто встречающихся ошибок, и недочетов, допущенных разработчиком.

Подпапка gradle содержит gradle wrapper. Компонент, позволяющий проводить сборку исходного кода без необходимости установки Gradle. Опять же ничего специфичного, частая практика использования системы сборки Gradle.

Подпапка demo содержит docker compose файлы для запуска демонстрационных примеров в эмуляторах различных облачных провайдеров.

Подпапка docker содержит конфигурационные файлы для сборки docker контейнера.

Подпапка e2e содержит исходный код end to end тестов.

Наконец-то мы добрались до папок с исходным кодом, относящимся к реализации Remote Storage Manager:

  • Подпапка commons содержит вспомогательный исходный код общего назначения;

  • Подпапка core содержит ядро реализации Remote Storage Manager;

  • Подпапка storage является корневой папкой с реализациями бэкендов различных провайдеров S3 (AWS, Google, Azure итд).

Ищем вход

Любая реализация Kafka Remote Storage Manager должна имплементировать интерфейс: org.apache.kafka.server.log.remote.storage.RemoteStorageManager.

В коде Aiven Open обнаруживается одна реализация интерфейса: org.apache.kafka.server.log.remote.storage.RemoteStorageManager в gradle модуле core. Название класса реализации тоже RemoteStorageManager.

Конструктор класса прост. В нем заполняется поле time экземпляром: org.apache.kafka.common.utils.Time. Требуется для вычисления метрик мониторинга.

После конструктора для окончательной настройки работы компонента ядро Кафка вызовет метод configure.

Первым шагом производится получение настроек Remote Storage Manager, заданных в файле конфигурации брокера Кафка (по умолчанию файл server.properties). В документации показан пример настройки. Но найти полный список настроек приведен или только основные настройки по документации не удалось. Посмотрим исходный код класса java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java. Вот он, полный перечень настроек и даже с описанием.

Судя по настройкам механизм нарезки на chunk-и нельзя отключить, можно лишь указать размер chunk-a. Не очень гибко, хотя и помогает избежать ошибок конфигурирования при включении/выключении шифрования и сжатия.

По исходному коду метода configure легко понять, что реальная работа по взаимодействию с S3 делегируется backend-у, состоящему из трех компонент:

  1. uploader — запись данных в S3;

  2. fetcher — чтение данных из S3;

  3. deleter — удаление данных из S3.

Логично предположить, что компоненты связаны общей моделью хранения данных в S3.

Хорошо. Теперь переходим к изучению части кода, ответственной за взаимодействие в tiered remote storage.

Код записи данных в удаленное S3 хранилище

Проанализируем код реализации метода:

public Optional<CustomMetadata> copyLogSegmentData(
    final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
    final LogSegmentData logSegmentData)
    throws RemoteStorageException {
    
    // Проверить, что входные параметры не равны null. 
    // Выглядит как перестраховка
    Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentId must not be null");
    Objects.requireNonNull(logSegmentData, "logSegmentData must not be null");

    // Запись в лог
    log.info("Copying log segment data, metadata: {}", remoteLogSegmentMetadata);

    // Создаем билдер для заполнения custom metadata
    final var customMetadataBuilder =
        new SegmentCustomMetadataBuilder(customMetadataFields, objectKeyFactory, remoteLogSegmentMetadata);

    // Фиксируем время начала операции
    final long startedMs = time.milliseconds();
    
    // Открыть поток чтения локального файла сегмента
    try (final InputStream logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) {
        ... порезать на части, зашифровать, заархивировать итд 
        ...
        // Загрузить данные сегмента в remote storage
        uploadSegmentLog(remoteLogSegmentMetadata, transformFinisher, customMetadataBuilder);
        ... 
    }
    
    final SegmentIndexesV1 segmentIndexes = uploadIndexes(
                remoteLogSegmentMetadata, logSegmentData, encryptionMetadata, customMetadataBuilder);
                
    // Мммм.. Метаданные помимо customMetadata сохраняются еще и в remoteStorage 
    final SegmentManifest segmentManifest = new SegmentManifestV1(
                chunkIndex, segmentIndexes, requiresCompression, encryptionMetadata, remoteLogSegmentMetadata);
    uploadManifest(remoteLogSegmentMetadata, segmentManifest, customMetadataBuilder);
    ...
    
    metrics.recordSegmentCopyTime(
            remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
            startedMs, time.milliseconds());

    final var customMetadata = buildCustomMetadata(customMetadataBuilder);

    log.info("Copying log segment data completed successfully, metadata: {}", remoteLogSegmentMetadata);

    return customMetadata;        
}

Код неплохо читаем, но:

  • Метаданные размазаны между customMetadata и uploadManifest в remote storage;

  • Не сразу понятно, что происходит с customMetadata, так как они передаются, как параметры методов upload;

  • В случае сбоя загрузки одного из элементов будет поднято RemoteStorageException и в remote storage могут остаться «хвосты».

Методы uploadSegmentLog и uploadManifest не содержат ничего интересного. По факту они вызывают uploader для загрузки данных в remote storage.

В методе uploadIndexes обнаруживается интересная особенность — в ходе трансформации индекса анализируется его размер. Анализ выполняется с помощью вызова статического методаindexSize.

static int indexSize(final Path indexPath) throws RemoteStorageException {
        try {
            final var size = Files.size(indexPath);
            if (size > Integer.MAX_VALUE) {
                throw new IllegalStateException(
                    "Index at path "
                        + indexPath
                        + " has size larger than Integer.MAX_VALUE");
            }
            return (int) size;
        } catch (final IOException e) {
            throw new RemoteStorageException("Error while getting index path size", e);
        }
    }

Получается, что файлы индексов с размером более Integer.MAX_VALUE вызовут ошибку записи. В подавляющем числе случаев размер файлов меньше, но потенциально грабли могут быть.

Итог: неплохо, но можно попробовать сделать код более читаемым.

Код чтения данных сегмента

    @Override
    public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                       final int startPosition,
                                       final int endPosition) throws RemoteStorageException {
        ...
        try {
            ...
            final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);

            final var suffix = ObjectKeyFactory.Suffix.LOG;
            final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);
            return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
                .toInputStream();
        }                                   
    }

Получаем манифест сегмента из remote storage, затем пользуясь магией FetchChunkEnumeration делается реальная работа по доставанию нужных байт.

Итог: читается код непросто, какая небольшая то магия.

Код чтения индексов

public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final IndexType indexType) throws RemoteStorageException {
    ...
    return segmentIndexesCache.get(
                key,
                indexType,
                () -> fetchIndexBytes(key, segmentIndex, segmentManifest)
            );
}

Видимо, делается попытка найти нужные данные в кэше индексов и при необходимости запросить их из remote storage.

Итог: Код понятен.

Код удаления данных из remote storage

    @Override
    public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
        throws RemoteStorageException {

        log.info("Deleting log segment data for {}", remoteLogSegmentMetadata);

        metrics.recordSegmentDelete(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
            remoteLogSegmentMetadata.segmentSizeInBytes());

        final long startedMs = time.milliseconds();

        try {
            final Set<ObjectKey> keys = Arrays.stream(ObjectKeyFactory.Suffix.values())
                .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s))
                .collect(Collectors.toSet());
            deleter.delete(keys);
        } catch (final Exception e) {
            metrics.recordSegmentDeleteError(remoteLogSegmentMetadata.remoteLogSegmentId()
                .topicIdPartition().topicPartition());
            throw new RemoteStorageException(e);
        }

        metrics.recordSegmentDeleteTime(
            remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
            startedMs, time.milliseconds());

        log.info("Deleting log segment data for completed successfully {}", remoteLogSegmentMetadata);
    }

В коде сначала формируем список удаляемых объектов и затем вызываем `deleter.delete(keys)`, который в цикле вызывает метод удаления соответствующего S3 SDK.

Итог: код понятен.

Вывод

Реализация хорошо продумана, код читаем, хотя местами сложен для понимания. Но кто из нас всегда пишет идеальный код, пусть первым кинет камень.

Запускаем реализацию AivenOpen

Запуск производился на Linux.

Настраиваем и запускаем Minio

Перед запуском необходимо установить Minio. На сайте Minio и в интернете есть большое количество инструкций по установке на различные дистрибутивы Linux. Не будем их дублировать.

Для хранения данных необходимо создать папку. В моем случае папка имеет путь ~/Projects/kafka/data/aiven/minio. Вы можете выбрать другую папку на ваш вкус.

Из консоли запускаем Minio:

minio server --console-address localhost:9999 ~/Projects/kafka/data/aiven/minio

В браузере открываем ссылку http://localhost:9999 и видим web консоль сервера Minio.

Логинимся с учетной записью админа по умолчанию minioadmin/minioadmin. В реальной жизни в целях информационной безопасности, конечно же надо поменять логин и пароль учетной записи.

В web консоли создаем S3 bucket с именем kafka-bucket с настройками по умолчанию.

Поскольку AWS S3 client не может работать без указания S3 region необходимо задать регион по умолчанию. В web консоли выбираем пункт меню Configuration и задаем настройку Region/Server Location равной local-ru (имя может быть выбрано на ваше усмотрение).

После установки региона необходимо перезапустить Minio сервер.

Собираем из исходников дистрибутив AivenOpen

Клонируем код с github и в корневой папке консольной командой `make` выполняем сборку. Признаком успешной сборки является вывод "BUILD SUCCESSFUL".

Копируем библиотеки AivenOpen в инсталляцию Apache Kafka

Поскольку Minio декларирует совместимость с Amazon S3 нам понадобится скопировать из папки исходных кодов AivenOpen в папку libs инсталляции Кафка следующие файлы:

  • Из архива (zip или tar без разницы) ./build/distributions/ все файлы. Обратите внимание, что копировать необходимо jar файлы без учета наличия папки внутри архива. Другими словами jar файлы должны быть скопированы без создания подпапки внутри папки libs;

  • Из архива (zip или tar без разницы) ./storage/s3/build/distributions все jar файлы без учета наличия папки внутри архива.

Настраиваем и запускаем кластер Кафка

  1. Zookeeper не требует специальной настройки. Запуск Zookeeper тоже не имеет особенностей:

    ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

  2. Конфигурируем брокер Кафка. В конец файла ./config/server/properties инсталляции Кафка добавляем строки:

# ----- Enable tiered storage -----

remote.log.storage.system.enable=true

# ----- Configure the remote log manager -----

# This is the default, but adding it for explicitness:
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager

# Put the real listener name you'd like to you here:
remote.log.metadata.manager.listener.name=PLAINTEXT

# You may need to set this if you're experimenting with a single broker setup:
rlmm.config.remote.log.metadata.topic.replication.factor=1
rlmm.config.remote.log.metadata.topic.num.partitions=1


# ----- Configure the remote storage manager -----

# Here you need either one or two directories depending on what you did in Step 1:
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager

# 4 MiB is the current recommended chunk size:
rsm.config.chunk.size=4194304

# ----- Configure the storage backend -----

# Using S3 as an example:
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.storage.s3.bucket.name=kafka-bucket
rsm.config.storage.s3.region=local-ru
rsm.config.storage.s3.endpoint.url=http://localhost:9000
rsm.config.storage.s3.path.style.access.enabled=true
# default minio server login/password. Change it.
rsm.config.storage.aws.access.key.id=minioadmin
rsm.config.storage.aws.secret.access.key=minioadmin

# The prefix can be skipped:
#rsm.config.storage.key.prefix: "some/prefix/"

# ----- Configure the fetch chunk cache -----

rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
# S3 local cache folder path. Change it.
rsm.config.fetch.chunk.cache.path=/home/mike/Projects/kafka/data/aiven/s3_cache
# Pick some cache size, 16 GiB here:
rsm.config.fetch.chunk.cache.size=17179869184
# Prefetching size, 16 MiB here:
rsm.config.fetch.chunk.cache.prefetch.max.size=16777216

Обратите внимание, что в настройке rsm.config.storage.s3.region необходимо обязательно указать имя региона и в настройке rsm.config.storage.s3.path.style.access.enabled указать значение true. Последняя настройка указывает клиенту AWS S3, что вместо использования имени региона в имени хоста (поведение по умолчанию) необходимо использовать имя региона в url path (в таком режиме Minio ожидает HTTP запросов).

Стартуем брокер Кафка из консоли:

./bin/kafka-server-start.sh -daemon ./config/server.properties

Брокер должен успешно стартовать. В логах брокера не должно быть ошибок.

Создаем топик с поддержкой remote storage

Из консоли выполняем команду:

bin/kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic topic1 \
    --config remote.storage.enable=true \
    --config segment.bytes=512000 \
    --config local.retention.bytes=1 \
    --config retention.bytes=10000000000000

Проверяем работоспособность

Для проверки работоспособности запишем в созданный топик сообщения:

bin/kafka-producer-perf-test.sh \
    --topic topic1 --num-records=10000 --throughput -1 --record-size 1000 \
    --producer-props acks=1 batch.size=16384 bootstrap.servers=localhost:9092

Через некоторое время после окончания записи можно увидеть с помощью web консоли Minio, что в S3 bucket с именем kafka-bucket начали появляться файлы.

Осмысливаем результат

Итак, получилось в целом понять структуру, работу кода и запустить реализацию RemoteStorageManager от AivenOpen.

Что хорошо, реализация рабочая. Но если хочется погрузиться в тонкости работы именно RemoteStorageManager, то исходный код (исключительно на мой взгляд) перегружен реализацией фич. Реализованные фичи приводят к тому, что формат хранения данных в remote storage сильно отличается от формата хранения данных брокером Кафка.

  • затруднено понимание, что именно скопировано в удаленное хранилище;

  • нельзя скопировать файлы из Minio и изучить их содержимое утилитами Кафка.

В таком виде «играться» с подсистемой RemoteStorageManager и проверять различные гипотезы и технические решения неудобно.

Поэтому мне пришла в голову мысль написать собственную простую реализацию RemoteStorageManager. Планирую рассказать об этом в следующей статье.

Комментарии (0)