В этой статье показано, как можно визуализировать Apache Kafka Streams в реактивных приложениях с помощью пользовательского интерфейса разработчика в Quarkus (Quarkus Dev UI). Quarkus - Java платформа, предоставляющая расширение для использования Kafka Streams API, а также позволяющая реализовывать приложения потоковой обработки, основанные непосредственно на Kafka.
Реактивный обмен сообщениями и Apache Kafka
С появлением архитектур, управляемых событиями, многие разработчики применяют реактивное программирование для написания бизнес-приложений. Требования к этим приложениям буквально указывают, что они не должны обрабатываться в реальном времени, потому что конечные пользователи не ожидают синхронного взаимодействия через веб-браузеры или мобильные устройства. Вместо этого низкая задержка является более важным критерием производительности, независимо от объема данных или одновременных пользователей.
Вам может быть интересно, как реактивное программирование может достичь этой совершенно иной цели. Секрет заключается в асинхронном протоколе связи, отделяющем отправителей от приложений, которые потребляют и обрабатывают события. В этой схеме вызывающий абонент (например, конечный пользователь) отправляет сообщение получателю, а затем продолжает обработку других запросов, не дожидаясь ответа. Асинхронная обработка также может улучшить производительность, безопасность и масштабируемость больших объемов данных.
Однако нелегко реализовать все, что связано с возможностями асинхронной связи, с помощью только реактивного программирования. Это причина того, что платформы очередей сообщений также стали играть важную роль в приложениях, управляемых событиями.
Apache Kafka - одна из самых популярных платформ для асинхронной обработки сообщений о событиях для поддержки реактивных приложений. Kafka Streams - это клиентская библиотека, которая непрерывно абстрагирует изменяющиеся наборы данных событий (также известные как потоки) в кластерах Kafka для поддержки высокой пропускной способности и масштабируемости. Поток - это набор записей данных в виде пар ключ-значение.
Пример использования Quarkus Dev UI
Взгляните на следующий метод getMetaData()
, чтобы увидеть, как Quarkus позволяет отправлять интерактивные запросы к потокам Kafka с помощью внедрения KafkaStreams
. Полный код включен в Quarkus Kafka Streams Quickstart.
@Inject
KafkaStreams streams;
public List<PipelineMetadata> getMetaData(){
return streams
.allMetadataForStore(TopologyProducer.WEATHER_STATIONS_STORE)
.stream()
.map(m->new PipelineMetadata(
m.hostInfo().host()+":"+m.hostInfo().port(),
m.topicPartitions()
.stream()
.map(TopicPartition::toString)
.collect(Collectors.toSet())))
.collect(Collectors.toList());
}
Kafka Streams также позволяет создавать топологию процессов, которая представляет собой граф источников, процессоров и приемников в темах Kafka. Конечно, вы можете отслеживать потоки с помощью инструментов командной строки (таких, как kcat), но текстовый вывод не позволяет легко понять, как потоки обрабатывают и используют сообщения по темам Kafka.
Взглянем на другой пример. Метод buildTopology()
позволяет строить топологию потока. Найдите полный код в Quarkus Кафка Streams Quickstart.
@Produces
public Topology buildTopology(){
StreamsBuilder builder=new StreamsBuilder();
ObjectMapperSerde<WeatherStation> weatherStationSerde=new ObjectMapperSerde<>(WeatherStation.class);
ObjectMapperSerde<Aggregation> aggregationSerde=new ObjectMapperSerde<>(Aggregation.class);
KeyValueBytesStoreSupplier storeSupplier=Stores.persistentKeyValueStore(WEATHER_STATIONS_STORE);
GlobalKTable<Integer, WeatherStation> stations=builder.globalTable(
WEATHER_STATIONS_TOPIC,
Consumed.with(Serdes.Integer(),weatherStationSerde));
builder.stream(
TEMPERATURE_VALUES_TOPIC,
Consumed.with(Serdes.Integer(),Serdes.String()))
.join(
stations,
(stationId,timestampAndValue)->stationId,
(timestampAndValue,station)->{
String[]parts=timestampAndValue.split(";");
return new TemperatureMeasurement(station.id,station.name,Instant.parse(parts[0]),
Double.valueOf(parts[1]));
})
.groupByKey()
.aggregate(
Aggregation::new,
(stationId,value,aggregation)->aggregation.updateFrom(value),
Materialized.<Integer, Aggregation> as(storeSupplier)
.withKeySerde(Serdes.Integer())
.withValueSerde(aggregationSerde))
.toStream()
.to(
TEMPERATURES_AGGREGATED_TOPIC,
Produced.with(Serdes.Integer(),aggregationSerde));
return builder.build();
}
Визуализация топологии Kafka Streams
Для визуализации топологии Kafka Streams разработчикам традиционно требовались дополнительные инструменты визуализации, которые запускались в облаке или локальных средах разработки отдельно от кластеров Kafka. Но встроенный пользовательский интерфейс разработчика Quarkus позволяет вам видеть все загруженные в настоящее время расширения с соответствующей документацией. Когда вы запускаете Quarkus Dev Mode (например, ./mvnw quarkus:dev
) и добавляете расширение quarkus-kafka-streams
в проект, Quarkus Dev UI отображает расширение Apache Kafka Streams графически (рисунок 1).
Рисунок 1. Пользовательский интерфейс разработчика показывает расширение Apache Kafka Streams с кнопкой "Topology".
Когда вы нажимаете на кнопку "Topology", вы попадаете в Kafka Streams topology UI (рисунок 2).
Рисунок 2. Экран "Topology" для Apache Kafka Streams показывает подробную информацию, включая активные темы.
Пользовательский интерфейс "Topology" показывает, как потоки событий погружаются в темы (например, temperature-values
) и как приложения Quarkus используют потоки из тем. Кроме того, вы можете понять, как приложение в конечном итоге объединяет потоки из нескольких тем (temperature-values
и weather-stations
) в одну тему (temperatures-aggregated
). Пользовательский интерфейс "Topology" также демонстрирует последовательности того, как потоки непрерывно поступают, объединяются и агрегируются в кластерах Kafka.
Где узнать больше
В этой статье показано, как визуализировать Apache Kafka Streams в приложениях Quarkus с помощью Quarkus Dev UI. Quarkus также предоставляет потрясающие функции для повышения вашей производительности за счет непрерывного тестирования, интерфейса командной строки (CLI) Quarkus и Dev Services. Чтобы узнать больше о Kafka и программировании реактивного обмена сообщениями, см. следующие статьи: