Всем привет! Меня зовут Елена Калинина, и я технический менеджер проектов в команде YDB — в рамках Yandex Infrastructure наша команда создаёт технологии для работы всего Яндекса.

YDB — это распределённая отказоустойчивая СУБД с открытым исходным кодом. Для потоковых нагрузок в YDB реализован механизм персистентных очередей сообщений под названием YDB Topics. Топики YDB используются в качестве основной шины данных в Яндексе, что позволяет многократно экономить на серверах и их обслуживании.

Но что если какая‑то компания соблазнится такой экономией и захочет перейти с Apache Kafka на YDB Topics? Без API‑совместимых решений придётся переписывать весь код? К счастью, для работы с топиками YDB можно использовать Kafka API — и в этой статье я подробно покажу, как это сделать, на примере чтения и записи в поток данных и дальнейшей выгрузки в объектное хранилище в облаке.

О чём статья:

  1. Немного предыстории, терминологии и базовых понятий

  2. Какие сценарии работы через Kafka API рассмотрены в этой статье

  3. Создание объектов

    • Создание базы данных YDB

    • Создание потока данных Data Streams

    • Создание выделенных потребителей для потока данных

    • Создание бакета в хранилище S3

  1. Создание сервисных аккаунтов, выдача ролей и создание ключей

  1. Подготовка окружения

  2. Запись и чтение из потока данных с помощью Kafka cli

  1. Выгрузка из потока данных в хранилище S3

    • Совет по безопасности

  2. Итоги

Немного предыстории, терминологии и базовых понятий

В 2013 году в качестве шины данных в Яндексе начали использовать Apache Kafka, но в этом продукте команде нравилось далеко не всё. При количестве данных, которое накоплено в компании, в Apache Kafka становилось сложно управлять правами доступа, организовывать распределённую работу нескольких команд, расширять или сокращать кластер при необходимости и многое другое.

При отсутствии подходящего решения в открытом доступе, пришлось делать своё. Так в 2017 году и появилась система YDB Topics, выложенная в опенсорс в составе платформы данных YDB — катастрофоустойчивой и масштабируемой на тысячи узлов базы данных. И как показал последующий опыт использования YDB Topics, по сравнению с другими опенсорс‑решениями, в крупной компании шина данных на базе технологий YDB позволяет экономить на оборудовании и его обслуживании в несколько раз.

Наша СУБД существует в трёх вариантах:

В данной статье мы рассмотрим работу с YDB Topics через Kafka API в Yandex Cloud. Об особенностях работы с Kafka API при использовании опенсорс‑версии YDB вы можете прочитать здесь.

YDB поддерживает одновременное выполнение транзакционных (OLTP) и потоковых нагрузок, а также с недавних пор позволяет работать со сложными аналитическими запросами (OLAP). Для работы с потоковыми нагрузками в Yandex Cloud используется отдельный сервис Yandex Data Streams, тесно интегрированный с Yandex Managed Service for YDB. Любой поток данных, который вы создаёте в Yandex Data Streams, создаётся внутри одной из баз данных Yandex Managed Service for YDB.

Исторически потоковая обработка была представлена в Yandex Data Streams в виде сервиса с доступом по Kinesis API и использованием соответствующей терминологии — вы работаете с потоками данных. За сценой же потоки Yandex Data Streams основаны на топиках YDB. Поток данных, или топик в YDB — это сущность для хранения неструктурированных сообщений, предназначенная для их доставки множеству подписчиков. Топик в YDB, как мы уже рассказывали, является аналогом топика в Apache Kafka.

Какие сценарии работы через Kafka API рассмотрены в этой статье

После знакомства с этой статьёй читатель сможет использовать kafka cli для чтения или записи в поток данных Yandex Data Streams, и Kafka Connect для выгрузки данных из потока данных в объектное хранилище, в нашем случае — Yandex Object Storage.

В данной статье не рассматриваются вопросы использования других API для работы с потоками данных, но вы можете прочитать о них здесь: YDB Topic API, Kinesis API.

Создание объектов: база данных YDB, поток данных Data Streams, бакет S3

Нам понадобится предварительно настроенное облачное окружение, где уже созданы базовые аккаунты, облака и каталоги. Если у вас ещё не было опыта работы с ресурсной моделью Yandex Cloud, то рекомендую также ознакомиться с этой статьёй.

Как создать в облаке объекты: базу данных YDB, поток данных Data Streams, S3-бакет
  1. Создание базы данных YDB

    В консоли управления в списке слева выберем нужное облако, а в нём — каталог, в котором будет создана БД. На странице обзора каталога нажмём справа вверху «Создать ресурс» и выберем «База данных YDB».

    На странице создания базы данных в качестве имени введём ydb-habr-demo-1. Для остальных параметров можно оставить значения по умолчанию. Подробно о том, что означают параметры базы данных YDB, можно прочитать в документации. Затем нажмём кнопку «Создать БД».

    После создания базы данных можно перейти на страницу обзора базы данных. Для дальнейшей работы понадобится имя базы данных, в нашем примере это ydb-habr-demo-1.

    Обратите внимание, что мы создали serverless-базу данных, и примеры скриптов ниже предназначены для работы с потоками данных в рамках serverless-БД.

  1. Создание потока данных Data Streams

    В консоли управления выберем нужное облако и в нём каталог, в котором создана БД YDB. Далее нажмём справа вверху «Создать ресурс» и выберем «Data Streams».

    На странице создания потока данных в поле «База данных» выберем ydb-habr-demo-1, а в поле «Имя» введём yds-habr-demo. Для остальных параметров можно оставить значения по умолчанию. Подробно о том, что означают параметры потока данных Data Streams, тоже можно прочитать в документации. Затем нажмём «Создать».

    После создания потока данных перейдём на страницу обзора потока данных. Для дальнейшей работы понадобятся имя потока данных, имя базы данных, путь к базе данных и эндпоинт Kafka API, в нашем примере:

    Имя потока данных: yds-habr-demo

    Имя базы данных: ydb-habr-demo-1

    Путь к базе данных: /ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o

    Важно: в вашем каталоге путь будет другим, скопировать его можно со страницы обзора базы данных или обзора потока.

    Эндпоинт Kafka API: ydb-03.serverless.yandexcloud.net:9093

    В вашем каталоге эндпоинт Kafka API может быть другим. Копируйте эндпоинт Kafka API со страницы обзора базы данных или обзора потока.

  2. Создание выделенных потребителей для потока данных

    Для чтения из потока данных нужно создать выделенного потребителя — это аналог kafka consumer. Для созданного потока данных на панели меню слева выберем пункт «Выделенные потребители», нажмём справа вверху «Создать потребитель», зададим для него имя, например habr-demo-consumer, и нажмём «Создать».

    Для примера ниже с отгрузкой в объектное хранилище потребуется ещё выделенный потребитель, название которого начинается на connect‑ (это особенности работы коннектора). В нашем примере будет использован выделенный потребитель connect-habr-demo-consumer.

  3. Создание S3-бакета в объектном хранилище Yandex Object Storage

    На странице обзора каталога в Yandex Cloud нажмём справа вверху «Создать ресурс» и выберем «Бакет». На странице создания бакета зададим имя ydb-habr-bucket, для остальных параметров снова можно оставить значения по умолчанию. Для дальнейшей работы понадобится имя бакета, в нашем примере ydb-habr-bucket.

Для доступа к созданным объектам в облаке нам понадобятся сервисные аккаунты, обладающие нужными ролями. Кроме того, для этих сервисных аккаунтов необходимо выписать ключи. Пойдём по порядку.

Создание сервисного аккаунта для доступа к потоку данных и скоупированного ключа

На дашборде каталога перейдём в сервис Identity and Access Management и нажмём справа вверху «Создать сервисный аккаунт». Зададим имя сервисного аккаунта, например sa-habr-demo-kafka, и добавим этому сервисному аккаунту роли:

  • ydb.viewer — для чтения данных из потока, 

  • ydb.editor — для записи данных в поток,

  • ydb.kafkaApi.client — для доступа к потоку данных по Kafka API.

Далее на странице обзора сервисного аккаунта нажмём справа вверху «Создать новый ключ» и выберем «Создать API‑ключ». Для ключа нужно выбрать область действия — yc.ydb.topics.manage. Можно также задать описание и срок действия.

После создания ключа он будет выведен на экран — его необходимо сохранить, он понадобится в скриптах для чтения/записи по Kafka API.

Создание сервисного аккаунта для доступа к S3-бакету и статического ключа

Аналогично предыдущему пункту перейдём на дашборде каталога в сервис Identity and Access Management и нажмём справа вверху «Создать сервисный аккаунт». Зададим имя сервисного аккаунта, например sa-habr-demo-bucket, и добавим этому сервисному аккаунту роль storage.uploader.

Далее на странице обзора сервисного аккаунта нажмём справа вверху «Создать новый ключ» и выберем «Создать статический ключ доступа». Аналогично, после создания ключа он будет выведен на экран, его также нужно сохранить для использования в скриптах выгрузки в объектное хранилище.

Подготовка окружения

Для работы с дальнейшими примерами нам понадобится установленная Kafka. Здесь есть подробная инструкция по установке. В примере ниже предполагается, что запуск скриптов происходит из папки, в которую установлена Kafka, а сами файлы лежат в соседней папке kafka_habr_examples. Для работы также понадобится распакованный коннектор Aiven's S3 Sink Connector for Apache Kafka. Примеры скриптов ниже были сделаны для Kafka версии 3.6.1 и S3 Connector версии 2.15.0.

Структура папок и файлов для примеров из статьи

Папка

Содержимое

kafka_2.13-3.6.1

kafka установлена в эту папку

S3-connector-for-apache-kafka-2.15.0

коннектор распакован в эту папку

kafka_habr_examples

содержит файлы:
consumer.properties
kafka_connect.sh
kafka_read.sh
kafka_write.sh
producer.properties
s3-sink.properties
worker.properties

Запись в поток данных с помощью Kafka cli

Для записи необходимо подготовить следующие файлы. В комментариях подробно написано, какие параметры объектов, заведённых раньше, должны быть прописаны при вызовах Kafka cli:

kafka_write.sh

#--broker-list      эндпоинт kafka api для потока данных, в который пишем
#--topic            полный путь к потоку данных, в который пишем, 
#                   в формате <путь к БД YDB>/<имя потока данных>
#--producer.config  путь к файлу с конфигурацией потока данных

bin/kafka-console-producer.sh --broker-list  ydb-03.serverless.yandexcloud.net:9093  \
                              --topic /ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o/yds-habr-demo \
                              --producer.config ../kafka_habr_examples/producer.properties

producer.properties

#bootstrap.servers          эндпоинт kafka api для потока данных, 
#                           в который пишем
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN       пока поддержана только SASL/PLAIN-аутентификация
#sasl.jaas.config 
#               username    @путь к базе данных YDB
#               password    API Key сервисного аккаунта sa-habr-demo-kafka 
#                           (ниже заменен на ***, подставьте ваш)

bootstrap.servers=ydb-03.serverless.yandexcloud.net:9093 
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";

После подготовки файлов необходимо выполнить скрипт из файла kafka_write.sh. В результате мы сможем ввести в терминале сообщения, которые будут записаны в поток:

После записи сообщений их можно увидеть в консоли Yandex Cloud. Для этого перейдём в консоли к потоку yds-habr-demo, в который мы писали, и выберем слева в меню пункт «Просмотр данных». Увидим основные графики записи/чтения для потока, а также сможем просмотреть сообщения, записанные в поток:

Чтение из потока данных с помощью Kafka cli

Для чтения из потока данных с помощью Kafka cli необходимо подготовить следующие файлы. В комментариях также подробно расписала, какие параметры объектов, заведённых раньше, должны быть прописаны при вызовах Kafka cli:

kafka_read.sh

#--broker-list      эндпоинт kafka api для потока данных, в который пишем
#--topic            имя потока данных, в который пишем
#--producer.config  путь к файлу с конфигурацией потока данных

bin/kafka-console-consumer.sh --bootstrap-server ydb-03.serverless.yandexcloud.net:9093 \
                          --topic /ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o/yds-habr-demo \
                          --group "habr-demo-consumer" \
                          --consumer.config ../kafka_habr_examples/consumer.properties

consumer.properties

#bootstrap.servers          эндпоинт kafka api для потока данных, 
#                           в который пишем
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN       пока поддержана только SASL/PLAIN-аутентификация
#sasl.jaas.config 
#               username    @путь к базе данных YDB
#               password    API Key сервисного аккаунта sa-habr-demo-kafka, 
#                           (ниже заменен на ***, подставьте ваш)

bootstrap.servers=ydb-03.serverless.yandexcloud.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
check.crcs=false

После подготовки файлов необходимо выполнить скрипт из файла kafka_read.sh, в результате в терминале будут выведены сообщения из потока yds-habr-demo, которые соответствуют записанным ранее:

Выгрузка из потока данных в объектное хранилище 

Для выгрузки из потока подготовим следующие файлы:

s3-sink.properties

#name                   постфикс имени выделенного потребителя после <connect->; 
#                       в этом примере был создан потребитель connect-habr-demo-consumer, 
#                       т.е. в name надо передавать habr-demo-consumer
#connector.class        коннектор
#topics                 имя потока данных
#aws.access.key.id      идентификатор аутентификационного ключа для S3
#aws.secret.access.key  секретный ключ для S3 (ниже заменен на ***, подставьте ваш)
#aws.s3.bucket.name     имя бакета Object Storage
#aws.s3.endpoint        эндпоинт Object Storage

name=habr-demo-consumer 
connector.class=io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector
topics=yds-habr-demo
aws.access.key.id=YCAJEQncz5zY96dTQR-dAkKsn
aws.secret.access.key=***************
aws.s3.bucket.name=ydb-habr-bucket
aws.s3.endpoint=https://storage.yandexcloud.net
format.output.type=json
file.compression.type=none

worker.properties

#bootstrap.servers      эндпоинт kafka api для потока данных, 
#                       из которого выгружаем данные
#username               @путь к базе данных YDB
#password               API Key сервисного аккаунта, ниже заменен на ***

bootstrap.servers=ydb-03.serverless.yandexcloud.net:9093

# AdminAPI connect properties
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";

# Producer connect properties
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";

# Consumer connect properties
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# Worker properties
plugin.path=../s3-connector-for-apache-kafka-2.15.0
offset.storage.file.filename=worker-sink.offset

consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
consumer.check.crcs=false

После подготовки файлов запустим следующий скрипт:

../kafka_2.13-3.6.1/bin/connect-standalone.sh worker.properties s3-sink.properties

Скрипт запустит kafka connector и выгрузит содержимое топика yds-habr-demo в бакет ydb-habr-bucket с помощью консьюмера connect-habr-demo-consumer. Каждый раз при запуске этого скрипта он выгружает в новый объект хранилища новые данные из топика, если они появились с момента предыдущего запуска. При просмотре бакета в хранилище можно скачать любой объект из него в виде json-файла.

Совет про хранение паролей 

При использовании примеров в реальной работе не рекомендуем хранить пароли в открытом виде в config-файлах. Вместо этого используйте провайдеры.

Итого

Как показывают примеры выше, работать с потоками данных Yandex Data Streams можно через Kafka API и с помощью стандартных инструментов Kafka.
Таким образом, сервис Yandex Data Streams становится аналогом Kafka. Но при этом вы можете использовать его в serverless‑варианте, с выделением ресурсов по требованию.

Комментарии (0)