Для надежной поточной обработки данных в реальном времени и принятия решений на основе анализа данных из внешнего источника нужно обеспечить организацию конвейера обработки и хранения данных, который может быть кластеризирован и распараллелен для достижения необходимой производительности и отказоустойчивости. Кроме того, нужно обеспечить механизм своевременной доставки обновленных данных (на основе периодического опроса или использования Web Sockets/SSE) в систему анализа, которая также должна иметь доступ к истории изменений (например, для анализа тренда или получения усредненных значений по временному окну). В этой статье мы поговорим про использование Apache Kafka совместно с Hazelcast для анализа данных в реальном времени, а также разработаем коннектор для Kafka Connect для извлечения данных из внешнего источника (на примере WeatherStack API)

Apache Kafka - распределенная система хранения истории событий, оптимизированная для быстрого последовательного чтения. Единицей хранения в Kafka является сообщение (message), которое является частью темы (topic). Topic может быть представлен несколькими разделами (partition) для отказоустойчивости и производительности. Apache Kafka может быть запущен в режиме кластера, при этом для хранения топологии используется либо внешний Apache Zookeeper, либо внутренние механизмы хранения и протокол KRaft (подробнее этот вопрос был разобран в этой статье). Для каждого топика может быть заданы свои настройки по репликации, времени хранения истории сообщений, правил сохранения снимка в долговременную память и др. Отправку данных в topic выполняют producer по сетевому протоколу Kafka (через совместимую библиотеку), либо может использоваться коннектор в отдельном процессе Kafka Connect, который периодически опрашивается и получает данные из внешнего источника. Также Kafka Connect может использоваться для обработки поточных данных (между топиками) и для выгрузки сообщений во внешнюю систему (Sink Connector). Сообщения извлекаются процессами consumer по сетевому протоколу, при этом используется pull-модель (consumer сам периодически запрашивает Kafka о новых сообщениях).

Мы будем использовать развертывание Apache Kafka и остальных компонентов в Docker, и тут важным является корректное указание Advertised host (должны совпадать с сетевым именем контейнера внутри сети, поскольку именно оно будет возвращаться как адрес для установки сетевого подключения для отправки/извлечения сообщений). Наше решение будет состоять из нескольких компонентов:

  1. Коннектор для извлечения данных из внешнего API

  2. Apache Kafka для хранения истории ответов из внешнего API

  3. Hazelcast для обработки сообщений в реальном времени

Общая архитектура решения представлена на рисунке:

Общая архитектура решения
Общая архитектура решения

Hazelcast может рассматриваться как среда для выполнения конвейерной обработки данных (может работать как с Kafka, так и с другими источниками данных, например Amazon Kinesis, Apache Pulsar или поток изменений, полученных через подключение CDC (Change Data Capture, например Debezium) над реляционными базами данных MySQL/PostgreSQL или NoSQL MongoDB, а также при наблюдении за файлами. Данные для обработки загружаются в оперативную память и могут быть проанализированы через встроенный SQL-подобный запрос (ориентирован на выполнение агрегаций внутри скользящего окна) или с использованием кода (может быть написан на Java/Kotlin, C++, .Net, Python, Node.JS, Go). Во втором случае обработка конвейера определяется через преобразования потока и операции группировки:

  • преобразования: distinct, sort, map, filter, flatMap, join, merge, mergeUsingService/mergeUsingServiceAsync (через внешний сервис), mapUsingReplicatedMap (через сохраненный key-value внутри Hazelcast);

  • агрегации: aggregate (например среднее значение, наименьшее-наибольшее, тренд, применяются к скользящему окну или группе), window/slidingWindow (определение окна, состоящего из N последних замеров).

  • сохранения в sink (например в лог, базу данных, файл, ElasticSearch и др.): writeTo

Начнем с создания конфигурации для запуска Apache Kafka в Docker Compose:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

Здесь мы регистрируем имя kafka и порт 9092 для взаимодействия с Kafka из внешних контейнеров внутри сети. Также нам нужно будет создать несколько очередей, для хранения служебной информации для Kafka Connect (актуальные смещения, конфигурация и состояние передачи данных). Для этого добавим еще один контейнер, который будет выполнять подготовку Kafka при первом запуске:

init-kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      kafka-topics --bootstrap-server kafka:9092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect_config --replication-factor 1 --partitions 1 --config cleanup.policy=compact
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect_offset --replication-factor 1 --partitions 1 --config cleanup.policy=compact
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic connect_status --replication-factor 1 --partitions 1 --config cleanup.policy=compact
      # в эту очередь будут добавляться новые извлеченные из API данные
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic weather --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:9092 --list

Теперь добавим еще один контейнер для управления коннекторами, которые будут выполнять извлечение данных из внешнего источника. Мы будем использовать вариант контейнера от confluentinc, который также поддерживает управление коннекторами через Confluent Hub. Коннекторы могут решать следующие задачи:

  • source - извлечение данных из внешнего источника (расширение класса SourceConnector)

  • sink - отправка данных во внешнюю систему (SinkConnector)

  • transform - изменение данных (Transformer)

  • converter - сериализации-десериализация сообщений (например, в Avro или JSON, с использованием схемы из Kafka Schema Registry).

В этой статье мы рассмотрим только первый тип коннектора. Для разработки будем использовать Kotlin и начнем с добавления необходимых зависимостей - поддержку API Kafka Connect, сериализацию KotlinX Serialization, Ktor Client для взаимодействия с API, а также библиотеку для логирования Kotlin Logging и ShadowJar (последняя необходима для интеграции Kotlin Runtime в единый JAR с коннектором для корректного запуска в JVM):

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
    kotlin("jvm") version "1.8.21"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    kotlin("plugin.serialization") version "1.8.21"
}

group = "tech.dzolotov"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
    implementation("org.apache.kafka:connect-api:3.4.1")
    implementation("io.ktor:ktor-client-core:2.3.1")
    implementation("io.ktor:ktor-client-cio:2.3.1")
    implementation("io.github.oshai:kotlin-logging-jvm:4.0.0-beta-29")
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1")
}

Поскольку внешний API требует авторизации, получим токен после регистрации на https://weatherstack.com (может использоваться бесплатно с ограничениями). Для запроса актуальных данных будем использовать REST API: https://api.weatherstack.com/current?access_key=TOKEN&query=Moscow, схема данных описана здесь.

Создадим класс для модели данных (возьмем только некоторые из полей):

@Serializable
class WeatherData(val current: CurrentWeather)

@Serializable
class CurrentWeather(
    val observation_time: String,
    val temperature: Double,
    val wind_speed: Double,
    val wind_dir: String,
    val pressure: Double,
)

Для определения коннектора сначала договоримся об используемой конфигурации, она будет использоваться при запуске экземпляра для настройки извлечения данных. В нашем случае для коннектора необходимо задать три значения:

  • адрес для подключения к внешнему API (вместе с токеном)

  • интервал периодического опроса API

  • название Kafka Topic, куда будут отправлять данные

Конфигурация задается через типизированные параметры, которые в коде определяются через Builder-класс ConfigDef (org.apache.kafka.common.config.ConfigDef). Для удобства объединим все константы с названиями параметров конфигурации и определение объекта конфигурации в общем singleton-объекте ApiConnectConfig:

object ApiConnectConfig {

    const val VERSION = "1.0.0"
    const val TOPIC_CONFIG = "topic"
    const val API_URL_CONFIG = "apiUrl"
    const val PERIODIC_POLL = "periodicPoll"

    var topic: String? = null
    var apiUrl: String? = null
    var periodicPoll: Int? = null

    val config: ConfigDef = ConfigDef()
        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Topic name")
        .define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "API Endpoint URL")
        .define(PERIODIC_POLL, ConfigDef.Type.INT, 60, ConfigDef.Importance.HIGH, "Polling interval in seconds")
}

В последнем определении 60 - это значение по умолчанию для параметра periodicPoll. Представленная схема данных автоматически валидируется в момент регистрации коннектора, но при необходимости может быть создан специальный валидатор как реализация интерфейса ConfigDef.Validator для проверки корректности переданных значений (автоматически проверяются только типы значений и их наличие, если не указано значение по умолчанию)

Создадим реализацию абстрактного класса SourceConnector:

class ApiSourceConnector : SourceConnector() {
    val logger = KotlinLogging.logger("ApiSourceConnector")

    //версия коннектора
    override fun version() = ApiConnectConfig.VERSION

    // конфигурация коннектора
    override fun config() = ApiConnectConfig.config

    //класс обработчик
    override fun taskClass() = ApiSourceTask::class.java

    override fun start(props: MutableMap<String, String>?) {
        //разбираем конфиг
    }

    override fun stop() {
        // при удалении регистрации коннектора
        logger.info { "Stopping connector" }
    }

    override fun taskConfigs(maxTasks: Int): Map<String,String> {
    //создание конфигурации для задачи
    }
}

Здесь в методе start нужно будет разобрать Key-Value Map с параметрами запуска коннектора. Коннектор при запуске создает объект класса SourceTask и будет использовать эту конфигурацию при инициализации, которая будет создаваться в методе taskConfigs. Для удобства объединим все методы для преобразования Map в конфигурацию, создание Map для SourceTask и извлечение отдельных значений конфигурации в объекте ApiConnectConfig, для этого добавим в существующий объект:

object ApiConnectConfig
    const val VERSION = "1.0.0"
    const val TOPIC_CONFIG = "topic"
    const val API_URL_CONFIG = "apiUrl"
    const val PERIODIC_POLL = "periodicPoll"

    var topic: String? = null
    var apiUrl: String? = null
    var periodicPoll: Int? = null

    val config: ConfigDef = ConfigDef()
        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Topic name")
        .define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "URL Name")
        .define(PERIODIC_POLL, ConfigDef.Type.INT, 60, ConfigDef.Importance.HIGH, "Polling interval in seconds")

    fun buildFromMap(map: Map<String, String>) {
        val parsed = AbstractConfig(config, map)
        topic = parsed.getString(TOPIC_CONFIG)
        apiUrl = parsed.getString(API_URL_CONFIG)
        periodicPoll = parsed.getInt(PERIODIC_POLL)
    }

    fun toMap(): Map<String, String> {
        return mapOf(
            TOPIC_CONFIG to topic.orEmpty(),
            API_URL_CONFIG to apiUrl.orEmpty(),
            PERIODIC_POLL to periodicPoll?.toString().orEmpty()
        )
    }

    fun getTopic(config: Map<String, String>) = config[TOPIC_CONFIG]

    fun getApiUrl(config: Map<String, String>) = config[API_URL_CONFIG]

    fun getPeriodicPoll(config: Map<String, String>) = config[PERIODIC_POLL]?.toIntOrNull() ?: 60
}

Теперь мы может сделать реализацию методов start и taskConfigs в ApiSourceConnector:

override fun start(props: MutableMap<String, String>?) {
        //разбираем конфиг
        ApiConnectConfig.buildFromMap(props ?: mapOf())
        logger.info { 
            "Starting connector for ${ApiConnectConfig.topic}, URL: ${ApiConnectConfig.apiUrl}, Polling interval: ${ApiConnectConfig.periodicPoll}" 
        }
    }

    override fun taskConfigs(maxTasks: Int) = listOf(ApiConnectConfig.toMap())

Дальше мы можем создать реализацию абстрактного интерфейса SourceTask, которая должна реализовать следующие методы:

  • start - конфигурация коннектора (принимает Map, который создается в taskConfigs)

  • stop - очистка ресурсов после отключения коннектора

  • poll - извлечение списка новых сообщений (может вернуть null, если сообщений нет)

Добавим использование Channel для передачи извлеченных данных в метод poll. Сам периодический опрос мы будем делать через flow (но можно использовать и любой механизм запуска задач через интервал времени).

class ApiSourceTask : SourceTask() {

    //Клиент для запросов к REST
    val httpClient = HttpClient(CIO)

    //Логирование
    val logger = KotlinLogging.logger("ApiSourceTask")

    //Канал для передачи извлеченных данных
    val channel = Channel<SourceRecord>(capacity = 1)

    //Версия task'а
    override fun version() = ApiConnectConfig.VERSION

    fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO) = flow {
        delay(initialDelay)
        while (true) {
            emit(Unit)
            delay(period)
        }
    }

    private val json = Json { ignoreUnknownKeys = true }

    // Извлекаем данные из REST API
    suspend fun getWeather(url: String): WeatherData {
        val response = httpClient.get(url)
        return Json.decodeFromString(response.bodyAsText())
    }

    // Запуск извлечения данных
    override fun start(props: MutableMap<String, String>?) {
        val config = props ?: mapOf()
        // Получаем конфигурацию
        val topic = ApiConnectConfig.getTopic(config)
        val apiUrl = ApiConnectConfig.getApiUrl(config)
        val periodicPoll = ApiConnectConfig.getPeriodicPoll(config)
        
        logger.info { "Start api connect task" }

        //Запускаем периодический опрос
        CoroutineScope(Dispatchers.IO).launch {
            tickerFlow(periodicPoll.seconds.toJavaDuration()).flowOn(Dispatchers.IO).collect {
                logger.info { "Polling element " }

                val weather = getWeather(apiUrl.orEmpty())
                //Сохраняем извлеченные данные
                //Здесь первые два аргумента - метаданные
                //Затем название topic для отправки сообщений
                //Следующий аргумент - идентификатор раздела (может быть не указан)
                //Следующие два аргумента - схема и содержание ключа
                //И последние два - схема и содержание значения
                channel.send(
                    SourceRecord(
                        mapOf("domain" to "weather"),              //extracted data
                        mapOf("dt" to LocalTime.now().toString()), //timestamp
                        topic,                                     //topic name
                        null,                                      //partition
                        Schema.OPTIONAL_STRING_SCHEMA,
                        weather.current.observation_time,          //key
                        Schema.OPTIONAL_STRING_SCHEMA,
                        Json.encodeToString(weather.current),      //value
                    )
                )
            }
        }
    }

    // При отключении коннектора отключаем канал
    override fun stop() {
        channel.close()
        logger.info { "Stop" }
    }

    // При опросе возвращаем значение, если оно есть в канале или null для пропуска опроса
    override fun poll(): MutableList<SourceRecord>? = runBlocking {
        return@runBlocking try {
            mutableListOf<SourceRecord>(channel.receiveCatching().getOrThrow())
        } catch (e: Exception) {
            null
        }
    }
}

Теперь, когда код коннектора подготовлен, создадим jar-файл. При компиляции нужно использовать Java 11 для совместимости с JRE внутри контейнера Confluent Kafka Connect.

./gradlew shadowJar

И добавим к стеку Docker Compose запуск Kafka Connect:

kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    depends_on:
      - init-kafka
    environment:
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: weather
      CONNECT_CONFIG_STORAGE_TOPIC: connect_config
      CONNECT_OFFSET_STORAGE_TOPIC: connect_offset
      CONNECT_STATUS_STORAGE_TOPIC: connect_status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_TOPIC_CREATION_ENABLE: false
    ports:
      - 8083:8083
    volumes:
      - /tmp/KafkaConnector.jar:/etc/kafka-connect/jars/KafkaConnector.jar
    command:
      - bash
      - -c
      - confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0 && /etc/confluent/docker/run && sleep infinity

Здесь в /tmp/KafkaConnector.jar скопирован результат сборки jar-файла из build/libs. Также здесь приведен пример установки коннектора из Confluent Hub. Добавление jar-файла недостаточно для регистрации и необходимо после завершения инициализации Kafka Connect обратиться к REST API и зарегистрировать коннектор, при этом в POST-запросе кроме названия класса коннектора также необходимо передать значения для параметров конфигурации и используемые сериализаторы. Нам понадобится использовать json-конфигурацию для регистрации коннектора:

{
  "name": "poll", 

  "config": {
    "connector.class": "tech.dzolotov.kafka.connector.poll.ApiSourceConnector",

    "topic": "weather",
    "apiUrl": "https://api.weatherstack.com/current?access_key=TOKEN&query=Moscow",
    "periodicPoll": "30",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Выполним запрос к Kafka Connect:

curl -X POST -d @register.json http://localhost:8083/connectors --header "Content-Type: application/json"

В логах контейнера Kafka Connect мы можем увидеть, что опрос API начался (с интервалом в 30 секунд). Увидеть полученные сообщения мы можем через запрос внутри Kafka-контейнера в консольной consumer-утилите Kafka:

kafka-console-consumer --bootstrap-server kafka:9092 --topic weather

Теперь перейдем к запуску Hazelcast и созданию конвейера для обработки данных. Hazelcast не требует запуска отдельного конфигурирования и запускается из образа контейнера hazelcast/hazelcast. Приложение, определяющее конвейер обработки может инициировать запрос создания нового сервера (на следующем по увеличению номера порта). Для отправки задания можно будет просто запустить процесс приложения, который будет взаимодействовать через Hazelcast Jet, либо использовать утилиту командной строки hz-cli и команду submit.

Для управления конвейером используются классы из com.hazelcast.jet.pipeline (нам понадобится создать и наполнить Pipeline) и использовать встроенный Sink для отправки сообщений в лог. Подключение к Hazelcast будем выполнять через com.hazelcast.core.Hazelcast. Для указания источника будем использовать com.hazelcast.jet.kafka.KafkaSources .

package tech.dzolotov.hazelcast

import com.hazelcast.com.fasterxml.jackson.core.util.RequestPayload
import com.hazelcast.core.Hazelcast
import com.hazelcast.jet.config.JobConfig
import com.hazelcast.jet.kafka.KafkaSources
import com.hazelcast.jet.pipeline.Pipeline
import com.hazelcast.jet.pipeline.Sinks
import com.hazelcast.jet.pipeline.WindowDefinition.sliding
import com.hazelcast.nonapi.io.github.classgraph.json.JSONDeserializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.*

fun main(args: Array<String>) {
    JetJob.apply()
}

@Serializable
class CurrentWeatherData(val payload: String)

@Serializable
data class CurrentWeather(
    val observation_time: String,
    val temperature: Double,
    val wind_speed: Double,
    val wind_dir: String,
    val pressure: Double,
)

class JetJob {
    companion object {
        private val json = Json { ignoreUnknownKeys = true }

        fun apply() {
            val p = Pipeline.create()
            val kafkaSource = KafkaSources.kafka<String, String>(kafkaProps(), "weather")
            val window = p.readFrom(kafkaSource).withNativeTimestamps(0).window(sliding(1,1))
            window.distinct()
                .apply {
                    it.map {
                        val payload =
                            json.decodeFromString<CurrentWeatherData>(it.result().value).payload      //здесь в payload будет json-строка
                        json.decodeFromString<CurrentWeather>(payload)
                    }
                }
            .writeTo(Sinks.logger {
                "Get json " + it
            })
            val cfg = JobConfig().setName("hazelcast-weather")
            Hazelcast.bootstrappedInstance().jet.newJob(p, cfg)
        }

        private fun kafkaProps(): Properties {
            val props = Properties()
            props.setProperty("bootstrap.servers", "kafka:9092")
            props.setProperty("key.deserializer", StringDeserializer::class.java.name)
            props.setProperty("value.deserializer", StringDeserializer::class.java.name)
            props.setProperty("auto.offset.reset", "earliest")
            return props
        }
    }
}

При подключении к Kafka здесь указана политика "чтения всех сообщений, начиная с самого старого", но это может быть изменено и, например, можно получать только новые сообщения, которые этот consumer ранее еще не получал. Также здесь мы не используем возможности окна (размер =1) и извлекаем последнее полученное сообщение. Для декодирования JSON-сообщения также можно использовать конвейнерный метод map для извлечения значения температуры:

val window = p.readFrom(kafkaSource)
                .withNativeTimestamps(0)
                .window(sliding(1, 1))
                .streamStage()
                .map {
                    val payload =
                        json.decodeFromString<CurrentWeatherData>(it.value).payload      //здесь в payload будет json-строка
                    json.decodeFromString<CurrentWeather>(payload).temperature
                }.writeTo(Sinks.logger {
                    "Temperature $it"
                })

Альтернативно можно было бы получить среднее значение за последние 10 измерений температуры с использованием скользящего окна. Для этого мы можем определить свою функцию-агрегатор, которая будет принимать входной набор данных из Kafka (Map.Entry<String,String>) и возвращать среднее значение температуры среди переданных значений. Определение агрегатора выполняется через builder-методы от AggregateOperation с фазой инициализации (whenCreate), последовательных операций накопления (с каждым значением из списка через andAccumulate) и обобщения (andExportFinish), что по сути является определением Reducer в подходе Map-Reduce. В нашем случае функция агрегации может выглядеть следующим образом:

//принимаем Map.Entry<String,String>, аккумулятор будет Double, результат Double (средняя)
val avgTemperatureOp = com.hazelcast.jet.aggregate.AggregateOperation.withCreate {
    listOf(
        DoubleAccumulator(0.0),
        DoubleAccumulator(0.0)
    )
}.andAccumulate<Map.Entry<String,String>> { acc, data ->
    val payload =
        Json.decodeFromString<CurrentWeatherData>(data.value).payload      //здесь в payload будет json-строка
    val temperature = Json.decodeFromString<CurrentWeather>(payload).temperature
    acc[0].accumulate(temperature)
    acc[1].accumulate(1.0)
}.andExportFinish { acc ->
    acc[0].export() / acc[1].export()
}

Тогда конвейер, который будет получать последние 10 измерений (со сдвигом на единицу при появлении новых данных) может выглядеть так:

            val p = Pipeline.create()
            val kafkaSource = KafkaSources.kafka<String, String>(kafkaProps(), "weather")
            p.readFrom(kafkaSource).withNativeTimestamps(0).window(sliding(10, 1))
                .aggregate(avgTemperatureOp)
                .writeTo(Sinks.logger {
                    "Temperature $it"
                })

При запуске используется активный экземпляр Hazelcast. Для сборки также будем использовать Java 11 и собирать через ShadowJar и будем использовать плагин application для определения класса с функцией main.

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
    kotlin("jvm") version "1.8.21"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    kotlin("plugin.serialization") version "1.8.21"
    application
}

group = "tech.dzolotov.hazelcast"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-stdlib:1.8.21")
    implementation("com.hazelcast:hazelcast:5.2.3")
    implementation("com.hazelcast.jet:hazelcast-jet-kafka:5.2.3")
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1")
    testImplementation(kotlin("test"))
}

project.setProperty("mainClassName", "tech.dzolotov.hazelcast.MainKt")

tasks.test {
    useJUnitPlatform()
}

kotlin {
    jvmToolchain(11)
}

Для запуска приложения в Hazelcast можно переслать jar-файл в контейнер и запустить там команду hz-cli submit:

docker cp HZTest-1.0-SNAPSHOT-all.jar kafka-hazelcast-1:/tmp
docker exec -it kafka-hazelcast-1 hz-cli submit /tmp/HZTest-1.0-SNAPSHOT-all.jar

При запуске через submit код процесса будет запускаться внутри JVM Hazelcast и результаты выполнения можно увидеть через логи контейнера:

docker logs -f kafka-hazelcast-1

Таким образом, сочетание Kafka Connect (извлечение из API) + Kafka (для долговременного хранения истории изменения значений) + Hazelcast (анализ потока событий из Kafka и выполнение Filter-Map-Reduce преобразований) может быть использовано для решения задач анализа потока входных данных в реальном времени для любого источника данных (включая реляционные базы данных, текстовые файлы или любые другие источники, для которых может быть сформирован поток изменений и загружен в topic Kafka через Kafka Connect.

Во второй части статьи мы разберемся с созданием Sink Connector для передачи сообщений во внешнюю систему, а также посмотрим на возможности, доступные нам при реализации трансформеров для Kafka Connect.

А прямо сейчас хочу пригласить вас на бесплатный урок курса Apache Kafka, на котором вы узнаете про особенности Kafka и ее устройство, познакомитесь с основными утилитами, рассмотрите базовое АПИ для работы с Kafka.

Комментарии (1)


  1. alexhott
    13.06.2023 08:28

    В реальном времени и наличие очереди в одном предложении, я привык что или то или то.