Привет! Меня зовут Александра, я работаю в отделе тестирования производительности Тинькофф. Мы продолжаем наш цикл статей, посвященных работе Gatling с различными протоколами. Ранее мы уже рассмотрели работу с HTTP, JDBC и gRPC. В этой статье поговорим о работе Gatling с Kafka. Если вы еще не прочли нашу вводную статью, рекомендую это сделать: в ней мы поделились базовой информацией о работе с Gatling.

Дисклеймер

На момент написания статьи используемые плагины поддерживают версию Gatling не выше 3.7.6. Для получения работающего скрипта в будущем достаточно обновить версии плагинов в соответствии с их документацией.

Пишем скрипты Gatling для Kafka

Kafka — распределенный брокер сообщений с высокой пропускной способностью. Он состоит из нескольких основных элементов:

  1. Broker — сервер.

  2. Producer — отвечает за отправку сообщений в топик.

  3. Consumer — отвечает за считывание обращений.

  4. ZooKeeper — хранилище метаданных и логов.

Также у Kafka особая структура данных: каждое сообщение содержит ключ, значение, временную метку и опционально может содержать метаданные. Сообщения отправляются в топики, которые могут состоять из нескольких партиций. Узнать о Kafka больше можно в официальной документации.

Разворачиваем тестовую Kafka

Для разработки скрипта Gatling развернем тестовую Kafka и Zookeeper. Для локального развертывания сервисов используем файл docker-compose.yml. Актуальные версии можно посмотреть на странице в DockerHub.

services:
 zookeeper:
   image: wurstmeister/zookeeper
   hostname: zoo1
   environment:
     ZOO_MY_ID: "1"
     ZOO_PORT: "2181"
     ZOO_SERVERS: server.1=zoo1:2888:3888
   ports:
     - '2181:2181'
   volumes:
     - "./zookeeper/data:/data"
     - "./zookeeper/logs:/datalog"
   restart: on-failure
 kafka:
   image: wurstmeister/kafka
   depends_on:
     - zookeeper
   environment:
     KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     KAFKA_ADVERTISED_HOST_NAME: kafka
     KAFKA_LISTENERS: BROKER://:9092,EXTERNAL://:9093
     KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:9093
     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
     KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
     KAFKA_BROKER_ID: "1"
     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
     KAFKA_CREATE_TOPICS: myTopic:1:1
   ports:
     - '9093:9093'
     - '9092:9092'
   volumes:
     - /var/run/docker.sock:/var/run/docker.sock

 Для запуска создаем файл docker-compose.yml и выполняем команду docker-compose up. Обратите внимание: для выполнения команды нужно установить и запустить Docker.

После выполнения команды будет развернута тестовая Kafka. Чтобы проверить ее доступность, устанавливаем KafkaTool и подключаемся к ней. Для этого создаем новое подключение. В открывшемся окне произвольно называем кластер, на вкладке Advanced вводим адрес localhost:9093.

Разрабатываем скрипт

Для написания скриптов берем тот же шаблон, что и в предыдущей статье. В качестве IDE для разработки скриптов будем использовать IntelliJ IDEA. 

Шаг 1. Обновление зависимостей

В файле project/Dependencies.scala добавим плагин для работы Gatling с Kafka и его зависимости. Актуальную версию можно посмотреть в GitHub.

lazy val gatlingKafka: Seq[ModuleID] = Seq(
 "ru.tinkoff" %% "gatling-kafka-plugin",
).map(_ % "<current version>")

lazy val avro4s: Seq[ModuleID] = Seq(
  "com.sksamuel.avro4s" %% "avro4s-core")
.map(_ % "<current version>" % "provided")

 В файле build.sbt добавляем новые зависимости.

libraryDependencies ++= gatlingKafka,
libraryDependencies ++= avro4s,

Для обновления зависимостей нажимаем Load sbt Changes через интерфейс IntelliJ IDEA.

Шаг 2. Переменные сервиса

Для локального подключения к Kafka указываем ее адрес в файле src/test/resources/simulation.conf.

kafkaBroker: "localhost:9093"

Шаг 3. Запросы

В директории cases создаем новый scala-класс, где будем писать запросы. Назовем его KafkaActions. Пишем запрос для отправки сообщения в топик myTopic, который мы создали в момент разворачивания Kafka.

package ru.tinkoff.load.myKafkaSample.cases

import io.gatling.core.Predef.
import ru.tinkoff.gatling.kafka.Predef.kafka
import ru.tinkoff.gatling.kafka.request.builder.RequestBuilder

object kafkaActions {
 val sendMyMessage: RequestBuilder[_, Any] = 
  // Указываем имя запроса
  kafka("my message")
  // Указываем ключ и сообщение
   .send("myMessage", "Hello!")
  
 val sendOtherMessage: RequestBuilder[_, Any] = kafka("my other message")
   .send("myMessage", "Hello, ${name}!")
}

Для отправки сообщения sendOtherMessage используем feeder, создаем для него директорию feeders. И в ней же — object Feeders.

package ru.tinkoff.load.myKafkaSample.feeders

import ru.tinkoff.gatling.feeders.RandomStringFeeder

 object Feeders {
   // Создаем фидер для генерации строки 6 случайных символов
   val myRandomStringFeeder = RandomStringFeeder("name", 6)
 }

Шаг 4. Сценарий теста

В CommonScenario меняем код для выполнения наших запросов. При такой реализации запросы будут выполняться последовательно в указанном порядке.

package ru.tinkoff.load.myKafkaSample.scenarios

import io.gatling.core.Predef.
import io.gatling.core.structure.ScenarioBuilder
import ru.tinkoff.load.myKafkaSample.cases.
import ru.tinkoff.gatling.kafka.Predef.
import ru.tinkoff.load.myKafkaSample.feeders.Feeders.myRandomStringFeeder

object CommonScenario {
 def apply(): ScenarioBuilder = new CommonScenario().scn
}

class CommonScenario {
 val scn: ScenarioBuilder = 
  // Указываем название сценария
  scenario("Common Scenario")
  // Подключаем feeder
   .feed(myRandomStringFeeder)
  // Отправляем сообщения
   .exec(kafkaActions.sendMyMessage)
   .exec(kafkaActions.sendOtherMessage)
}

Шаг 5. Описание Kafka-протокола

В файле myKafkaSample.scala описываем протокол для работы с Kafka.

package ru.tinkoff.load

import org.apache.kafka.clients.producer.ProducerConfig
import ru.tinkoff.gatling.config.SimulationConfig.
import ru.tinkoff.gatling.kafka.Predef.kafka
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilder

package object myKafkaSample {
 val kafkaProtocol: KafkaProtocolBuilder = kafka
  // Указываем название топика
   .topic("myTopic")
   .properties(
     Map(
       ProducerConfig.ACKS_CONFIG                   -> "1",
       // Указываем kafka brokers
       ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> getStringParam("kafkaBroker"),
       // Указываем тип ключа и сообщения
       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
     ),
   )
}

Шаг 6. Нагрузочные тесты

Описываем, как будем подавать нагрузку, в файле Debug.scala.

package ru.tinkoff.load.myKafkaSample

import io.gatling.core.Predef.
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.load.myKafkaSample.scenarios.CommonScenario

class Debug extends Simulation {
 setUp(
   // Вызываем сценарий
   CommonScenario()
   // Задаем интенсивность
   .inject(atOnceUsers(1)),
 ).protocols(
   // Указываем протокол
   kafkaProtocol,
 )
}

По аналогии изменяем протокол в MaxPerformance и Stability

Шаг 7. Запуск Debug-теста

Для запуска теста используем GatlingRunner. После выполнения скрипта можно подключиться к нашей тестовой Kafka, используя KafkaTool, и проверить, что сообщения появились в топике.

package ru.tinkoff.load.myKafkaSample

import io.gatling.app.Gatling
import io.gatling.core.config.GatlingPropertiesBuilder

object GatlingRunner {
 def main(args: Array[String]): Unit = {

   // Указывает имя симуляции Debug (можно использовать любую симуляцию)
   val simulationClass = classOf[Debug].getName

   val props = new GatlingPropertiesBuilder

   props.simulationClass(simulationClass)

   Gatling.fromMap(props.build)
 }
}

Заключение

В этой статье цикла о Gatling мы узнали, как с помощью плагина можно формировать и отправлять сообщения в Kafka. Если вам нужно использовать avro-схемы, скачать их можно, используя плагин SBT Schema Registry. Мы планируем расширять функционал плагина — в следующих версиях с его помощью можно будет читать сообщения из Kafka. 

Все плагины, использованные в цикле статей о Gatling, есть в открытом доступе. Если вы хотите внести свой вклад в развитие плагинов, которые упоминались в нашем цикле статей, мы будем рады получить PR или issue на GitHub. В следующей статье мы разберемся, как писать скрипты Gatling для протокола AMQP.

Полезные ссылки

  1. Gatling Kafka Plugin

  2. SBT Schema Registry Plugin

  3. Официальная документация Kafka

  4. Gatling Feeders

  5. Проект Gatling из примеров статьи

Комментарии (0)