Apache Kafka нередко используется как инструмент накопления истории событий или посредник для взаимодействия микросервисов, работающих с разной скоростью или ожидающих доступа к историческим данным для анализа трендов исторических данных (и в этом случае может использоваться как альтернативное решение для Time Series Database). В большинстве случаев в роли producer и consumer выступают сетевые приложения, которые взаимодействуют через драйвер с поддержкой протокола Kafka, но иногда требуется настроить интеграцию с унаследованными системами или готовыми решениями и для этого может использоваться вспомогательный инструмент Kafka Connect, представляющий большое количество готовых коннекторов. В первой части статьи мы рассмотрели способ создания коннектора для опроса REST API и анализа данных с использованием Hazelcast. В этой статье мы последовательно разберемся с созданием коннекторов для пересылки сообщений из топика во внешнюю систему (Sink Connector) для отправки данных из Kafka в TSDB базу данных TDEngine, а также обсудим, как можно выполнить нагрузочное тестирование всего стека с использованием xk6-kafka.

Начнем с установки базы данных TDEngine с использованием Docker Compose:

services:
  tdengine:
    image: tdengine/tdengine

После запуска можно зайти внутрь контейнера и перейти в командный режим через запуском taos:

docker exec -it kafka_tdengine_1 taos

TDEngine реализует SQL-подобный язык для управления схемой и выполнения запросов и предоставляет для доступа режим JDBC (двоичный протокол) или REST (текстовый протокол, чуть менее эффективный). Мы будем использовать в нашем примере REST-протокол.

На верхнем уровне структуры используется база данных, которая по сути является разновидностью TSDB, которая при создании может быть сконфигурирована по времени хранения, количеству реплик и др.

val jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata"
    val conn = DriverManager.getConnection(jdbcUrl)
    val createDB = conn.prepareStatement("CREATE DATABASE IF NOT EXISTS connect KEEP 120 DURATION 7")
    createDB.execute()

Здесь срок хранения данных - 120 дней, при этом каждые 7 дней создается новый файл для физического хранения базы данных. Следующая концептуальная единица - Supertable, определяет список полей и тэги, по которым будет дифференцироваться таблица, созданная на основе Supertable:

val stableDB = conn.prepareStatement("CREATE STABLE IF NOT EXISTS connect.weather (ts timestamp, temperature float, winddir varchar(4), wind float) TAGS (location BINARY(32))")
    stableDB.execute()

И последним этапом создаем таблицу из Supertable:

val tableDB = conn.prepareStatement("CREATE TABLE IF NOT EXISTS connect.weather_moscow USING connect.weather TAGS ('Moscow')")
    tableDB.execute()

Теперь мы можем добавлять данные в созданную таблицу:

val insert = conn.prepareStatement("INSERT INTO connect.weather_moscow VALUES (now,12,'NW',15)")
    insert.execute()

Начнем с создания Sink-коннектора и, по аналогии с Source-коннектором, добавим необходимые зависимости (Kafka Connect API, Jackson будет использоваться для десериализации JSON из сообщения):

plugins {
    kotlin("jvm") version "1.8.21"
    id("com.github.johnrengelman.shadow") version "8.1.1"
}

group = "tech.dzolotov"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation("com.taosdata.jdbc:taos-jdbcdriver:3.2.1")
    testImplementation(kotlin("test"))
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
    implementation("org.apache.kafka:connect-api:3.4.1")
    implementation("io.github.oshai:kotlin-logging-jvm:4.0.0-beta-29")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
}

kotlin {
    jvmToolchain(11)
}

Конфигурация Sink Connector в нашем случае потребует указания JDBC-строки подключения и названия топика для подписки в Kafka.

object TDEngineConnectConfig {

    const val VERSION = "1.0.0"
    const val TOPIC_CONFIG = "topic"
    const val JDBC_CONFIG = "jdbc"

    var topic: String? = null
    var jdbc: String? = null

    val config: ConfigDef = ConfigDef()
        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Topic name")
        .define(JDBC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "JDBC connection string")


    fun buildFromMap(map: Map<String, String>) {
        val parsed = AbstractConfig(config, map)
        topic = parsed.getString(TOPIC_CONFIG)
        jdbc = parsed.getString(JDBC_CONFIG)
    }

    fun toMap(): Map<String, String> {
        return mapOf(
            TOPIC_CONFIG to topic.orEmpty(),
            JDBC_CONFIG to jdbc.orEmpty(),
        )
    }

    fun getTopic(config: Map<String, String>) = config[TOPIC_CONFIG]

    fun getJdbc(config: Map<String, String>) = config[JDBC_CONFIG]
}

Реализация абстрактного класса SinkConnector принципиально не отличается от аналогичного класса для SourceConnector, его задача - извлечь переданную при запуске конфигурацию и передать её в создаваемый объект реализации класса SinkTask.

class TDEngineSinkConnector : SinkConnector() {
    override fun version() = TDEngineConnectConfig.VERSION

    override fun start(props: MutableMap<String, String>?) =
        TDEngineConnectConfig.buildFromMap(props.orEmpty())

    override fun taskClass(): Class<out Task> = TDEngineSinkTask::class.java

    override fun taskConfigs(maxTasks: Int) = listOf(TDEngineConnectConfig.toMap())

    override fun stop() {}

    override fun config(): ConfigDef = TDEngineConnectConfig.config
}

Реализация абстрактного класса SinkTask определяет жизненный цикл коннектора:

  • start(map: Map<String,String>?) - инициализация подключения к внешней системе

  • stop() - завершение взаимодействия с внешней системой

  • version() - получить версию реализацию задачи

  • put(records: MutableCollection<SinkRecord>?) - вызывается при любом изменении в топиках Apache Kafka

В нашем случае start/stop используются для установки подключения к базе данных и очистки используемых ресурсов.

class TDEngineSinkTask : SinkTask() {
    override fun version() = TDEngineConnectConfig.VERSION

    lateinit var conn: Connection
    lateinit var topic: String

    override fun start(props: MutableMap<String, String>?) {
        val jdbcUrl = TDEngineConnectConfig.getJdbc(props.orEmpty())
        conn = DriverManager.getConnection(jdbcUrl)
        topic = TDEngineConnectConfig.getTopic(props.orEmpty()).orEmpty()
        conn.prepareStatement("CREATE DATABASE IF NOT EXISTS connect KEEP 120 DURATION 7").execute()
        conn.prepareStatement("CREATE STABLE IF NOT EXISTS connect.weather (ts timestamp, temperature float, winddir varchar(4), wind float, pressure float) TAGS (location BINARY(32))")
        conn.prepareStatement("CREATE TABLE IF NOT EXISTS connect.weather_moscow USING connect.weather TAGS ('Moscow')").execute()
    }

    override fun stop() {
        conn.commit()
        conn.close()
    }

    override fun put(records: MutableCollection<SinkRecord>?) {}
}

Put принимает список обновлений с последнего вызова метода, каждый элемент списка содержит метаданные и содержание сообщения:

  • topic() - название топика, откуда было извлечено сообщение

  • kafkaOffset() - внутренний идентификатор (смещение в коммит-логе)

  • kafkaPartition() - идентификатор секции для топика

  • timestampTime() - возвращает тип значения временного отпечатка (константы в TimestampType: NO_TIMESTAMP_TIME - не указан, CREATE_TIME - время создания сообщения из исходных метаданных, LOG_APPEND_TIME - время фактического добавления в topic).

  • headers() - заголовки сообщения

  • keySchema() и key() - описание схемы для валидации ключа и содержание ключа

  • valueSchema() и value() - описание схемы данных для сообщения и само сообщение

Кроме непосредственно сообщения, здесь может быть получена информация о схеме. Kafka поддерживает дополнительный компонент для хранения схемы Kafka Schema Registry, который может быть добавлен при запуска Kafka Connect для создания схемы (из SchemaBuilder) в source/transform connector и проверки корректности сообщения. Для примера, в нашем случае схема на стороне отправителя может выглядеть таким образом:

val schema = SchemaBuilder.struct().name("weather").version(1).doc("Weather schema")
  .field("observation_time", Schema.STRING_SCHEMA).field("temperature", Schema.FLOAT64_SCHEMA)
  .field("wind_speed", Schema.FLOAT64_SCHEMA).field("wind_direction", Schema.STRING_SCHEMA)
  .field("pressure", Schema.FLOAT64_SCHEMA)
  .build()

И дальше этот объект может использоваться при создании SourceRecord в качестве valueSchema (или keySchema). Для корректной работы валидации на стороне отправителя и получателя, при запуске Kafka Connect необходимо дополнительно указать URL для Kafka Schema Registry:

registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - kafka
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9092
      - SCHEMA_REGISTRY_HOST_NAME=registry
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
      - SCHEMA_REGISTRY_KAFKASTORE_GROUP_ID=registry
      - SCHEMA_REGISTRY_MASTER_ELIGIBILITY=true
    ports:
      - 8081:8081
  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    depends_on:
      - init-kafka
    environment:
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: weather
      CONNECT_CONFIG_STORAGE_TOPIC: connect_config
      CONNECT_OFFSET_STORAGE_TOPIC: connect_offset
      CONNECT_STATUS_STORAGE_TOPIC: connect_status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://registry:8081
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_TOPIC_CREATION_ENABLE: false
    ports:
      - 8083:8083
    volumes:
      - /tmp/KafkaConnector.jar:/etc/kafka-connect/jars/KafkaConnector.jar
    command:
      - bash
      - -c
      - /etc/confluent/docker/run && sleep infinity

Также схема может передаваться без использования registry как часть сообщения (кодируется в JSON ключе "schema", а само сообщение будет доступно в "payload") и CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: true . В обоих случаях валидация сообщения будет выполняться при десериализации сообщения. При использовании JSON-сериализатора value() будет иметь тип JsonNode (из библиотеки Jackson).

override fun put(records: MutableCollection<SinkRecord>?) {
        records?.forEach { record ->
            if (record.topic() == topic) {
                val data = record.value() as JsonNode       //JSONNode
                val wind_speed = data.get("wind_speed").asDouble(0.0)
                val temperature = data.get("temperature").asDouble(0.0)
                val pressure = data.get("pressure").asDouble(0.0)
                val wind_direction = data.get("wind_direction").asText()
                val st = conn.prepareStatement("INSERT INTO connect.weather_moscow VALUES (now, ?, ?, ?, ?)")
                st.apply {
                  setDouble(1, temperature)
                  setString(2, wind_direction)
                  setDouble(3, wind_speed)
                  setDouble(4, pressure)
                  execute()
                }
            }
        }
    }

После сборки проекта (gradle-задача shadowJar, нужна для объединения Kotlin Runtime внутри JAR-файла) скопируем его в контейнер Kafka Connect docker cp connector.jar:/etc/kafka-connect/jars/connector.jar (и зарегистрируем через REST API от Kafka Connect с использованием json-файла для конфигурации:

{
  "name": "poll", 

  "config": {
    "connector.class": "tech.dzolotov.kafka.connector.tdconnect.SinkConnector",

    "topic": "weather",
    "jdbcUrl": "jdbc:TAOS-RS://tdengine:6041?user=root&password=taosdata",
    "apiUrl": "https://api.weatherstack.com/current?access_key=TOKEN&query=Moscow",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

И зарегистрируем коннектор:

curl -X POST -d @register.json http://localhost:8083/connectors --header "Content-Type: application/json"

Теперь сообщения, отправленные из source-коннектора в формате JSON (будет работать и без отправки схемы, в этом случае в "schema" будет пустой список и появится уведомление, что не удалось проверить корректность схемы данных) будет извлекаться нашим Sink Connector и отправляться в базу данных TDEngine.

Нужно отметить, что многие технологии предлагают готовые решения для Sink-коннекторов (в том числе есть универсальный коннектор JDBC) и они могут устанавливаться через Confluent Hub и конфигурироваться через properties-файлы. Для TDEngine также есть официальный коннектор, подробности про его настройку можно посмотреть здесь.

Теперь, когда мы создали коннектор, посмотрим как можно протестировать Sink и имитировать высокую нагрузку для оценки общей производительности системы. Для этого мы можем использовать расширение xk6-kafka, которое устанавливается поверх инструмента нагрузочного тестирования k6 (был рассмотрен, в том числе, в этой статье). Расширение позволяет выполнять пакетную отправку в topic или извлечение сообщений из topic с оценкой эффективной скорости выполнения операций. Для тестирования будем использовать готовый образ k6 + xk6-kafka и сценарий:

import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_STRING,
  SCHEMA_TYPE_JSON
} from "k6/x/kafka";

const writer = new Writer({
  brokers: ["kafka:9092"],
  topic: "weather",
});

export default function () {
  writer.produce({
    messages: [
      {
        key: schemaRegistry.serialize({
          data: "key",
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: {},      //содержание запроса
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}

export function teardown(data) {
  writer.close();
}

Для запуска диагностики добавим контейнер в стек Docker Compose:

k6:
    image: mostafamoradian/xk6-kafka
    volumes:
      - ./k6.js:/tmp/k6.js
    cmd:
      - run 
      - --vus 50 
      - --duration 
      - 120s 
      - /tmp/k6.js

Также для упрощения тестирования Kafka-коннекторов можно использовать готовую конфигурацию Kafka + Connect + Registry.

Напоследок хочу пригласить вас на бесплатный урок курса Apache Kafka, на котором вы узнаете про особенности Kafka и ее устройство, познакомитесь с основными утилитами, рассмотрите базовое АПИ для работы с Kafka.

Комментарии (0)