Для надежной поточной обработки данных в реальном времени и принятия решений на основе анализа данных из внешнего источника нужно обеспечить организацию конвейера обработки и хранения данных, который может быть кластеризирован и распараллелен для достижения необходимой производительности и отказоустойчивости. Кроме того, нужно обеспечить механизм своевременной доставки обновленных данных (на основе периодического опроса или использования Web Sockets/SSE) в систему анализа, которая также должна иметь доступ к истории изменений (например, для анализа тренда или получения усредненных значений по временному окну). В этой статье мы поговорим про использование Apache Kafka совместно с Hazelcast для анализа данных в реальном времени, а также разработаем коннектор для Kafka Connect для извлечения данных из внешнего источника (на примере WeatherStack API)
Apache Kafka - распределенная система хранения истории событий, оптимизированная для быстрого последовательного чтения. Единицей хранения в Kafka является сообщение (message), которое является частью темы (topic). Topic может быть представлен несколькими разделами (partition) для отказоустойчивости и производительности. Apache Kafka может быть запущен в режиме кластера, при этом для хранения топологии используется либо внешний Apache Zookeeper, либо внутренние механизмы хранения и протокол KRaft (подробнее этот вопрос был разобран в этой статье). Для каждого топика может быть заданы свои настройки по репликации, времени хранения истории сообщений, правил сохранения снимка в долговременную память и др. Отправку данных в topic
выполняют producer
по сетевому протоколу Kafka (через совместимую библиотеку), либо может использоваться коннектор в отдельном процессе Kafka Connect, который периодически опрашивается и получает данные из внешнего источника. Также Kafka Connect может использоваться для обработки поточных данных (между топиками) и для выгрузки сообщений во внешнюю систему (Sink Connector
). Сообщения извлекаются процессами consumer
по сетевому протоколу, при этом используется pull-модель (consumer сам периодически запрашивает Kafka о новых сообщениях).
Мы будем использовать развертывание Apache Kafka и остальных компонентов в Docker, и тут важным является корректное указание Advertised host (должны совпадать с сетевым именем контейнера внутри сети, поскольку именно оно будет возвращаться как адрес для установки сетевого подключения для отправки/извлечения сообщений). Наше решение будет состоять из нескольких компонентов:
Коннектор для извлечения данных из внешнего API
Apache Kafka для хранения истории ответов из внешнего API
Hazelcast для обработки сообщений в реальном времени
Общая архитектура решения представлена на рисунке:
Hazelcast может рассматриваться как среда для выполнения конвейерной обработки данных (может работать как с Kafka, так и с другими источниками данных, например Amazon Kinesis, Apache Pulsar или поток изменений, полученных через подключение CDC (Change Data Capture, например Debezium) над реляционными базами данных MySQL/PostgreSQL или NoSQL MongoDB, а также при наблюдении за файлами. Данные для обработки загружаются в оперативную память и могут быть проанализированы через встроенный SQL-подобный запрос (ориентирован на выполнение агрегаций внутри скользящего окна) или с использованием кода (может быть написан на Java/Kotlin, C++, .Net, Python, Node.JS, Go). Во втором случае обработка конвейера определяется через преобразования потока и операции группировки:
преобразования:
distinct
,sort
,map
,filter
,flatMap
,join
,merge
,mergeUsingService
/mergeUsingServiceAsync
(через внешний сервис),mapUsingReplicatedMap
(через сохраненный key-value внутри Hazelcast);агрегации:
aggregate
(например среднее значение, наименьшее-наибольшее, тренд, применяются к скользящему окну или группе),window
/slidingWindow
(определение окна, состоящего из N последних замеров).сохранения в sink (например в лог, базу данных, файл, ElasticSearch и др.):
writeTo
Начнем с создания конфигурации для запуска Apache Kafka в Docker Compose:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
Здесь мы регистрируем имя kafka и порт 9092 для взаимодействия с Kafka из внешних контейнеров внутри сети. Также нам нужно будет создать несколько очередей, для хранения служебной информации для Kafka Connect (актуальные смещения, конфигурация и состояние передачи данных). Для этого добавим еще один контейнер, который будет выполнять подготовку Kafka при первом запуске:
init-kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
kafka-topics --bootstrap-server kafka:9092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect_config --replication-factor 1 --partitions 1 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect_offset --replication-factor 1 --partitions 1 --config cleanup.policy=compact
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect_status --replication-factor 1 --partitions 1 --config cleanup.policy=compact
# в эту очередь будут добавляться новые извлеченные из API данные
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic weather --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:9092 --list
Теперь добавим еще один контейнер для управления коннекторами, которые будут выполнять извлечение данных из внешнего источника. Мы будем использовать вариант контейнера от confluentinc
, который также поддерживает управление коннекторами через Confluent Hub. Коннекторы могут решать следующие задачи:
source
- извлечение данных из внешнего источника (расширение классаSourceConnector
)sink
- отправка данных во внешнюю систему (SinkConnector
)transform
- изменение данных (Transformer
)converter
- сериализации-десериализация сообщений (например, в Avro или JSON, с использованием схемы из Kafka Schema Registry).
В этой статье мы рассмотрим только первый тип коннектора. Для разработки будем использовать Kotlin и начнем с добавления необходимых зависимостей - поддержку API Kafka Connect, сериализацию KotlinX Serialization, Ktor Client для взаимодействия с API, а также библиотеку для логирования Kotlin Logging и ShadowJar (последняя необходима для интеграции Kotlin Runtime в единый JAR с коннектором для корректного запуска в JVM):
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
plugins {
kotlin("jvm") version "1.8.21"
id("com.github.johnrengelman.shadow") version "8.1.1"
kotlin("plugin.serialization") version "1.8.21"
}
group = "tech.dzolotov"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("org.apache.kafka:connect-api:3.4.1")
implementation("io.ktor:ktor-client-core:2.3.1")
implementation("io.ktor:ktor-client-cio:2.3.1")
implementation("io.github.oshai:kotlin-logging-jvm:4.0.0-beta-29")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1")
}
Поскольку внешний API требует авторизации, получим токен после регистрации на https://weatherstack.com (может использоваться бесплатно с ограничениями). Для запроса актуальных данных будем использовать REST API: https://api.weatherstack.com/current?access_key=TOKEN&query=Moscow, схема данных описана здесь.
Создадим класс для модели данных (возьмем только некоторые из полей):
@Serializable
class WeatherData(val current: CurrentWeather)
@Serializable
class CurrentWeather(
val observation_time: String,
val temperature: Double,
val wind_speed: Double,
val wind_dir: String,
val pressure: Double,
)
Для определения коннектора сначала договоримся об используемой конфигурации, она будет использоваться при запуске экземпляра для настройки извлечения данных. В нашем случае для коннектора необходимо задать три значения:
адрес для подключения к внешнему API (вместе с токеном)
интервал периодического опроса API
название Kafka Topic, куда будут отправлять данные
Конфигурация задается через типизированные параметры, которые в коде определяются через Builder-класс ConfigDef
(org.apache.kafka.common.config.ConfigDef
). Для удобства объединим все константы с названиями параметров конфигурации и определение объекта конфигурации в общем singleton-объекте ApiConnectConfig
:
object ApiConnectConfig {
const val VERSION = "1.0.0"
const val TOPIC_CONFIG = "topic"
const val API_URL_CONFIG = "apiUrl"
const val PERIODIC_POLL = "periodicPoll"
var topic: String? = null
var apiUrl: String? = null
var periodicPoll: Int? = null
val config: ConfigDef = ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Topic name")
.define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "API Endpoint URL")
.define(PERIODIC_POLL, ConfigDef.Type.INT, 60, ConfigDef.Importance.HIGH, "Polling interval in seconds")
}
В последнем определении 60 - это значение по умолчанию для параметра periodicPoll
. Представленная схема данных автоматически валидируется в момент регистрации коннектора, но при необходимости может быть создан специальный валидатор как реализация интерфейса ConfigDef.Validator
для проверки корректности переданных значений (автоматически проверяются только типы значений и их наличие, если не указано значение по умолчанию)
Создадим реализацию абстрактного класса SourceConnector
:
class ApiSourceConnector : SourceConnector() {
val logger = KotlinLogging.logger("ApiSourceConnector")
//версия коннектора
override fun version() = ApiConnectConfig.VERSION
// конфигурация коннектора
override fun config() = ApiConnectConfig.config
//класс обработчик
override fun taskClass() = ApiSourceTask::class.java
override fun start(props: MutableMap<String, String>?) {
//разбираем конфиг
}
override fun stop() {
// при удалении регистрации коннектора
logger.info { "Stopping connector" }
}
override fun taskConfigs(maxTasks: Int): Map<String,String> {
//создание конфигурации для задачи
}
}
Здесь в методе start
нужно будет разобрать Key-Value Map
с параметрами запуска коннектора. Коннектор при запуске создает объект класса SourceTask
и будет использовать эту конфигурацию при инициализации, которая будет создаваться в методе taskConfigs
. Для удобства объединим все методы для преобразования Map
в конфигурацию, создание Map
для SourceTask
и извлечение отдельных значений конфигурации в объекте ApiConnectConfig
, для этого добавим в существующий объект:
object ApiConnectConfig
const val VERSION = "1.0.0"
const val TOPIC_CONFIG = "topic"
const val API_URL_CONFIG = "apiUrl"
const val PERIODIC_POLL = "periodicPoll"
var topic: String? = null
var apiUrl: String? = null
var periodicPoll: Int? = null
val config: ConfigDef = ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Topic name")
.define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "URL Name")
.define(PERIODIC_POLL, ConfigDef.Type.INT, 60, ConfigDef.Importance.HIGH, "Polling interval in seconds")
fun buildFromMap(map: Map<String, String>) {
val parsed = AbstractConfig(config, map)
topic = parsed.getString(TOPIC_CONFIG)
apiUrl = parsed.getString(API_URL_CONFIG)
periodicPoll = parsed.getInt(PERIODIC_POLL)
}
fun toMap(): Map<String, String> {
return mapOf(
TOPIC_CONFIG to topic.orEmpty(),
API_URL_CONFIG to apiUrl.orEmpty(),
PERIODIC_POLL to periodicPoll?.toString().orEmpty()
)
}
fun getTopic(config: Map<String, String>) = config[TOPIC_CONFIG]
fun getApiUrl(config: Map<String, String>) = config[API_URL_CONFIG]
fun getPeriodicPoll(config: Map<String, String>) = config[PERIODIC_POLL]?.toIntOrNull() ?: 60
}
Теперь мы может сделать реализацию методов start
и taskConfigs
в ApiSourceConnector
:
override fun start(props: MutableMap<String, String>?) {
//разбираем конфиг
ApiConnectConfig.buildFromMap(props ?: mapOf())
logger.info {
"Starting connector for ${ApiConnectConfig.topic}, URL: ${ApiConnectConfig.apiUrl}, Polling interval: ${ApiConnectConfig.periodicPoll}"
}
}
override fun taskConfigs(maxTasks: Int) = listOf(ApiConnectConfig.toMap())
Дальше мы можем создать реализацию абстрактного интерфейса SourceTask
, которая должна реализовать следующие методы:
start
- конфигурация коннектора (принимаетMap
, который создается вtaskConfigs
)stop
- очистка ресурсов после отключения коннектораpoll
- извлечение списка новых сообщений (может вернутьnull
, если сообщений нет)
Добавим использование Channel
для передачи извлеченных данных в метод poll
. Сам периодический опрос мы будем делать через flow
(но можно использовать и любой механизм запуска задач через интервал времени).
class ApiSourceTask : SourceTask() {
//Клиент для запросов к REST
val httpClient = HttpClient(CIO)
//Логирование
val logger = KotlinLogging.logger("ApiSourceTask")
//Канал для передачи извлеченных данных
val channel = Channel<SourceRecord>(capacity = 1)
//Версия task'а
override fun version() = ApiConnectConfig.VERSION
fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO) = flow {
delay(initialDelay)
while (true) {
emit(Unit)
delay(period)
}
}
private val json = Json { ignoreUnknownKeys = true }
// Извлекаем данные из REST API
suspend fun getWeather(url: String): WeatherData {
val response = httpClient.get(url)
return Json.decodeFromString(response.bodyAsText())
}
// Запуск извлечения данных
override fun start(props: MutableMap<String, String>?) {
val config = props ?: mapOf()
// Получаем конфигурацию
val topic = ApiConnectConfig.getTopic(config)
val apiUrl = ApiConnectConfig.getApiUrl(config)
val periodicPoll = ApiConnectConfig.getPeriodicPoll(config)
logger.info { "Start api connect task" }
//Запускаем периодический опрос
CoroutineScope(Dispatchers.IO).launch {
tickerFlow(periodicPoll.seconds.toJavaDuration()).flowOn(Dispatchers.IO).collect {
logger.info { "Polling element " }
val weather = getWeather(apiUrl.orEmpty())
//Сохраняем извлеченные данные
//Здесь первые два аргумента - метаданные
//Затем название topic для отправки сообщений
//Следующий аргумент - идентификатор раздела (может быть не указан)
//Следующие два аргумента - схема и содержание ключа
//И последние два - схема и содержание значения
channel.send(
SourceRecord(
mapOf("domain" to "weather"), //extracted data
mapOf("dt" to LocalTime.now().toString()), //timestamp
topic, //topic name
null, //partition
Schema.OPTIONAL_STRING_SCHEMA,
weather.current.observation_time, //key
Schema.OPTIONAL_STRING_SCHEMA,
Json.encodeToString(weather.current), //value
)
)
}
}
}
// При отключении коннектора отключаем канал
override fun stop() {
channel.close()
logger.info { "Stop" }
}
// При опросе возвращаем значение, если оно есть в канале или null для пропуска опроса
override fun poll(): MutableList<SourceRecord>? = runBlocking {
return@runBlocking try {
mutableListOf<SourceRecord>(channel.receiveCatching().getOrThrow())
} catch (e: Exception) {
null
}
}
}
Теперь, когда код коннектора подготовлен, создадим jar-файл. При компиляции нужно использовать Java 11 для совместимости с JRE внутри контейнера Confluent Kafka Connect.
./gradlew shadowJar
И добавим к стеку Docker Compose запуск Kafka Connect:
kafka-connect:
image: confluentinc/cp-kafka-connect:latest
depends_on:
- init-kafka
environment:
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_GROUP_ID: weather
CONNECT_CONFIG_STORAGE_TOPIC: connect_config
CONNECT_OFFSET_STORAGE_TOPIC: connect_offset
CONNECT_STATUS_STORAGE_TOPIC: connect_status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_TOPIC_CREATION_ENABLE: false
ports:
- 8083:8083
volumes:
- /tmp/KafkaConnector.jar:/etc/kafka-connect/jars/KafkaConnector.jar
command:
- bash
- -c
- confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0 && /etc/confluent/docker/run && sleep infinity
Здесь в /tmp/KafkaConnector.jar скопирован результат сборки jar-файла из build/libs
. Также здесь приведен пример установки коннектора из Confluent Hub. Добавление jar-файла недостаточно для регистрации и необходимо после завершения инициализации Kafka Connect обратиться к REST API и зарегистрировать коннектор, при этом в POST-запросе кроме названия класса коннектора также необходимо передать значения для параметров конфигурации и используемые сериализаторы. Нам понадобится использовать json-конфигурацию для регистрации коннектора:
{
"name": "poll",
"config": {
"connector.class": "tech.dzolotov.kafka.connector.poll.ApiSourceConnector",
"topic": "weather",
"apiUrl": "https://api.weatherstack.com/current?access_key=TOKEN&query=Moscow",
"periodicPoll": "30",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
Выполним запрос к Kafka Connect:
curl -X POST -d @register.json http://localhost:8083/connectors --header "Content-Type: application/json"
В логах контейнера Kafka Connect мы можем увидеть, что опрос API начался (с интервалом в 30 секунд). Увидеть полученные сообщения мы можем через запрос внутри Kafka-контейнера в консольной consumer-утилите Kafka:
kafka-console-consumer --bootstrap-server kafka:9092 --topic weather
Теперь перейдем к запуску Hazelcast и созданию конвейера для обработки данных. Hazelcast не требует запуска отдельного конфигурирования и запускается из образа контейнера hazelcast/hazelcast
. Приложение, определяющее конвейер обработки может инициировать запрос создания нового сервера (на следующем по увеличению номера порта). Для отправки задания можно будет просто запустить процесс приложения, который будет взаимодействовать через Hazelcast Jet, либо использовать утилиту командной строки hz-cli
и команду submit
.
Для управления конвейером используются классы из com.hazelcast.jet.pipeline
(нам понадобится создать и наполнить Pipeline) и использовать встроенный Sink
для отправки сообщений в лог. Подключение к Hazelcast будем выполнять через com.hazelcast.core.Hazelcast
. Для указания источника будем использовать com.hazelcast.jet.kafka.KafkaSources
.
package tech.dzolotov.hazelcast
import com.hazelcast.com.fasterxml.jackson.core.util.RequestPayload
import com.hazelcast.core.Hazelcast
import com.hazelcast.jet.config.JobConfig
import com.hazelcast.jet.kafka.KafkaSources
import com.hazelcast.jet.pipeline.Pipeline
import com.hazelcast.jet.pipeline.Sinks
import com.hazelcast.jet.pipeline.WindowDefinition.sliding
import com.hazelcast.nonapi.io.github.classgraph.json.JSONDeserializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.*
fun main(args: Array<String>) {
JetJob.apply()
}
@Serializable
class CurrentWeatherData(val payload: String)
@Serializable
data class CurrentWeather(
val observation_time: String,
val temperature: Double,
val wind_speed: Double,
val wind_dir: String,
val pressure: Double,
)
class JetJob {
companion object {
private val json = Json { ignoreUnknownKeys = true }
fun apply() {
val p = Pipeline.create()
val kafkaSource = KafkaSources.kafka<String, String>(kafkaProps(), "weather")
val window = p.readFrom(kafkaSource).withNativeTimestamps(0).window(sliding(1,1))
window.distinct()
.apply {
it.map {
val payload =
json.decodeFromString<CurrentWeatherData>(it.result().value).payload //здесь в payload будет json-строка
json.decodeFromString<CurrentWeather>(payload)
}
}
.writeTo(Sinks.logger {
"Get json " + it
})
val cfg = JobConfig().setName("hazelcast-weather")
Hazelcast.bootstrappedInstance().jet.newJob(p, cfg)
}
private fun kafkaProps(): Properties {
val props = Properties()
props.setProperty("bootstrap.servers", "kafka:9092")
props.setProperty("key.deserializer", StringDeserializer::class.java.name)
props.setProperty("value.deserializer", StringDeserializer::class.java.name)
props.setProperty("auto.offset.reset", "earliest")
return props
}
}
}
При подключении к Kafka здесь указана политика "чтения всех сообщений, начиная с самого старого", но это может быть изменено и, например, можно получать только новые сообщения, которые этот consumer
ранее еще не получал. Также здесь мы не используем возможности окна (размер =1) и извлекаем последнее полученное сообщение. Для декодирования JSON-сообщения также можно использовать конвейнерный метод map
для извлечения значения температуры:
val window = p.readFrom(kafkaSource)
.withNativeTimestamps(0)
.window(sliding(1, 1))
.streamStage()
.map {
val payload =
json.decodeFromString<CurrentWeatherData>(it.value).payload //здесь в payload будет json-строка
json.decodeFromString<CurrentWeather>(payload).temperature
}.writeTo(Sinks.logger {
"Temperature $it"
})
Альтернативно можно было бы получить среднее значение за последние 10 измерений температуры с использованием скользящего окна. Для этого мы можем определить свою функцию-агрегатор, которая будет принимать входной набор данных из Kafka (Map.Entry<String,String>
) и возвращать среднее значение температуры среди переданных значений. Определение агрегатора выполняется через builder-методы от AggregateOperation с фазой инициализации (whenCreate)
, последовательных операций накопления (с каждым значением из списка через andAccumulate
) и обобщения (andExportFinish
), что по сути является определением Reducer в подходе Map-Reduce. В нашем случае функция агрегации может выглядеть следующим образом:
//принимаем Map.Entry<String,String>, аккумулятор будет Double, результат Double (средняя)
val avgTemperatureOp = com.hazelcast.jet.aggregate.AggregateOperation.withCreate {
listOf(
DoubleAccumulator(0.0),
DoubleAccumulator(0.0)
)
}.andAccumulate<Map.Entry<String,String>> { acc, data ->
val payload =
Json.decodeFromString<CurrentWeatherData>(data.value).payload //здесь в payload будет json-строка
val temperature = Json.decodeFromString<CurrentWeather>(payload).temperature
acc[0].accumulate(temperature)
acc[1].accumulate(1.0)
}.andExportFinish { acc ->
acc[0].export() / acc[1].export()
}
Тогда конвейер, который будет получать последние 10 измерений (со сдвигом на единицу при появлении новых данных) может выглядеть так:
val p = Pipeline.create()
val kafkaSource = KafkaSources.kafka<String, String>(kafkaProps(), "weather")
p.readFrom(kafkaSource).withNativeTimestamps(0).window(sliding(10, 1))
.aggregate(avgTemperatureOp)
.writeTo(Sinks.logger {
"Temperature $it"
})
При запуске используется активный экземпляр Hazelcast. Для сборки также будем использовать Java 11 и собирать через ShadowJar
и будем использовать плагин application
для определения класса с функцией main
.
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
plugins {
kotlin("jvm") version "1.8.21"
id("com.github.johnrengelman.shadow") version "8.1.1"
kotlin("plugin.serialization") version "1.8.21"
application
}
group = "tech.dzolotov.hazelcast"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains.kotlin:kotlin-stdlib:1.8.21")
implementation("com.hazelcast:hazelcast:5.2.3")
implementation("com.hazelcast.jet:hazelcast-jet-kafka:5.2.3")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1")
testImplementation(kotlin("test"))
}
project.setProperty("mainClassName", "tech.dzolotov.hazelcast.MainKt")
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(11)
}
Для запуска приложения в Hazelcast можно переслать jar-файл в контейнер и запустить там команду hz-cli submit
:
docker cp HZTest-1.0-SNAPSHOT-all.jar kafka-hazelcast-1:/tmp
docker exec -it kafka-hazelcast-1 hz-cli submit /tmp/HZTest-1.0-SNAPSHOT-all.jar
При запуске через submit код процесса будет запускаться внутри JVM Hazelcast и результаты выполнения можно увидеть через логи контейнера:
docker logs -f kafka-hazelcast-1
Таким образом, сочетание Kafka Connect (извлечение из API) + Kafka (для долговременного хранения истории изменения значений) + Hazelcast (анализ потока событий из Kafka и выполнение Filter-Map-Reduce преобразований) может быть использовано для решения задач анализа потока входных данных в реальном времени для любого источника данных (включая реляционные базы данных, текстовые файлы или любые другие источники, для которых может быть сформирован поток изменений и загружен в topic Kafka через Kafka Connect.
Во второй части статьи мы разберемся с созданием Sink Connector для передачи сообщений во внешнюю систему, а также посмотрим на возможности, доступные нам при реализации трансформеров для Kafka Connect.
А прямо сейчас хочу пригласить вас на бесплатный урок курса Apache Kafka, на котором вы узнаете про особенности Kafka и ее устройство, познакомитесь с основными утилитами, рассмотрите базовое АПИ для работы с Kafka.
alexhott
В реальном времени и наличие очереди в одном предложении, я привык что или то или то.