Мне понадобилось написать приложение на Ktor с использованием Apache Kafka и Kafka Streams. Официального клиента или плагина для Ktor'a я не нашел, а работать с ванильной Кафкой не хотелось, поэтому я как любой уважающий себя разработчик решил написать велосипед.

Зависимость

Итак, сейчас мой клиент/плагин можно забрать к себе в проект, подключив jitpack в build-файле и указав git-репозиторий:

repositories {
    maven {
        url = uri("https://jitpack.io")
    }
}

dependencies {
    implementation("com.github.IlyaKalashnikov:ktor-kafka-client:-SNAPSHOT")
}

Admin

Подключить клиент к Ktor-приложению так же легко, как и любой другой плагин:

install (Kafka) {
    this.kafkaConfig = mapOf<String, Any>(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers
    )
    this.topics = listOf(
                    NewTopic(topic, 1, 1)
    )
}

Необходимо только прокинуть клиенту список тем, которые необходимо создать, и интересующую вас конфигурацию.

Остальной функционал Admin'a доступен по вызову функции buildKafkaAdmin(). Функция принимает единственный параметр типа Map<String,Any>, где можно декларировать все необходимые вам настройки.

Consumer-Producer

За создание объекта KafkaConsumer<K,V> отвечает функция consumer() с аргументами Map<String, Any> для конфигурации и List<String> для тем, на которые consumer должен подписаться.

val consumer = consumer<String, String>(
                mapOf(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                    ConsumerConfig.GROUP_ID_CONFIG to "amazing-consumer-group",
                    ConsumerConfig.CLIENT_ID_CONFIG to "amazing-consumer-client"
                ),
                listOf("amazing-topic")
            )

Тем же путем создается KafkaProducer<K,V>:

val producer = producer<String, String>(
                mapOf(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
                )
            )

Экземплярам Consumer'a, Producer'a и Streams можно прокинуть свои сериализаторы-десериализаторы, пример создания кастомного serdes:

class YourClassSerde : Serializer<YourClass>, Deserializer<YourClass>, Serde<YourClass> {
    private val mapper = ObjectMapper()

    override fun serialize(topic: String?, data: YourClass): ByteArray {
        try {
            return mapper.writeValueAsBytes(data)
        } catch (e: Exception) {
            throw SerializationException("Error serializing JSON message", e)
        }
    }

    override fun deserialize(topic: String?, data: ByteArray?): YourClass? {
        return if (data == null) {
            null
        } else try {
            mapper.readValue(data, YourClass::class.java)
        } catch (e: IOException) {
            throw SerializationException(e)
        }

    }

Подробно процесс описан здесь.

Kafka Streams

В случае с KafkaStreams важна не столько инициализация собственно объекта, сколько конфигурация топологии. Для тех, кто не знаком с принципами работы KafkaStreams и Topology, узнать об этом подробнее можно тут.

За создание экземпляра стримов отвечает функция kafkaStreams, которая принимает на вход объект вида:

class KafkaStreamsConfig (
    val topic: String,
    val topologyBuilder: StreamsBuilder.() -> Unit,
    val streamsConfig: Map<String, Any>,
    val builder: StreamsBuilder
)

Инициализация KafkaStreams с самой примитивной топологией может выглядеть так:

val streams = kafkaStreams (
                KafkaStreamsConfig(
                    topic = topic,
                    topologyBuilder = fun StreamsBuilder.() {
                        val stream = this.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
                        stream.toTable(Materialized.`as`(table))
                    },
                    streamsConfig = mapOf(
                        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
                        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
                        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
                        StreamsConfig.APPLICATION_ID_CONFIG to "your-amazing-app",
                        StreamsConfig.CLIENT_ID_CONFIG to "your-amazing-client",
                        StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to 1000,
                    ),
                    builder = StreamsBuilder()
                )
            )

Мне хотелось, чтобы у пользователя была возможность прокинуть клиенту топологию любой сложности по возможности кратко и по делу. Сильно сократить в результате не удалось, но на мой взгляд, через клиент это делать все же удобнее. Пример более тонкой топологии рассмотрим ниже.

Пример использования

Для примера напишем элементарное приложение, которое будет принимать запросы по эндпойнтам "/produce?message=..." и "/count" и либо отправлять сообщение в Кафку, либо выводить на экран слова, отправленные пользователем, и количество их повторений соответственно. В приложении будем использовать Koin для Dependency Injection, заодно сделаем его чуть более приближенным к боевым условиям использования.

В основном модуле опишем декларацию наших "бинов" и внедрим их в Koin'овский модуль. Само собой код лучше распределить по отдельным файлам, но в целях демонстрации обойдемся дефолтным Application.kt.

fun Application.module() {
    install(Kafka) {
        this.kafkaConfig = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVERS
        )
        this.topics = listOf(NewTopic(TOPIC, 1, 1))
    }

    install(Koin) {
        val producer = producer<String, String>(
            mapOf(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVERS,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
            )
        )

        val streams = kafkaStreams(
            KafkaStreamsConfig(
                topologyBuilder = topology(),
                streamsConfig = streamsConfig(),
                builder = StreamsBuilder()
            )
        )
        streams.cleanUp()
        streams.start()


        environment.monitor.subscribe(ApplicationStopped) {
            streams.close(Duration.ofSeconds(5))
        }

        val storage : ReadOnlyWindowStore<String, Long> =
            streams.store(
                StoreQueryParameters.fromNameAndType(
                    "windowed-word-count",
                    QueryableStoreTypes.windowStore()
                )
            )

        val kafkaModule = module {
            single { producer }
            single { streams }
            single { storage }
        }

        modules(kafkaModule)
    }
}

Конфигурация стримов все же вынесена в отдельный файл для читаемости:

fun topology(): StreamsBuilder.() -> Unit {
    return fun StreamsBuilder.() {
        val stringSerde = Serdes.String()
        val textLines: KStream<String, String> =
            this.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()))

        val groupedByWord: KGroupedStream<String?, String> =
            textLines
                .flatMapValues { value: String ->
                    value.lowercase(Locale.getDefault()).split("\\W+".toRegex())
                }
                .groupBy(
                    { _, word -> word },
                    Grouped.with(stringSerde, stringSerde)
                )

        groupedByWord.count(
            Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>("word-count")
                .withValueSerde(Serdes.Long())
        )

        groupedByWord.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count(
                Materialized.`as`<String, Long, WindowStore<Bytes, ByteArray>>("windowed-word-count")
                    .withValueSerde(Serdes.Long())
            )
    }
}

fun streamsConfig(): Map<String, Any> {
    return mapOf(
        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVERS,
        StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
        StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.String().javaClass,
        StreamsConfig.APPLICATION_ID_CONFIG to "amazing-app",
        StreamsConfig.CLIENT_ID_CONFIG to "amazing-client",
        StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to 1000,
    )
}

Как видно из примера, чтобы настроить топологию, необходимо прокинуть клиенту extension-функцию StreamsBuilder'a. Внутри которой все манипуляции можем проводить на объекте this.

Теперь напишем контроллеры и "заинжектим" туда синглтоны KafkaStreams и KafkaConsumer.

fun Application.clientExample() {

    val producer by inject<KafkaProducer<String, String>>()
    val storage by inject<ReadOnlyWindowStore<String,Long>>()

    routing {
        get("/produce") {
            val message = call.request.queryParameters["message"]
            producer.send(ProducerRecord("amazing-topic", Random(1337).nextInt().toString(), message))
            call.respondText("Send message $message to tour amazing topic")
        }

        get("/count") {
            val records = storage.all()
            val builder = StringBuilder()
            for (element in records) {
                builder.append("${element.key.key()} ${element.value} | ")
            }
            call.respondText { builder.toString() }
        }
    }
}

Полный код проекта доступен здесь. В корне лежит docker-compose файл для разворачивания Кафки и Zookeeper'a.

Буду рад, если клиент кому-то пригодится, или кто-то решит написать свою имплементацию. В качестве референса я использовал этот проект: https://github.com/gAmUssA/ktor-kafka. Мой вариант отличается тем, что пользователь может не только получит экземпляры KafkaStreams, но и полностью настроить необходимую топологию. Кроме того, на мой взгляд, моя имплементация интуитивно понятнее с точки зрения конфигурации.

Буду благодарен за предложения и конструктивную критику.

Комментарии (0)