image


Введение


В современном цифровом мире критически важна возможность обрабатывать данные в режиме реального времени и масштабировать приложения. Для этого хорошо подходит Kafka – платформа для распределённой потоковой обработки событий, особенно, если сочетать её с реактивным программированием. В данной статье будет рассказано, как создавать реактивные приложения при помощи этого инструментария.

Разберём основы


Прежде, чем перейти к деталям, давайте изучим некоторые ключевые концепции.

Kafka: Apache Kafka — это платформа для распределенной обработки потоков событий, применяемая для создания конвейеров данных, работающих в режиме реального времени, а также потоковых приложений. Kafka оперативно обрабатывает огромные объёмы данных. При работе с Kafka приложения получают возможность производить и потреблять потоки записей (событий или сообщений).

Реактивное программирование: это такой подход к обработке ПО, при котором акцент делается на обработке асинхронных потоков данных и на распространении изменений. Такой подход способствует отзывчивости, надёжности и масштабированию приложений.

Масштабируемость: это способность системы справляться с нарастающими рабочими нагрузками или потенциал для увеличения системы, чтобы подстроиться под этот рост. В контексте приложения масштабируемость — это способность справляться с растущим количеством пользователей, объёмами данных или новыми транзакциями без ущерба для производительности.

Потоковая обработка данных в режиме реального времени: это технология обработки данных по мере их поступления. Благодаря этому приложение может сразу же реагировать на изменения или события. Потоковая обработка обычно противопоставляется пакетной; в последнем случае данные аккумулируются и обрабатываются партиями через заранее намеченные интервалы.

Расстановка: Почему Kafka сочетается с реактивным программированием?


Kafka и реактивное программирование хорошо дополняют друг друга. Kafka предоставляет надёжный механизм для обработки потоков данных, а реактивное программирование позволяет эффективно и оперативно обрабатывать эти потоки. В совокупности две эти технологии удобны для создания масштабируемых приложений, работающих в режиме реального времени.

Ключевые достоинства:

  1. Высокая пропускная способность: Kafka с минимальными задержками обрабатывать большие объёмы данных.
  2. Отказоустойчивость: распределённая архитектура Kafka гарантирует репликацию данных и отказоустойчивость системы.
  3. Масштабируемость: как Kafka, так и реактивное программирование рассчитаны на горизонтальное масштабирование. Таким образом, можно добавлять всё новые и новые узлы, чтобы справляться с возрастающими нагрузками.
  4. Асинхронная обработка: поскольку реактивное программирование по своей природе асинхронное, в этой парадигме гарантируются неблокирующие операции и улучшается отзывчивость.

Настройка Kafka


Чтобы приступить к программированию масштабируемых приложений при помощи Kafka, сначала нужно настроить кластер Kafka. Кластер состоит из множества брокеров Kafka, и каждый из них отвечает за обработку потоков данных.

Пошаговая настройка:
  • Скачиваем и устанавливаем Kafka:
  • Переходим на страницу для скачивания Apache Kafka и берём там новейшую версию.
  • Извлекаем загруженные файлы и переходим в каталог с Kafka.
  • Запускаем Zookeeper:
  • При работе Kafka опирается на Zookeeper, распределённый координационный сервис. Запустим Zookeeper следующей командой:
    sh bin/zookeeper-server-start.sh config/zookeeper.properties
  • Запускаем брокер Kafka:
  • Когда Zookeeper уже работает, запускаем брокер Kafka:
    sh bin/kafka-server-start.sh config/server.properties
  • Создаём топик:
  • Данные в Kafka организованы в виде топиков. Создадим новый топик так:
    sh bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  • Начинаем производить и потреблять сообщения:
  • Производим сообщения в топик:
    sh bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
  • Потребляем сообщения из топика:
    sh bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning

Знакомство с реактивным программированием


Концепции реактивного программирования:
  • Асинхронность: операции являются неблокирующими, то есть, ни одна операция не должна дожидаться завершения другой операции.
  • Движущая роль событий: код реагирует на события — то есть, на действия пользователя или на изменения данных.
  • Наблюдаемые потоки: данные понимаются как потоки, которые можно наблюдать и на которые можно реагировать.
Популярные библиотеки:
  • Project Reactor: библиотека для реактивного программирования на Java, предоставляет мощный API для сборки асинхронных приложений, управляемых через события.
  • RxJava: ещё одна популярная библиотека для реактивного программирования на Java, вдохновлённая проектом Reactive Extensions.

Простой реактивный пример


Рассмотрим простейший пример реактивного программирования, в котором используется Project Reactor.

Java
import reactor.core.publisher.Flux;

public class ReactiveExample {
    public static void main(String[] args) {
        Flux<String> dataStream = Flux.just("Hello", "Reactive", "World");
        dataStream
            .map(String::toUpperCase)
            .subscribe(System.out::println);
    }
}

Здесь Flux — это поток данных. Функция map преобразует каждый из элементов, а subscribe потребляет данные, выводя их в консоль.

Интеграция Kafka с реактивным программированием


Чтобы создать масштабируемое приложение, нужно интегрировать Kafka со средствами реактивного программирования. Для этого воспользуемся реактивным клиентом Kafka, который будет производить и потреблять сообщения.

Производство сообщений для Kafka


При помощи библиотеки reactor-kafka можно производить сообщения для Kafka, построив работу по реактивному принципу.

Зависимости:
Добавим следующие зависимости в файл pom.xml (для Maven):

Java
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.3.5</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

Реактивный продьюсер Kafka:

Java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

import java.util.HashMap;
import java.util.Map;

public class ReactiveKafkaProducer {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptions = SenderOptions.create(props);
        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Flux<SenderRecord<String, String, String>> outboundFlux = Flux.range(1, 10)
            .map(i -> SenderRecord.create(new ProducerRecord<>("my-topic", "key" + i, "value" + i), "correlationId" + i));

        sender.send(outboundFlux)
            .doOnError(e -> System.err.println("Send failed: " + e))
            .doOnNext(r -> System.out.println("Message sent: " + r.correlationMetadata()))
            .subscribe();
    }
}

В этом примере мы настраиваем продьюсер Kafka при помощи реактивного программирования. Он отправляет десять сообщений в топик my-topic, занося в лог, как окончилась каждая операция — успешно или неуспешно.

Потребление сообщений из Kafka


Аналогично, можно реактивно потреблять сообщения из Kafka.

Реактивный потребитель Kafka:

Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReactiveKafkaConsumer {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props)
            .subscription(Collections.singleton("my-topic"));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

        receiver.receive()
            .doOnNext(record -> System.out.println("Received message: " + record.value()))
            .subscribe();
    }
}

В данном примере потребитель подписывается на my-topic и выводит в консоль все принятые сообщения.

Создаём масштабируемое приложение: разбор примера


Чтобы проиллюстрировать, как можно использовать Kafka в сочетании с реактивным программированием, рассмотрим практический пример: напишем систему, которая позволяет в режиме реального времени отслеживать движение целого парка грузовичков. Система должна обрабатывать данные о движении машин и показывать, где находится в данный момент каждая из них.

Компоненты системы

  1. Сенсоры грузовиков: датчики, установленные в автомобилях и отправляющие в систему данные о местоположении и обновлении статуса.
  2. Брокер Kafka: собирает информацию с датчиков и отправляет её в потоковом режиме.
  3. Реактивные микросервисы: обрабатывают и анализируют данные.
  4. Приложение для визуализации данных: демонстрирует пользователю данные, обновляемые в режиме реального времени.
Пошаговая реализация:

1. Сенсоры грузовиков: сымитируем датчики, отправляющие информацию в Kafka.

Java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class TruckSensorSimulator {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(Producer

Config.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        SenderOptions<String, String> senderOptions = SenderOptions.create(props);
        KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

        Random random = new Random();
        Flux<SenderRecord<String, String, String>> sensorDataFlux = Flux.interval(Duration.ofSeconds(1))
            .map(tick -> {
                String truckId = "truck-" + random.nextInt(10);
                String location = "loc-" + random.nextInt(100);
                String status = "status-" + random.nextInt(3);
                String value = String.format("%s,%s,%s", truckId, location, status);
                return SenderRecord.create(new ProducerRecord<>("truck-data", truckId, value), "correlationId" + tick);
            });

        sender.send(sensorDataFlux)
            .doOnError(e -> System.err.println("Send failed: " + e))
            .doOnNext(r -> System.out.println("Message sent: " + r.correlationMetadata()))
            .subscribe();
    }
}

2. Брокер Kafka: подготовим и запустим Kafka так, как описано выше.

3. Реактивные микросервисы: будем обрабатывать данные из Kafka и анализировать их в режиме реального времени.

Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class TruckDataProcessor {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "truck-data-processor-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props)
            .subscription(Collections.singleton("truck-data"));

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);

        receiver.receive()
            .doOnNext(record -> {
                String[] data = record.value().split(",");
                String truckId = data[0];
                String location = data[1];
                String status = data[2];
                // Process and analyze the data (e.g., updating a database or triggering alerts)
                System.out.println("Processed data for truck: " + truckId + ", location: " + location + ", status: " + status);
            })
            .subscribe();
    }
}

4. Приложение для визуализации данных: в режиме реального времени показываем обработанные данные пользователю.

Чтобы создать такое приложение, можно воспользоваться веб-фреймворком, поддерживающим реактивное программирование, например, Spring Boot с WebFlux. Приложение подпишется на конечную точку WebSocket, чтобы получать обновления в режиме реального времени.

Настройка Spring Boot WebFlux:

Зависимости:

Добавим следующие зависимости в файл pom.xml (для Maven):

Java
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

Конфигурация WebFlux:

Java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;

@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer {
    // WebFlux configuration can be added here if needed
}

Конфигурация WebSocket:

Java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new TruckDataWebSocketHandler(), "/truck-data").setAllowedOrigins("*");
    }
}

WebSocket Handler:

Java
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import reactor.core.publisher.Mono;

public class TruckDataWebSocketHandler extends WebSocketHandlerAdapter implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Simulate sending real-time updates to the client
        return session.send(
            session.receive()
                .map(msg -> session.textMessage("Received: " + msg.getPayloadAsText()))
                .doOnError(e -> System.err.println("WebSocket error: " + e))
        );
    }
}

Reactive Kafka Consumer Service:

Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.HashMap;
import java.util.Map;

@Service
public class TruckDataService {

    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    @KafkaListener(topics = "truck-data", groupId = "truck-data-processor-group")
    public void listen(String message) {
        sink.tryEmitNext(message);
    }

    public Flux<String> getTruckDataStream() {
        return sink.asFlux();
    }

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "truck-data-processor-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties("truck-data");
        return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps);
    }
}

REST-контроллер:

Java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class TruckDataController {

    @Autowired
    private TruckDataService truckDataService;

    @GetMapping("/truck-data")
    public Flux<String> getTruckData() {
        return truckDataService.getTruckDataStream();
    }
}

При такой конфигурации данные о передвижении машин в режиме реального времени отправляются на конечную точку WebSocket, к которой может подключаться приложение для визуализации данных. В нём мы и увидим обновления.

Заключение


Сочетая Kafka и реактивное программирование, можно создавать хорошо масштабируемые приложения, способные обрабатывать поступающие в режиме реального времени потоки данных и реагировать на них. Kafka обеспечивает надёжный механизм для обработки больших объёмов данных, а реактивное программирование гарантирует, что приложение будет оставаться отзывчивым, устойчивым и эффективным.

В этой статье было рассказано, как подготовить Kafka к работе, была приведена базовая информация о реактивном программировании, а также было объяснено, как интегрировать два этих инструмента и создать мониторинговую систему для работы в режиме реального времени. Если вы пишете приложение для мониторинга, обработки данных или других задач, решаемых в режиме реального времени, то именно Kafka и реактивное программирование помогут вам преуспеть в этом.

Освоив эти технологии, вы сможете создавать приложения, которые будут не только хорошо масштабироваться по мере роста запросов, но и позволят получать данные в режиме реального времени, а также обеспечат отзывчивость, которая нужна пользователям, работающим в современной динамичной цифровой среде.

P.S Обращаем ваше внимание на то, что у нас на сайте проходит распродажа.

Комментарии (5)


  1. Inan_Morgan_Sebynu
    29.07.2024 14:38

    Я может не слишком шарю, но от текста веет авторством chatGPT-4. Кто шарит за kafka - что можете сказать?


    1. Inan_Morgan_Sebynu
      29.07.2024 14:38

      Прошу прощения, не заметил что это перевод статьи. Из-за этого создалось впечатление что текст написан не для хабра - а собственно так оно и есть...
      Верните карму - я исправлюсь...


      1. murakas
        29.07.2024 14:38
        +1

        Бог простит


  1. LightSouls
    29.07.2024 14:38

    Не совчем понятно, почему сообщения по одному читаются, а не батчами


  1. Pdasilem
    29.07.2024 14:38

    Кафка - это гуд для масштабируемости. А вот реактивный подход не всегда оправдан. Много подводных камней в поддержке и тестировании. 10 раз надо подписать при принятии решения о его использовании