1. Введение

Kafka нельзя назвать новым продуктом на рынке ПО. Прошло примерно 10 лет с того времени, как компания разработчик LinkedIn выпустила его в свет. И хотя к тому времени на рынке уже были продукты со схожей функциональностью, но открытый код и широкая поддержка экспертного сообщества прежде всего в лице Apache Incubator позволила ему быстро встать на ноги, а впоследствии составить серьезную конкуренцию альтернативным решениям.

Традиционно Kafka рассматривался как набор сервисов для приема и передачи данных, позволяющий накапливать, хранить и отдавать данные с крайне низкой задержкой и высокой пропускной способностью. Этакий надежный и быстрый (да и в общем-то наиболее популярный на данный момент) брокер сообщений по этой причине весьма востребован во множестве ETL процессов. Преимущества и возможности Kafka многократно обсуждались, в том числе и на Хабре. К тому же, статей на данную тематику весьма много на просторах интернета. Не будем повторять здесь достоинства Kafk-и, достаточно посмотреть на список организаций, выбравших этот продукт  базовым инструментом для технических решений. Обратимся к официальному сайту, согласно которому на данный момент Kafka используется тысячами компаний, в том числе более 60% компаний из списка Fortune 100. Среди них Box, Goldman Sachs, Target, Cisco, Intuit и другие [1].

На сегодняшний день Apache Kafkaне без оснований часто признается лучшим продуктом на рынке систем по передаче данных. Но Kafka не только интересен в качестве брокера сообщений. Огромный интерес он представляет и в силу того, что на его основе возникли и развиваются многие специфические программные продукты, которые позволяют Kafka существенным образом расширить возможности. А это свою очередь позволяет ему уверено продвигаться в новые области ИT рынка.

Как правило, Kafka встроен в ETL процесс (ETL – аббревиатура от Extract, Transform, Load), который в простейшем случае состоит из следующих шагов:

  1. получение данных из источника;

  2. очистка и трансформация данных;

  3. загрузка в целевую систему.

Задачи Kafka сводились к доставке данных внутри ETL-процесса. А есть ли возможность применить Kafka (в широком смысле все экосистему, возникшую вокруг Kafka) для решения задач ETL? Да, так пункты 1 и 3 на сегодняшний день достаточно успешно решаются по средствам так называемых  Kafka-коннекторов. Остановимся здесь кратко, так как всю необходимую информацию можно найти на соответствующих сайтах [2, 3].

Kafka Connect – это инструмент для масштабируемой и надежной потоковой передачи данных между Apache Kafka и другими системами данных. Процесс извлечения данных из внешних систем может быть реализован с использованием механизма Source Kafka Connect, который  может как принимать целые базы данных, так и собирать метрики со всех ваших серверов приложений в топики Kafka. Другой тип коннекторов – Sink Kafka Connect предназначен для передачи данных из Kafka во вторичные хранилища, такие как например Elasticsearch, Hadoop или иные базы данных, для дальнейшей работы.

К настоящему времени написано достаточно большое количество всевозможных коннекторов, что позволяет наладить процедуру взаимодействия по получению данных из обширного числа источников (это же справедливо и для сохранения данных на выбранной платформе). Один из базовых сайтов, предоставляющий такого рода плагины [4] насчитывает несколько сотен коннекторов практически для всех популярных источников данных.

2. Трансформация данных

Как уже было сказано выше, экосистема Kafka к настоящему времени на пути к полноценному ETL-инструменту. То есть на основе только одной экосистемы Kafkи можно успешно строить и развивать полноценные ETL-решения. Однако, до сих пор ничего не было сказано об основной части ETL-процесса – трансформации данных. Поскольку бизнес-требования всегда уникальны, то готовых решений по трансформации нет. В каждом случае это отдельная задача, требующая индивидуального подхода. В данной статье я хотел бы остановиться на одном из таких инструментов, получивших бурное развитие в последние годы, а именно ksqlDB.

ksqlDB –  это база данных, построенная на данных хранящихся в Kafka [5]. Благодаря платформе ksqlDB, у Apache Kafka есть возможность обработки, создания и хранения потоков данных с помощью простых и понятных SQL-запросов. ksqlDB позволяет выполнять различные операции потоковой аналитики больших данных: фильтрация, соединения, агрегация, создание материализованных представлений, преобразования и сопоставления потоков событий с помощью типового инструментария SQL-запросов. Это позволяет аналитикам и разработчикам активно использовать эту платформу потоковой обработки событий в различных задачах.

Рисунок 1. Блок-схема ETL процесса. ksqlDb отвечает за трансформацию данных.
Рисунок 1. Блок-схема ETL процесса. ksqlDb отвечает за трансформацию данных.

Рассмотрим некоторые базовые конструкции ksqlDB.

2.1 Потоки в ksqlDB

Потоки (streams) являются объектами, построенными поверх Kafka топиков (topic), в которых хранятся данные. Потоки представляют данные в движении, вновь поступающие данные не изменяют ни одну из существующих записей, а лишь добавляются к текущему потоку. Таким образом, данные в потоке всегда остаются неизменными.

Рассмотрим простейший пример построений потока.

Первое что нам необходимо сделать – создать Kafka топик.  Для этого воспользуемся стандартной консольной утилитой kafka_topics.sh:

./bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic tpk_persons

Теперь, при помощи другого инструмента – kafka-console-producer – зальем в только что созданный топик тестовые данные:

./bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic tpk_persons

В открывшемся консольном окне введем данные в JSON формате. Для примера:

{"ID":1,"NAME":"IVAN,IVANOV","AGE":25}
{"ID":2,"NAME":"PETR,PETROV","AGE":27}
{"ID":3,"NAME":"SIDR,SIDOROV","AGE":29}

Для создания потока нам потребуется инструмент ksqlDB Cli. Исходим из того, что ksqldb сервер установлен на вашем хосте.

Запуск сервера:

/usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties

После чего можно стартовать клиентское приложение:

/usr/bin/ksql http://0.0.0.0:8088
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v0.20.0, Server v0.20.0 located at http://0.0.0.0:8088
Server Status: RUNNING

В приложении ksqlDB Cli можем убедиться, что данные есть в соответствующем топике tpk_persons. Для этого достаточно выполнить команду print tpk_persons. Но для того, чтобы быть уверенным, что наши запросы читают данные с начала топика Kafka, выполним перед этим следующую команду:

SET 'auto.offset.reset' = 'earliest';

Полученный результат представлен ниже:

ksql> print tpk_persons;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/31 15:19:24.922 Z, key: <null>, value: {"ID":1,"NAME":"IVAN,IVANOV","AGE":25}, partition: 0
rowtime: 2021/10/31 15:19:25.019 Z, key: <null>, value: {"ID":2,"NAME":"PETR,PETROV","AGE":27}, partition: 0
rowtime: 2021/10/31 15:19:25.019 Z, key: <null>, value: {"ID":3,"NAME":"SIDR,SIDOROV","AGE":29}, partition: 0

Поскольку формат выбран, то и поток (stream) также будет создан с указанием соответствующего формата

CREATE OR REPLACE STREAM str_persons (
  id             BIGINT
, name           VARCHAR
, age            INT
) WITH (
  KAFKA_TOPIC = 'tpk_persons'
, VALUE_FORMAT = 'JSON'
);

Теперь ksqlDB дает возможность обратиться к потоку как к табличной структуре. И что самое замечательное –  язык, на котором мы будем работать с данными – практически стандартный SQL, а это существенно упрощает работу.

ksql> SELECT * FROM str_persons EMIT CHANGES;
+---------+----------------------+------+
|ID       |NAME                  |AGE   |
+---------+----------------------+------+
|1        |IVAN,IVANOV           |25    |
|2        |PETR,PETROV           |27    |
|3        |SIDR,SIDOROV          |29    |

Как было сказано выше, модифицировать данные нельзя, а вот добавлять – можно. Отсюда следует, что выполнение insert в steam написанного по стандартам sql – абсолютно валидная процедура.

INSERT INTO str_persons (id, name, age) VALUES (4, 'MARK,MOROZOV', 32);

Здесь хотелось бы отметить, что привычный многим sql синтаксис не просто годится для отображения данных, а представляет собой весьма мощный инструмент для преобразования этих самых данных.  Сознательно не буду расписывать некоторые синтаксические особенности построения запросов, так как это сильно загромоздило бы изложение материала и свелось к цитированию документации. Желающие всегда могут обратиться на сайт [6] и найти все необходимые ответы.

Надеюсь, с простыми JSON типами понятно, а как быть со сложными конструкциями, содержащие вложения, и повторениями? Ведь JSON может быть устроен совсем не так просто, как в приведенном примере. И здесь тоже все хорошо, ksqlDB предоставляет возможности решить и эту проблему. Покажем, как это можно сделать, а для этого создадим новый топик  tpk_workers и запишем туда JSON посложней:

{"WOKERS":[
   {"ID":1,"NAME":"IVAN,IVANOV","AGE":25}
 , {"ID":2,"NAME":"PETR,PETROV","AGE":27}
 , {"ID":3,"NAME":"SIDR,SIDOROV","AGE":29}
]} *
(* при добавлении в топик необходимо записать в одну строку, 
форматирование здесь применено для наглядности)

А в ksqlDB создадим стрим, как раз под его структуру:

CREATE STREAM str_wokers (
  WOKERS ARRAY<
           STRUCT<
             id    BIGINT
           , name  VARCHAR
           , age   INT
           >>) 
  WITH (KAFKA_TOPIC='tpk_wokers', 
 VALUE_FORMAT='JSON'
);

Если заранее известно количество вложений, то можно развернуть ответ по колонкам:

SELECT WOKERS[1]->id AS id1, WOKERS[1]->name AS name1, WOKERS[1]->age AS age1
     , WOKERS[2]->id AS id2, WOKERS[2]->name AS name2, WOKERS[2]->age AS age2
     , WOKERS[3]->id AS id3, WOKERS[3]->name AS name3, WOKERS[3]->age AS age3
  FROM str_wokers EMIT CHANGES;
+---+-----------+----+---+-----------+----+---+------------+----+
|ID1|NAME1      |AGE1|ID2|NAME2      |AGE2|ID3|NAME3       |AGE3|
+---+-----------+----+---+-----------+----+---+------------+----+
|1  |IVAN,IVANOV|25  |2  |PETR,PETROV|27  |3  |SIDR,SIDOROV|29  |

Либо сложить все в единую структуру:

SELECT EXPLODE(WOKERS)->id AS id
     , SPLIT(EXPLODE(WOKERS)->name, ',')[1] AS first_name 
     , SPLIT(EXPLODE(WOKERS)->name, ',')[2] AS last_name 
     , EXPLODE(WOKERS)->age AS age 
  FROM str_wokers EMIT CHANGES;
+----+-----------+----------+----+
|ID  |FIRST_NAME |LAST_NAME |AGE |
+----+-----------+----------+----+
|1   |IVAN       |IVANOV    |25  |
|2   |PETR       |PETROV    |27  |
|3   |SIDR       |SIDOROV   |29  |

2.2 Трансформация данных в SQL запросах

ksqlDB обладает достаточно продвинутым SQL синтаксисом, что позволяет преобразовывать, фильтровать и агрегировать данные в потоках. Рассмотрим следующий запрос к потоку:

SELECT '<id>' + CAST(id AS VARCHAR) + '</id>'
     + '<first_name>' + split(name, ',')[1] + '</first_name>'
     + '<last_name>' + split(name, ',')[2] + '</last_name>'
     + '<age>' + CAST(age AS VARCHAR) + '</age>'  
     + '<rowtime>' + CAST(ROWTIME AS VARCHAR) + '</rowtime>'as xml
  FROM str_persons
 WHERE age < 30
  EMIT CHANGES;

Это пример конкатенации текстовых значений, что дает возможность по сути перейти к формату XML, дописав к каждому полученному значению соответствующие теги. Хотелось бы дополнительно обратить внимание на поле ROWTIME – это системный столбец, который ksqlDB резервирует для отслеживания времени события (здесь время в unix формате). ROWTIME позволяет нам понять –  когда данные попали в Kafka.

+------------------------------------------------------------------------------------------------------------------+
|XML
+------------------------------------------------------------------------------------------------------------------+
|<id>1</id><first_name>IVAN</first_name><last_name>IVANOV</last_name><age>25</age><rowtime>1635693564922</rowtime> |
|<id>2</id><first_name>PETR</first_name><last_name>PETROV</last_name><age>27</age><rowtime>1635693565019</rowtime> |
|<id>3</id><first_name>SIDR</first_name><last_name>SIDOROV</last_name><age>29</age><rowtime>1635693565019</rowtime>|

Агрегатные функции также могут быть применены в запросах, но здесь (ksqlDb) всегда требуется условие GROUP BY. И в случае, если на выходе требуются показать только те поля, по которым идет агрегация, условие GROUP BY тоже обязательно, хотя выглядит нелогично. Обойти это противоречие можно достаточно простым образом:

SELECT '<max_age>' + CAST(max(age) AS VARCHAR) + '</max_age>' as max_age
  FROM str_persons 
 GROUP BY 0 
  EMIT CHANGES;
+---------------------+
|MAX_AGE              
+---------------------+
|<max_age>32</max_age>

Напомним, что входящие данные были в формате JSON, а вот результат нетрудно представить как в формате XML, так и в CSV форматах.

2.3 Сохранение полученных результатов в отдельный топик

Раннее было показано, что потоки в ksqlDB можно создать, основываясь на Kafka топиках. Но можем ли передать в Kafka уже преобразованные данные? Мы написали сложный select, в котором выполнили все необходимые нам трансформации, а теперь хотим сохранить результат полученного запроса в отдельный топик. Да, для этого воспользуемся конструкцией вида:

CREATE OR REPLACE STREAM str_jnew_persons
  WITH (KAFKA_TOPIC='tpk_jnew_persons', VALUE_FORMAT='JSON')
    AS SELECT id
      , split(name, ',')[1] AS first_name
      , split(name, ',')[2] AS last_name
      , age
      , TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Moscow') AS ts 
   FROM str_persons
  WHERE age < 30
;

Здесь хотелось бы обратить внимание на три особенности:

  1. При создании потока присутствует новое служебное слово «FROM», так как в этом случае данные получаем из другого потока;

  2. Kafka топик tpk_jnew_persons в данной конструкции не источник, а приемник данных;

  3. Создав новый стрим str_jnew_persons, мы неявно создали консьюмер, задача которого передать данные в новый топик tpk_jnew_persons. То есть добавляя данные в топик tpk_persons, мы автоматически приводим в действие механизм, который преобразует исходные данные согласно запросу и перекладывает их в топик tpk_jnew_persons.

Выборка данных из потока str_jnew_persons показывает, что все трансформации благополучно отработали, и табличная структура имеет именно то количество и содержимое полей, что и требовалось в запросе:

SELECT * FROM str_jnew_persons EMIT CHANGES;
+---+-----------+----------+----+--------------------+
|ID |FIRST_NAME |LAST_NAME |AGE |TS                  |
+---+-----------+----------+----+--------------------+
|1  |IVAN       |IVANOV    |25  |2021-11-02 19:10:41 |
|2  |PETR       |PETROV    |27  |2021-11-02 19:10:49 |
|3  |SIDR       |SIDOROV   |29  |2021-11-02 19:10:57 |

Теперь проверим содержание топика tpk_jnew_persons. Для этого воспользуемся стандартной консольной утилитой kafka-console-consumer.sh:

./bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic tpk_jnew_persons \
--from-beginning

Получим следующий результат:

{"ID":1,"FIRST_NAME":"IVAN","LAST_NAME":"IVANOV","AGE":100,"TS":"2021-11-02 19:09:01"}
{"ID":1,"FIRST_NAME":"IVAN","LAST_NAME":"IVANOV","AGE":25,"TS":"2021-11-02 19:10:41"}
{"ID":2,"FIRST_NAME":"PETR","LAST_NAME":"PETROV","AGE":27,"TS":"2021-11-02 19:10:49"}
{"ID":3,"FIRST_NAME":"SIDR","LAST_NAME":"SIDOROV","AGE":29,"TS":"2021-11-02 19:10:57"}

Перенаправив данные в новый топик, мы даем возможность любому программному продукту подключиться к нему и забрать уже преобразованные (очищенные, агрегированные) данные.

2.4 Трансформация форматов данных в ksqlDB

В главе 2.2 мы по сути выполнили преобразования, которые извлекают данные из JSON формата и создали нечто напоминающее XML. Само наличие такой возможности большой плюс, который дает нам sql в преобразовании данных. Однако, трансформация данных не исчерпывается одной такой возможностью.

Под трансформацией данных часто подразумевает и преобразование в другой формат. Одно дело – передавать данные из внешних устройств по http протоколу, где json или xml подходящие форматы, совсем другое – сохранять данные в этих форматах. Да, json или xml поддерживаются, как отдельный тип в большинстве современных баз, но хранить данные в таком виде не самый лучший вариант (сложности в написании запросов, проблемы консистенции данных, потери производительности и т.д.).

Давайте посмотрим, что может предложить ksqlDB в вопросе преобразования форматов.

Из документации известно, что ksqlDB поддерживает некоторый ряд форматов сериализации. Вот лишь некоторые из них: DELIMITED, JSON, AVRO, PROTOBUF. Формат сериализации указывается при создании потока. Так, к примеру, при создании потока str_persons, мы указали формат JSON (VALUE_FORMAT = 'JSON'). Можно ли данные, поступающие в одном формате, скажем JSON, конвертировать в AVRO формат средствами ksqlDB? Да, только одной ksqlDB здесь, увы, будет недостаточно. Тем не менее покажем, как это можно сделать.

Для решения подобной задачи нам потребуется еще один продукт из экосистемы Kafka, а именно Schema Registry. Данный продукт входит в состав Confluent Platform [7] и соответственно может быть инсталлирован как часть Confluent Platform.  Но если все необходимые продукты уже есть на вашем хосте, то для установки одной лишь Schema Registry можно установить только данный продукт.

yum install confluent-schema-registry –y

Schema Registry, как это следует из документации на сегодняшний день, умеет работать с Avro, JSON, JSON Schema и Protobuf форматами. Этот программный продукт предоставляет RESTful интерфейс для хранения и извлечения схем (описание того, как устроена атомарная единица – допустим, строка в Avro формате).

Все типы, преобразования, в которые (либо из которых) поддерживает Schema Registry можно просмотреть непосредственно из браузера. Для в файле настроек ksql-server.properties  параметр 'ksql.schema.registry.url' должен быть установлен в значение 'http://localhost:8081'.

Рисунок 2. Список доступных форматов
Рисунок 2. Список доступных форматов

Для того, чтобы можно было бы воспользоваться возможностями Schema Registry, нам потребуется зарегистрировать AVRO схему для нашего набора данных. Нетрудно понять, что в нашем случае схема будет выглядеть следующим образом:

{
  "type": "record",
  "name": "Persons",
  "fields": [{"name": "Id", "type": "long"},
             {"name": "name", "type": "string"},
             {"name": "age", "type": "int"},
             {"name": "ts", "type": "string"}
            ]
}

Теперь достаточно выполнить REST запрос к серверу, чтобы зарегистрировать новую схему. Это можно сделать стандартной утилитой curl, следующим образом:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
 -d '{"schema": "avro_schema"}' \
 http://localhost:8081/subjects/tpk_anew_persons/versions

Здесь параметр avro_schema необходимо заменить на ту схему, что приведена выше (не забываем экранировать двойные кавычки), а tpk_anew_persons – kafka топик, куда собственно мы планируем загружать данные в AVRO формате.

А после процесса регистрации данную схему можно будет увидеть по ссылке: http://localhost:8081/schemas/

Рисунок 3. Пример зарегистрированной схемы
Рисунок 3. Пример зарегистрированной схемы

И только теперь мы можем создать поток соответствующего формата: он будет построен на новом Kafka топике – tpk_anew_persons. Ниже приведен синтаксис по созданию необходимого потока для консольной утилиты ksqlDb Cli:

CREATE OR REPLACE STREAM str_anew_persons
  WITH (KAFKA_TOPIC='tpk_anew_persons', VALUE_FORMAT='AVRO')
    AS SELECT id
      , split(name, ',')[1] AS first_name
      , split(name, ',')[2] AS last_name
      , age
      , TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Moscow') AS ts 
   FROM str_persons
  WHERE age < 30
;

А вот для просмотра содержимого топика tpk_anew_persons теперь потребуется иная разновидность consumer-а – kafka-avro-console-consumer (ставится с пакетом Schema Registry). Посмотрим на полученный результат:

./bin/kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic tpk_anew_persons \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081
{"ID":{"long":1},"FIRST_NAME":{"string":"IVAN"},"LAST_NAME":{"string":"IVANOV"},"AGE":{"int":25},"TS":{"string":"2021-11-03 00:46:23"}}
{"ID":{"long":2},"FIRST_NAME":{"string":"PETR"},"LAST_NAME":{"string":"PETROV"},"AGE":{"int":27},"TS":{"string":"2021-11-03 00:47:03"}}
{"ID":{"long":3},"FIRST_NAME":{"string":"SIDR"},"LAST_NAME":{"string":"SIDOROV"},"AGE":{"int":29},"TS":{"string":"2021-11-03 00:58:28"}}

Другими словами, преобразование форматов хоть и не отличается разнообразием (список поддерживаемых форматов пока, увы, не богат), но является вполне рабочим решением.

2.5 Лямбда и Каппа архитектуры с точки зрения ksqlDb

При всех достоинствах лямбда-архитектуры главным недостатком этого подхода к проектированию Big Data систем считается его сложность, из-за дублирования логики обработки данных в холодном и горячем путях. Давайте посмотрим, как могла бы выглядеть лямбда-архитектура в том случае, если появляется возможность писать запросы к Kafka. Здесь нет разделения на два пути, но дублирование логики запросов сохраняется. Быстрый слой уже не нуждается в отдельном хранилище по той причине, что быстрые данные – -это данные в Kafka. Остается решить вопрос со строгим разделением данных, иначе есть риск учесть часть данных дважды. Предположим, что топики Kafka настроены таким образом, что хранят их более суток. Тогда холодными будем считать те из них, что хранятся в target базе (см рисунок 1). Таким образом все наши запросы к базе должны учитывать соответствующее условие (до «сегодняшних» данных). А вот быстрые данные, можно получить непосредственно из Kafka, также добавив к запросу соответствующее условие (только данные за сегодня). С другой стороны, топики могут быть настроены таким образом, чтобы вовсе не удалять полученную информацию. Решать вам, насколько хорош такой подход, но в этом случае сложная задача «вырождается» до уровня простой, и мы получаем Каппа-архитектуру, где нет разделения на горячий и холодный путь. Полный объем данных здесь хранится в Kafka и вся логика запросов к данным реализована в ksqlDb.

3. Заключение

Процесс трансформации – один из краеугольных камней ETL. В мире больших данных существует большое количество всевозможных приложений, в той или иной степени успешно решающих данную задачу. Каждое из них обладает своими слабыми и сильными сторонами, и не все могут обеспечить высокую надежность и способность обрабатывать большие потоки информации за малые промежутки времени. По этой причине часто инструменты для трансформации в Big Data работают совместно с Kafka. Kafka успешно справляется с такими задачами, но, как правило, в роли простого брокера сообщений.

С появлением и развитием таких инструментов как CDC (Change Data Capture) и всевозможных Kafka коннекторов, ситуация начала меняться. Экосистема продуктов, выросшая на основе Kafka, получила возможность забирать все изменения данных непосредственно из базы и размещать новые данные в соответствующих топиках Kafka. Даже в таком варианте это уже во многом решало задачу репликации (backup для «старых» данных + Kafka для «новых»), а значит сложные системы stand-by, которые часто строят для повышения надежности, можно в какой-то мере заменить Kafka. Более того, stand-by в ряде случаев – это удвоение лицензий, т.е. дополнительные финансовые затраты. Однако, с другой стороны, stand-by – это не только надежность, но и возможность построения сложных отчетов на дополнительном сервере, без нагрузки на основной. Таким образом, оставалось сделать еще один шаг – научить Kafka работать с потоками данных как с обычными таблицами. То есть научиться получить только ту информацию, которая нужна для отчета, а для этого было необходимо научить Kafka понимать стандартный язык структурированных запросов – SQL.  

С появлением ksqlDb вопросы выполнения отчетов и трансформации данных могут быть решены с высокой степенью эффективности, а программные продукты, ранее решавшие подобного рода задачи (порой весьма недешевые, особенно на фоне весьма гуманной лицензии от Confluent [8]), будут выглядеть уже не столь привлекательно.

Рисунок 4. Блок-схема ETL процесса. ksqlDb отвечает за трансформацию данных.
Здесь Kafka брокер сообщений и целевая база.
Рисунок 4. Блок-схема ETL процесса. ksqlDb отвечает за трансформацию данных. Здесь Kafka брокер сообщений и целевая база.

В ksqlDB реализовано множество конструкций, которые давно стали стандартом для практически любой базы данных. Это и таблицы, построенные на топиках, и объединения как таблиц, так и потоков (в полной мере реализованы inner, left и full объединения). Причем от версии к версии возможности sql модификаций и обогащения данных только возрастают. Да, набор SQL операторов, которые сейчас входят в стандартную поставку ksqlDb, могут не впечатлить людей долгое время работавших с базами данных и привыкших к разнообразию sql синтаксиса. Но уже имеющиеся возможности плюс достаточно быстрое развитие в данном направлении, позволяют утверждать, что ksqlDB обладает весьма широкими возможностями по трансформации данных, и эти возможности будут только возрастать. К плюсам системы следует отнести и возможность самостоятельно дописать недостающие функции через имеющийся механизм UDF, что делает возможным решение средствами ksqlDB большого класса ETL-задач.

Kafka и вся экосистема продуктов, выросшая на ее основе, проделала большой путь от одного из брокеров сообщений до весьма зрелого набора инструментов, позволяющего решать широкий класс задач. Сможет ли Kafka+ksqlDB полноценно заменить базу данных, покажет время [9, 10]. Но уже сейчас понятно, что ряд задач может быть решен и без использования target-database. Ведь все, что нужно для работы, уже есть: и быстрая надёжная система хранения данных (непосредственно Kafka), и возможность сделать практически любую выборку средствами ksqlDb. Конечно, все зависит от конкретной задачи, да и полноценной заменой СУБД связка Kafka+ksqlDB не является. Но если вы работаете с Kafka, стоит присмотреться к ksqlDb: возможно, это именно тот инструмент, которого будет достаточно для решения ваших задач.

Источники

[1] https://kafka.apache.org/powered-by

[2] https://docs.ksqldb.io/en/latest/concepts/connectors/

[3] https://www.confluent.io/product/confluent-connectors/

[4] https://www.confluent.io/hub/

[5] https://ksqldb.io/

[6] https://docs.ksqldb.io/

[7] https://docs.confluent.io/platform/current/release-notes/index.html

[8] https://github.com/confluentinc/ksql

[9] https://dzone.com/articles/is-apache-kafka-a-database-the-2020-update

[10] https://davidxiang.com/2021/01/10/kafka-as-a-database/

Комментарии (2)


  1. korsetlr473
    08.12.2021 18:09

    Добрый день. в ksql появилась или нет фича - посылать только финальный результат агрегации окна?


  1. ggo
    09.12.2021 14:46

    На сегодняшний день Apache Kafkaне без оснований часто признается лучшим продуктом на рынке систем по передаче данных. Но Kafka не только интересен в качестве брокера сообщений.

    Регулярно встречаю подобное. Почему-то кафку причисляют с брокерам сообщений. Хотя кафка - это stream processing. Т.е. не просто переложить "что-то" из одного места в другое, а сделать с этим "что-то" некие, возможно, сложные манипуляции.