Всем привет! Меня зовут Елена Калинина, и я технический менеджер проектов в команде YDB — в рамках Yandex Infrastructure наша команда создаёт технологии для работы всего Яндекса.
YDB — это распределённая отказоустойчивая СУБД с открытым исходным кодом. Для потоковых нагрузок в YDB реализован механизм персистентных очередей сообщений под названием YDB Topics. Топики YDB используются в качестве основной шины данных в Яндексе, что позволяет многократно экономить на серверах и их обслуживании.
Но что если какая‑то компания соблазнится такой экономией и захочет перейти с Apache Kafka на YDB Topics? Без API‑совместимых решений придётся переписывать весь код? К счастью, для работы с топиками YDB можно использовать Kafka API — и в этой статье я подробно покажу, как это сделать, на примере чтения и записи в поток данных и дальнейшей выгрузки в объектное хранилище в облаке.
О чём статья:
Какие сценарии работы через Kafka API рассмотрены в этой статье
-
Создание базы данных YDB
Создание потока данных Data Streams
Создание выделенных потребителей для потока данных
Создание бакета в хранилище S3
-
Создание сервисных аккаунтов, выдача ролей и создание ключей
-
Запись и чтение из потока данных с помощью Kafka cli
-
Выгрузка из потока данных в хранилище S3
Совет по безопасности
Немного предыстории, терминологии и базовых понятий
В 2013 году в качестве шины данных в Яндексе начали использовать Apache Kafka, но в этом продукте команде нравилось далеко не всё. При количестве данных, которое накоплено в компании, в Apache Kafka становилось сложно управлять правами доступа, организовывать распределённую работу нескольких команд, расширять или сокращать кластер при необходимости и многое другое.
При отсутствии подходящего решения в открытом доступе, пришлось делать своё. Так в 2017 году и появилась система YDB Topics, выложенная в опенсорс в составе платформы данных YDB — катастрофоустойчивой и масштабируемой на тысячи узлов базы данных. И как показал последующий опыт использования YDB Topics, по сравнению с другими опенсорс‑решениями, в крупной компании шина данных на базе технологий YDB позволяет экономить на оборудовании и его обслуживании в несколько раз.
Наша СУБД существует в трёх вариантах:
опенсорс‑проект, в рамках которого развивается ядро YDB;
управляемый сервис баз данных Yandex Managed Service for YDB в Yandex Cloud, а также интегрированные с ним сервисы Yandex Data Streams для потоковых нагрузок и сервис очередей для обмена отдельными сообщениями Yandex Message Queue;
коммерческое программное обеспечение для установки на собственных серверах пользователей.
В данной статье мы рассмотрим работу с YDB Topics через Kafka API в Yandex Cloud. Об особенностях работы с Kafka API при использовании опенсорс‑версии YDB вы можете прочитать здесь.
YDB поддерживает одновременное выполнение транзакционных (OLTP) и потоковых нагрузок, а также с недавних пор позволяет работать со сложными аналитическими запросами (OLAP). Для работы с потоковыми нагрузками в Yandex Cloud используется отдельный сервис Yandex Data Streams, тесно интегрированный с Yandex Managed Service for YDB. Любой поток данных, который вы создаёте в Yandex Data Streams, создаётся внутри одной из баз данных Yandex Managed Service for YDB.
Исторически потоковая обработка была представлена в Yandex Data Streams в виде сервиса с доступом по Kinesis API и использованием соответствующей терминологии — вы работаете с потоками данных. За сценой же потоки Yandex Data Streams основаны на топиках YDB. Поток данных, или топик в YDB — это сущность для хранения неструктурированных сообщений, предназначенная для их доставки множеству подписчиков. Топик в YDB, как мы уже рассказывали, является аналогом топика в Apache Kafka.
Какие сценарии работы через Kafka API рассмотрены в этой статье
После знакомства с этой статьёй читатель сможет использовать kafka cli для чтения или записи в поток данных Yandex Data Streams, и Kafka Connect для выгрузки данных из потока данных в объектное хранилище, в нашем случае — Yandex Object Storage.
В данной статье не рассматриваются вопросы использования других API для работы с потоками данных, но вы можете прочитать о них здесь: YDB Topic API, Kinesis API.
Создание объектов: база данных YDB, поток данных Data Streams, бакет S3
Нам понадобится предварительно настроенное облачное окружение, где уже созданы базовые аккаунты, облака и каталоги. Если у вас ещё не было опыта работы с ресурсной моделью Yandex Cloud, то рекомендую также ознакомиться с этой статьёй.
Как создать в облаке объекты: базу данных YDB, поток данных Data Streams, S3-бакет
-
Создание базы данных YDB
В консоли управления в списке слева выберем нужное облако, а в нём — каталог, в котором будет создана БД. На странице обзора каталога нажмём справа вверху «Создать ресурс» и выберем «База данных YDB».
На странице создания базы данных в качестве имени введём ydb-habr-demo-1. Для остальных параметров можно оставить значения по умолчанию. Подробно о том, что означают параметры базы данных YDB, можно прочитать в документации. Затем нажмём кнопку «Создать БД».
После создания базы данных можно перейти на страницу обзора базы данных. Для дальнейшей работы понадобится имя базы данных, в нашем примере это ydb-habr-demo-1.
Обратите внимание, что мы создали serverless-базу данных, и примеры скриптов ниже предназначены для работы с потоками данных в рамках serverless-БД.
-
Создание потока данных Data Streams
В консоли управления выберем нужное облако и в нём каталог, в котором создана БД YDB. Далее нажмём справа вверху «Создать ресурс» и выберем «Data Streams».
На странице создания потока данных в поле «База данных» выберем ydb-habr-demo-1, а в поле «Имя» введём yds-habr-demo. Для остальных параметров можно оставить значения по умолчанию. Подробно о том, что означают параметры потока данных Data Streams, тоже можно прочитать в документации. Затем нажмём «Создать».
После создания потока данных перейдём на страницу обзора потока данных. Для дальнейшей работы понадобятся имя потока данных, имя базы данных, путь к базе данных и эндпоинт Kafka API, в нашем примере:
Имя потока данных: yds-habr-demo
Имя базы данных: ydb-habr-demo-1
Путь к базе данных: /ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o
Важно: в вашем каталоге путь будет другим, скопировать его можно со страницы обзора базы данных или обзора потока.
Эндпоинт Kafka API: ydb-03.serverless.yandexcloud.net:9093
В вашем каталоге эндпоинт Kafka API может быть другим. Копируйте эндпоинт Kafka API со страницы обзора базы данных или обзора потока.
-
Создание выделенных потребителей для потока данных
Для чтения из потока данных нужно создать выделенного потребителя — это аналог kafka consumer. Для созданного потока данных на панели меню слева выберем пункт «Выделенные потребители», нажмём справа вверху «Создать потребитель», зададим для него имя, например habr-demo-consumer, и нажмём «Создать».
Для примера ниже с отгрузкой в объектное хранилище потребуется ещё выделенный потребитель, название которого начинается на connect‑ (это особенности работы коннектора). В нашем примере будет использован выделенный потребитель connect-habr-demo-consumer.
-
Создание S3-бакета в объектном хранилище Yandex Object Storage
На странице обзора каталога в Yandex Cloud нажмём справа вверху «Создать ресурс» и выберем «Бакет». На странице создания бакета зададим имя ydb-habr-bucket, для остальных параметров снова можно оставить значения по умолчанию. Для дальнейшей работы понадобится имя бакета, в нашем примере ydb-habr-bucket.
Для доступа к созданным объектам в облаке нам понадобятся сервисные аккаунты, обладающие нужными ролями. Кроме того, для этих сервисных аккаунтов необходимо выписать ключи. Пойдём по порядку.
Создание сервисного аккаунта для доступа к потоку данных и скоупированного ключа
На дашборде каталога перейдём в сервис Identity and Access Management и нажмём справа вверху «Создать сервисный аккаунт». Зададим имя сервисного аккаунта, например sa-habr-demo-kafka
, и добавим этому сервисному аккаунту роли:
ydb.viewer
— для чтения данных из потока,ydb.editor
— для записи данных в поток,ydb.kafkaApi.client
— для доступа к потоку данных по Kafka API.
Далее на странице обзора сервисного аккаунта нажмём справа вверху «Создать новый ключ» и выберем «Создать API‑ключ». Для ключа нужно выбрать область действия — yc.ydb.topics.manage
. Можно также задать описание и срок действия.
После создания ключа он будет выведен на экран — его необходимо сохранить, он понадобится в скриптах для чтения/записи по Kafka API.
Создание сервисного аккаунта для доступа к S3-бакету и статического ключа
Аналогично предыдущему пункту перейдём на дашборде каталога в сервис Identity and Access Management и нажмём справа вверху «Создать сервисный аккаунт». Зададим имя сервисного аккаунта, например sa-habr-demo-bucket
, и добавим этому сервисному аккаунту роль storage.uploader
.
Далее на странице обзора сервисного аккаунта нажмём справа вверху «Создать новый ключ» и выберем «Создать статический ключ доступа». Аналогично, после создания ключа он будет выведен на экран, его также нужно сохранить для использования в скриптах выгрузки в объектное хранилище.
Подготовка окружения
Для работы с дальнейшими примерами нам понадобится установленная Kafka. Здесь есть подробная инструкция по установке. В примере ниже предполагается, что запуск скриптов происходит из папки, в которую установлена Kafka, а сами файлы лежат в соседней папке kafka_habr_examples. Для работы также понадобится распакованный коннектор Aiven's S3 Sink Connector for Apache Kafka. Примеры скриптов ниже были сделаны для Kafka версии 3.6.1 и S3 Connector версии 2.15.0.
Структура папок и файлов для примеров из статьи
Папка |
Содержимое |
kafka_2.13-3.6.1 |
kafka установлена в эту папку |
S3-connector-for-apache-kafka-2.15.0 |
коннектор распакован в эту папку |
kafka_habr_examples |
содержит файлы: |
Запись в поток данных с помощью Kafka cli
Для записи необходимо подготовить следующие файлы. В комментариях подробно написано, какие параметры объектов, заведённых раньше, должны быть прописаны при вызовах Kafka cli:
kafka_write.sh
#--broker-list эндпоинт kafka api для потока данных, в который пишем
#--topic полный путь к потоку данных, в который пишем,
# в формате <путь к БД YDB>/<имя потока данных>
#--producer.config путь к файлу с конфигурацией потока данных
bin/kafka-console-producer.sh --broker-list ydb-03.serverless.yandexcloud.net:9093 \
--topic /ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o/yds-habr-demo \
--producer.config ../kafka_habr_examples/producer.properties
producer.properties
#bootstrap.servers эндпоинт kafka api для потока данных,
# в который пишем
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN пока поддержана только SASL/PLAIN-аутентификация
#sasl.jaas.config
# username @путь к базе данных YDB
# password API Key сервисного аккаунта sa-habr-demo-kafka
# (ниже заменен на ***, подставьте ваш)
bootstrap.servers=ydb-03.serverless.yandexcloud.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";
После подготовки файлов необходимо выполнить скрипт из файла kafka_write.sh
. В результате мы сможем ввести в терминале сообщения, которые будут записаны в поток:
После записи сообщений их можно увидеть в консоли Yandex Cloud. Для этого перейдём в консоли к потоку yds-habr-demo
, в который мы писали, и выберем слева в меню пункт «Просмотр данных». Увидим основные графики записи/чтения для потока, а также сможем просмотреть сообщения, записанные в поток:
Чтение из потока данных с помощью Kafka cli
Для чтения из потока данных с помощью Kafka cli необходимо подготовить следующие файлы. В комментариях также подробно расписала, какие параметры объектов, заведённых раньше, должны быть прописаны при вызовах Kafka cli:
kafka_read.sh
#--broker-list эндпоинт kafka api для потока данных, в который пишем
#--topic имя потока данных, в который пишем
#--producer.config путь к файлу с конфигурацией потока данных
bin/kafka-console-consumer.sh --bootstrap-server ydb-03.serverless.yandexcloud.net:9093 \
--topic /ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o/yds-habr-demo \
--group "habr-demo-consumer" \
--consumer.config ../kafka_habr_examples/consumer.properties
consumer.properties
#bootstrap.servers эндпоинт kafka api для потока данных,
# в который пишем
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN пока поддержана только SASL/PLAIN-аутентификация
#sasl.jaas.config
# username @путь к базе данных YDB
# password API Key сервисного аккаунта sa-habr-demo-kafka,
# (ниже заменен на ***, подставьте ваш)
bootstrap.servers=ydb-03.serverless.yandexcloud.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
check.crcs=false
После подготовки файлов необходимо выполнить скрипт из файла kafka_read.sh
, в результате в терминале будут выведены сообщения из потока yds-habr-demo
, которые соответствуют записанным ранее:
Выгрузка из потока данных в объектное хранилище
Для выгрузки из потока подготовим следующие файлы:
s3-sink.properties
#name постфикс имени выделенного потребителя после <connect->;
# в этом примере был создан потребитель connect-habr-demo-consumer,
# т.е. в name надо передавать habr-demo-consumer
#connector.class коннектор
#topics имя потока данных
#aws.access.key.id идентификатор аутентификационного ключа для S3
#aws.secret.access.key секретный ключ для S3 (ниже заменен на ***, подставьте ваш)
#aws.s3.bucket.name имя бакета Object Storage
#aws.s3.endpoint эндпоинт Object Storage
name=habr-demo-consumer
connector.class=io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector
topics=yds-habr-demo
aws.access.key.id=YCAJEQncz5zY96dTQR-dAkKsn
aws.secret.access.key=***************
aws.s3.bucket.name=ydb-habr-bucket
aws.s3.endpoint=https://storage.yandexcloud.net
format.output.type=json
file.compression.type=none
worker.properties
#bootstrap.servers эндпоинт kafka api для потока данных,
# из которого выгружаем данные
#username @путь к базе данных YDB
#password API Key сервисного аккаунта, ниже заменен на ***
bootstrap.servers=ydb-03.serverless.yandexcloud.net:9093
# AdminAPI connect properties
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";
# Producer connect properties
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";
# Consumer connect properties
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="@/ru-central1/b1g8skpblkos03malf3s/etnaslqlo78g4vlcsb7o" password="*************";
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Worker properties
plugin.path=../s3-connector-for-apache-kafka-2.15.0
offset.storage.file.filename=worker-sink.offset
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
consumer.check.crcs=false
После подготовки файлов запустим следующий скрипт:
../kafka_2.13-3.6.1/bin/connect-standalone.sh worker.properties s3-sink.properties
Скрипт запустит kafka connector и выгрузит содержимое топика yds-habr-demo
в бакет ydb-habr-bucket
с помощью консьюмера connect-habr-demo-consumer
. Каждый раз при запуске этого скрипта он выгружает в новый объект хранилища новые данные из топика, если они появились с момента предыдущего запуска. При просмотре бакета в хранилище можно скачать любой объект из него в виде json-файла.
Совет про хранение паролей
При использовании примеров в реальной работе не рекомендуем хранить пароли в открытом виде в config-файлах. Вместо этого используйте провайдеры.
Итого
Как показывают примеры выше, работать с потоками данных Yandex Data Streams можно через Kafka API и с помощью стандартных инструментов Kafka.
Таким образом, сервис Yandex Data Streams становится аналогом Kafka. Но при этом вы можете использовать его в serverless‑варианте, с выделением ресурсов по требованию.