1. Введение
Сегодня, в эпоху больших данных, когда компании тонут в информации из самых различных локальных и облачных источников, сотрудникам трудно увидеть общую картину. Анализ информации для отделения зерен от плевел требует все больше усилий. Визуализация данных помогает превратить все данные в понятную, визуально привлекательную и полезную информацию. Хорошо продуманная визуализация данных имеет критическое значение для принятия решений на их основе. Визуализация позволяет не только замечать и интерпретировать связи и взаимоотношения, но и выявлять развивающиеся тенденции, которые не привлекли бы внимания в виде необработанных данных. Большинство средств визуализации данных могут подключаться к источникам данных и таким образом использовать их для анализа. Пользователи могут выбрать наиболее подходящий способ представления данных из нескольких вариантов. В результате информация может быть представлена в графической форме, например, в виде круговой диаграммы, графика или визуального представления другого типа.
Большинство средств визуализации предлагает широкий выбор вариантов отображения данных, от обычных линейных графиков и столбчатых диаграмм до временных шкал, карт, зависимостей, гистограмм и настраиваемых представлений. Для решения задачи визуализации принципиальное значение имеет тип источника данных. И хотя современные средства визуализации проделали в этом вопросе большой путь, и предлагают на сегодняшний день весьма большой выбор, задача визуализации не решена в полной мере. Если для баз данных и целого ряда web сервисов задача визуализации не представляет принципиальной проблемы, то понять, что происходит с информационными потоками внутри некоторых программных продуктов из мира больших данных, не так просто.
Инструмент, на котором хотелось бы остановиться более подробно – Kafka. Он используется многими ведущими ИT-компаниями по всему миру. Это унифицированная платформа для обработки всех потоков данных в реальном времени [1]. Среди важных преимуществ данного инструмента следует выделить такие как: горизонтальная масштабируемость, поддержка доставки сообщений с низкой задержкой и гарантия отказоустойчивости при наличии отказов машины. Она способна обрабатывать большое количество разнообразных потребителей. Kafka очень быстрая – она может выполнять до нескольких миллионов операций записи в секунду и сохранять все данные на диск [2].
Как правило, Kafka является центральным элементом событийно-ориентированной архитектуры (см. рис. 1), что позволяет отцепить приложения друг от друга. Поскольку потоки данных, проходящие через Kafka, весьма велики, то визуализировать их непросто, хотя-бы в силу их объема. Однако если исходить из того что нам интересны не сами данные, а некая статистическая информация на их основе, то задача упрощается. Проще говоря, мы могли бы визуализировать результат работы некоторых агрегатных функций, отражающих основные тенденции в потоках данных. Для этого наши функции должны работать непосредственно с данными, т.е. расчет должен быть выполнен «внутри» самой Kafka. И такая возможность действительно существует: для решения такой задачи можно применить SQL-запросы к данным, которые могут выполнены при помощи плагина – ksqIDB.
Давайте рассмотрим такую задачу более подробно.
2. Топология современного ETL-процесса
Перед нами (рис. 1) одна из наиболее популярных схем построения потоковой обработки данных, где Kafka расположена в центре схемы и берет на себя роль инструмента для работы с большим количеством быстрых транзакций. Такая архитектура позволяет решить ряд проблем, связанных с работой хранилищ данных. Традиционные хранилища данных, в первую очередь, предназначены для анализа, где требуются длинные транзакции и сложные аналитические запросы, а вот с задачами OLTP (Online Transaction Processing) справляются хуже. Kafka обладает теми качествами которые приписываются OLTP системам, а следовательно если по топологии расположена перед корпоративных хранилищ данных (КХД), то такая связка Kafka-КХД является очень выгодным решением. Таким образом, можно объединить преимущества как OLAP (Online analytical processing) так и OLTP подходов. Кафка, которая установлена «перед» КХД позволяет сгладить пиковые нагрузки и даже если КХД работает на пределе своих возможностей по приему данных, это не приводит к информационным потерям. Та часть данных, которую не успевает обрабатывать КХД будет оставаться в Kafka, но лишь до того времени пока нагрузка не уменьшится, после чего данные «догрузятся» в КХД.
Обычно данные в Kafka не рассматриваются как «сырье» для анализа, они лишь ждут своего часа, пока не будут загружены в КХД. То есть через некоторое время, (возможно, речь идет о достаточно большом временном промежутке), когда попадут в нужные таблицы, они будут пригодны для анализа. И это потенциальная проблема, так как при таком подходе теряется возможность оперативно увидеть негативные тенденции в данных. Сложные аналитические расчеты позволяют понять причинно-следственные связи, закономерности, установить причину проблемы и т.д. Но какова оперативность такой информации, насколько быстро может быть получен такого рода ответ. Современные КХД даже в простом варианте предусматривают несколько слоев хранения данных и лишь на последнем будет построена витрина, по которой можно будет понять, что не так с данными. А можно ли понять наличие проблемы с данными оперативно, как только данные поступают в систему. Отчасти, да, потому что Kafka, а точнее надстройка над ней в виде возможности выполнять SQL-запросы, позволяет сделать некоторые выводы о полученных сырых данных.
Ну, а минусом такого подхода можно считать ряд ограничений данной технологии, не позволяющей строить сложные SQL- запросы. Однако для оценки ситуации иногда достаточно и относительно простого анализа данных. В качестве демонстрации данного утверждения можно рассмотреть следующий пример. Допустим мы собираем данные о финансовых транзакциях с нескольких магазинов по продаже канцелярских товаров. Если данные поступают из нескольких магазинов равномерно, т. е. количество транзакций примерно соответствует предыдущим промежуткам времени, то будем считать ситуацию штатной. При этом нам не обязательно знать детали о каждой отдельной транзакции, можно исходить из того, что количество и сумма покупок на определённый промежуток времени примерно соответствует той же величине, что наблюдалась в течении нескольких последних дней за соответствующий временной промежуток. В этом случае сигналом к тому, что что-то пошло не так, может служить значительное отклонение от нормы по одному или нескольким параметрам.
Рассмотрим пример, как мог бы быть организован такой процесс в простейшем случае. И самое первое что нам понадобится – Kafka.
3. Загрузка данных в Kafka
Для создания необходимого нам, topic-а kafka, воспользуемся стандартной консольной утилитой kafka_topics.sh:
kafka-topics.sh --create \
--topic tpk_products \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
Теперь, при помощи другого инструмента – kafka-console-producer
– зальем в только что созданный топик тестовые данные:
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic tpk_products
В нашем случае данные будут в JSON формате. JSON сейчас, пожалуй, самый популярный формат обмена данными между приложениями, поэтому логично воспользоваться именно им.
Пример:
{"Shop":"Азбука", "name":"Книга записная", "price":"167"}
4. Преобразования данных с использованием KsqlDB и Schema Registry
Инструмент, который позволит нам выполнять запросы к данным в Kafka – ksqIDB. KsqIDB – это база данных потоковой передачи событий, построенная по архитектуре клиент-сервер.
Конфигурация сервера хранится в соответствующем файле ksql-server.properties, а запуск сервера можно настроить как отдельный сервис. либо запускать его когда вам это нужно. Пример запуска сервера:
/usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties
Подобно другим программным продуктам из экосистемы больших данных, ksqIDB можно запустить как в режиме standalone, так и сгруппировать несколько серверов вместе, т.е. здесь также возможно горизонтальное масштабирование. KsqIDB позволяет выполнять различные операции потоковой аналитики больших данных: фильтрация, соединения, агрегация, создание материализованных представлений, преобразования и сопоставления потоков событий с помощью типового инструментария SQL-запросов [3].
Пример запуска клиентской части ksqIDB:
/usr/bin/ksql http://0.0.0.0:8088
В открывшемся консольном приложении создаем стрим str_products
на основе topic-а tpk_products
CREATE OR REPLACE STREAM str_products (
shop VARCHAR
, name VARCHAR
, price DECIMAL(10,2)
) WITH (
KAFKA_TOPIC = 'tpk_products'
, VALUE_FORMAT = 'JSON'
);
Как раннее было показано в [4], объекты в ksqlDB можно создать таким образом, что результат работы объекта (в нашем случае это таблица) запишется в другой Kafka topic. Так, к примеру, в приведенном ниже листинге целевой topic – tpk_shops
. В нашем случае была создана таблица tbl_agg_products
так, чтобы выполнялось суммирование по полю price, также были добавлены дополнительные поля необходимые нам для сбора статических данных. Сама таблица будет содержать данные в AVRO формате, а вот конвертация данных из JSON в AVRO будет выполнена механизмом schema registry.
То, что ключи в topic могут повторяться (поле pkey
выполняет роль первичного ключа), не должно вводить в заблуждение. Строго говоря, Kafka позволяет как сохранять всю историю данных, так и только последнее актуальное значение value для каждого key. Более подробно на эту тему можно почитать здесь [5].
WINDOWSTART
и WINDOWEND
– временное окно «от и до» (в нашем случае это 10 минут). Нас будет интересовать ситуация, при которой total_sum
суммируется для каждого магазина за выбранный промежуток времени (внутри выбранного окна).
Чтобы преобразование в формат AVRO было возможным, необходимо установить и настроить такой продукт как schema-registry [4].
Теперь добавим необходимую нам схему для преобразования:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\": \"record\", \"name\": \"Shops\", \"fields\": [{\"name\": \"shop\", \"type\": \"string\"}, {\"name\": \"total_sum\", \"type\": \"string\"}, {\"name\": \"ts\", \"type\": \"string\"}, {\"name\": \"window_start\", \"type\": \"string\"}, {\"name\": \"window_end\", \"type\": \"string\"}]}"}' \
http://localhost:8081/subjects/tpk_shops/versions
Результат добавления схемы можем просмотреть в браузере по ссылке
http://localhost:8081/schemas/
Чтобы проверить как это работает добавим в topic tpk_products
какую-либо строку в формате JSON {"Shop":"Азбука", "name":"Книга записная", "price":"167"}
можем увидеть результат в формате AVRO в топике tpk_shops
, а поскольку результатом преобразования будут данные в формате AVRO, то и мы воспользуемся соответствующим AVRO консьюмером:
/usr/bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic tpk_shops \
--property print.key=true --property print.value=true
--from-beginning \
--property schema.registry.url=http://localhost:8081
Получим следующий результат:
{
"SHOP":{"string":"Азбука"},
"TOTAL_SUM":{"string":"167.00"},
"TS":{"string":"2022-07-21 09:14:59"},
"WINDOW_START":{"string":"2022-07-21 09:10:00"},
"WINDOW_END":{"string":"2022-07-21 09:20:00"}
}
Как видно, здесь только value. Если хотите увидеть еще и key, то придётся добавить в листинг соответствующие ключи:
--print.key=true \
--key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Минусом такого подходя является то, что работа таблицы tbl_agg_products
построена так, что при каждом добавлении новой строки во входной topic (tpk_products
) сразу же добавляется новая строка в выходной (tpk_shops
). Таким образом, в tpk_shops мы будем видеть не только актуальный результат суммирования, но и все предыдущие результаты. Ниже, в таблице, приведен результат (tpk_shops
), добавления трёх одинаковых записей в topic tpk_products
, причем две из них попали в одно временное окно (первые две), а третья – в другое.
Азбука |
167.00 |
2022-07-21 09:14:59 |
2022-07-21 09:10:00 |
2022-07-21 09:20:00 |
Азбука |
334.00 |
2022-07-21 09:18:56 |
2022-07-21 09:10:00 |
2022-07-21 09:20:00 |
Азбука |
167.00 |
2022-07-21 09:24:50 |
2022-07-21 09:20:00 |
2022-07-21 09:30:00 |
Первая запись в таблице по сути является не нужной для статистики, однако является неизбежным результатом потоковой обработки данных. Проще говоря, было бы хорошо получить результат, при котором записи с одинаковым названием магазина и временным окном являлись бы уникальными. И такое решение есть – достаточно передать полученный результат в какую-либо таблицу базы данных, выстроив правильным образом уникальные ключи, а данные которые сейчас мы получили в topic tpk_shops
, считать промежуточным результатом. Следовательно, и сам topic можно настроить так, чтобы не хранить в нем записи сколь-нибудь значимый период времени, что сэкономит нам место на диске. Напомним, что параметр, ответственный за временное хранение – retention.ms
. Осталось решить вопрос – каким образом мы передадим данные в базу. Конечно, вариантов можно предложить достаточно много, но можно не придумывать какие-то сторонние решения, а остаться в рамках экосистемы Kafka. Для решения вопроса воспользуемся Kafka Sink Connector.
5. Sink Connector – инструмент для передачи данных в BD
Чтобы Kafka Connect
мог работать с нужными jar-файлами, необходимо развернуть архив в папку, на которую будет указывать параметр plugin.path
из соответствующего файла настроек (plugin.path=/usr/local/share/java
, см. пример файла connect-standalone.properties
ниже по тексту).
Нам также потребуется архив с jar-файлами для коннекта из Kafka к базе, Для этого возьмем архив с сайта Сonfluence [6]. Файл добавим по тому же пути, на который указывает plugin.path.
В качестве базы данных я выбрал MySQL, далее была создана база данных –STATS
, а ниже приведен листинг по созданию таблицы, в которую из нашего topic tpk_shops будут поступать данные:
CREATE TABLE stats.tbl_shops_stat (
shop VARCHAR(100)
, total_sum DECIMAL
, ts TIMESTAMP
, window_start TIMESTAMP
, window_end TIMESTAMP
, CONSTRAINT tbl_shop_pkey PRIMARY KEY (shop, window_start)
);
Для того, чтобы обеспечить подключение к базе MySQL, нам потребуется соответствующий jdbc драйвер, который можно взять с сайта [7]. (при выборе драйвера лучше выбирать вариант Platform Independent – это упростит установку). Скаченный драйвер был добавлен по пути, указанному в plugin.path.
Поскольку мы будем работать в standalone режиме, нам понадобится файл конфигурации рабочего процесса Kafka Connect (connect-standalone.properties) и файл конфигурации коннектора (jdbc-sink.properties
).
Пример запуска Kafka Connect в режиме Standalone.
/usr/local/kafka/bin/connect-standalone.sh /
~/kconf/connect-standalone.properties /
~/kconf/jdbc-sink.properties
Теперь давайте подробнее поговорим о самих конфигурационных файлах, так как они должны решать несколько задач.
Во-первых, необходимо объяснить коннектору что value в топике находятся в формате AVRO. Выбор этого формата неслучаен, как следует из документации [8] именно Avro Converter рекомендован для передачи данных в Kafka Connect. AVRO может быть легко использован для общего формата и обмена данными между системами. Это сводит к минимуму необходимость написания пользовательского кода и стандартизирует данные в гибком формате. Кроме того, мы получаем преимущества, которые дает на schema registry при преобразовании данных. А вот key значений мы не приводили к формату AVRO, следовательно, и указывать для него данный формат не нужно.
Итак, как было сказано выше, мы воспользуемся соответствующими ключами для преобразования:
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Здесь хорошо видно, что для преобразования пары key-value совсем не обязательно использовать один и тот же сериализатор.
Во-вторых, соединиться с базой.
Данные параметры мы пропишем в файле настройки - jdbc-sink.properties
Конфигурация MySQL сервера - url=jdbc:mysql://localhost:3306/stats
Логин - user=<user-name>
Пароль - password=<password>
И наконец, в третьих – особенности загрузки данных. Нам необходимо выполнить вставку данных в таблицу tbl_shops_stat
в режиме upsert
. Для тех, кто работает с базами данных, хорошо известен SQL оператор MERGE
. Несколько упрощая, данный оператор устроен следующим образом: если нет записи (нет совпадения по уникальным ключам), то выполняется insert строки, а если соответствующая запись обнаружена, то будет выполнен update для всех полей, которые не являются уникальными ключами. Upsert будет работать именно так, если запись с таким названием магазина и временным окном уже есть в таблице (SHOP,WINDOW_START
) – обновит поле total_sum
. Если такой записи нет – добавит новую.
Ниже приведу полный листинг файлов параметров:
1. connect-standalone.properties
bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets
#Для key не было преобразования формата, по сути он остался стрингом
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
#А вот value было преобразовано в avro формат по схеме приведенной выше
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
value.converter.schemas.enable=true
errors.log.enable=true
errors.log.include.messages=true
print.key=true
errors.tolerance=all
topics=tpk_shops
offset.flush.interval.ms=1000
plugin.path=/usr/local/share/java
2. jdbc-sink.properties
name=jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors
topics=tpk_shops
connection.url=jdbc:mysql://localhost:3306/stats
connection.user=sky
connection.password=*****
connection.ds.pool.size=2
batch.size=1
table.name.format=stats.tbl_shops_stat
pk.mode=record_value
pk.fields=SHOP,WINDOW_START
insert.mode=upsert
6. Grafana – инструмент визуализации полученного результата
Данные, которые мы переложили в MySQL, теперь необходимо визуализировать. Для этого воспользуемся Grafana. Это инструмент с открытым исходным кодом, имеющий лицензию Apache 2.0, разработанный шведским разработчиком Torkel Ödegaard в 2014 году. Одна из причин популярности Grafana состоит в том, что разработчики могут создавать собственные информационные панели с панелями для различных источников данных.
Итак, чтобы продемонстрировать – как работает агрегирование данных в отдельный topic – проведем следующий эксперимент. В консольном окне, topic-а tpk_products введем данные в формате JSON. Причем данные для наглядности разделим на три группы:
Первая группа записей
{"Shop":"Азбука", "name":"Книга записная", "price":"165"}
{"Shop":"Азбука", "name":"Блокнот деловой", "price":"251"}
{"Shop":"Клякса", "name":"Бумага цветная", "price":"35"}
{"Shop":"Отличник", "name":"Калькулятор Brilliant", "price":"389"}
{"Shop":"Отличник", "name":"Фломастеры 24 цвета", "price":"206"}
{"Shop":"Перемена", "name":"Ламинатор Camel", "price":"355"}
{"Shop":"Перемена", "name":"Дырокол", "price":"110"}
Вторая группа записей (добавим с временной задержкой 10 минут)
{"Shop":"Азбука", "name":"Комплект из 2-х ручек", "price":"448"}
{"Shop":"Клякса", "name":"Набор цветной бумаги", "price":"39"}
{"Shop":"Клякса", "name":"Фотобумага глянцевая", "price":"51"}
{"Shop":"Отличник", "name":"Фломастеры 24 цвета", "price":"206"}
{"Shop":"Перемена", "name":"Степлер", "price":"45"}
Третья группа записей (добавим с временной задержкой 10 минут)
{"Shop":"Азбука", "name":"Ручка роллер", "price":"210"}
{"Shop":"Азбука", "name":"Глобус Bestar GOLD", "price":"290"}
{"Shop":"Клякса", "name":"Набор цветной бумаги", "price":"39"}
{"Shop":"Клякса", "name":"Бумага цветная", "price":"52"}
{"Shop":"Отличник", "name":"Клей-карандаш", "price":"126"}
Наши данные можно увидеть простым SQL-запросом к таблице tbl_agg_products.
В консоли KsqLDB выполним следующий запрос:
SELECT shop, total_sum, window_start, window_end
FROM TBL_AGG_PRODUCTS;
+--------+---------+-------------------+-------------------+
|SHOP |TOTAL_SUM|WINDOW_START |WINDOW_END |
+--------+---------+-------------------+-------------------+
|Азбука |416.00 |2022-07-26 23:20:00|2022-07-26 23:30:00|
|Азбука |448.00 |2022-07-26 23:30:00|2022-07-26 23:40:00|
|Азбука |500.00 |2022-07-26 23:40:00|2022-07-26 23:50:00|
|Клякса |35.00 |2022-07-26 23:20:00|2022-07-26 23:30:00|
|Клякса |90.00 |2022-07-26 23:30:00|2022-07-26 23:40:00|
|Клякса |91.00 |2022-07-26 23:40:00|2022-07-26 23:50:00|
|Отличник|595.00 |2022-07-26 23:20:00|2022-07-26 23:30:00|
|Отличник|206.00 |2022-07-26 23:30:00|2022-07-26 23:40:00|
|Отличник|126.00 |2022-07-26 23:40:00|2022-07-26 23:50:00|
|Перемена|465.00 |2022-07-26 23:20:00|2022-07-26 23:30:00|
|Перемена|45.00 |2022-07-26 23:30:00|2022-07-26 23:40:00|
Полученный результат в полной мере отражает ожидания, т.е. произведено суммирование по каждому магазину внутри каждого временного окна. Теперь посмотрим – что попало в MySQL таблицу:
Полученный результат представлен на скриншоте: видно, что механизм upsert при добавлении записей в базу затер промежуточные результаты и конечный результат полностью аналогичен тому, что мы видели при запросе к таблице TBL_AGG_PRODUCTS
.
Теперь отобразим его графически в Grafana. Для этого, настроив datasouce на базу MySQL, можем получить следующий графический результат:
Здесь хорошо видно, что один из магазинов резко уменьшил выручку, а позже и вовсе перестал отображаться на графике, что может свидетельствовать об обрыве связи или других проблемах в его работе. Получая информацию о состоянии продаж, практически в реальном времени мы можем максимально оперативно принимать меры, которые позволяет не допустить негативного развития событий.
Заключение
Экосистема Kafka на сегодняшний день обладает достаточно большим арсеналом инструментов, позволяющим выполнять широкий спектр задач в области ETL.Что примечательно, решение задач потокового ETL выполняется по сути без написания единой строки программного кода. Такое направление в разработке приято назвать «no coding». Данный подход действительно имеет целый ряд преимуществ [9] и сторонников, а фраза «The future of coding is no coding at all» принадлежит Крису Ванстрату, соучредителю и бывшему генеральному директору GitHub.
В статье были продемонстрированы возможности такого компонента экосистемы Kafka как Kafka Connect. Он обеспечивает быструю интеграцию кластера Kafka с внешними источниками и приемниками данных с минимальными затратами на разработку. Его ключевым преимуществом является возможность повторного использования коннекторов, которые действуют как плагины между Kafka и внешними источниками данных или приемниками.
Процесс добавления мониторинга в уже имеющуюся систему не требует хоть сколько ни будь существенного дискового пространства, поскольку сохранять данные в topic-е для агрегации данных можно весьма непродолжительное время, лишь незначительно превосходящее период, указанный при создании таблицы агрегации. Сами же данные в базе могут обновляться не так часто, как показано в примере. К тому же, в потоке для мониторинга нет необходимости хранить большинство полей, которые имеют место при передачи данных в основном потоке. Таким образом, нагрузка на базу (в нашем случае это MySQL) также не должна приводить к существенному росту нагрузки на систему в целом.
Среди положительных сторон такого подхода следует выделить максимальную оперативность получения данных, наглядность и отсутствие необходимости писать программный код.
Источники
[2] Apache Kafka – Краткое руководство
[3] ksqlDB
[4] ksqlDb или SQL как инструмент обработки потоков данных
[5] PVbase: compacted topic в Apache Kafka
[6] JDBC Connector (Source and Sink)
[7] Connector/J
[8] Installing and Configuring Kafka Connect
[9] Движение No-code — конец программистов? Разбираем плюсы и минусы