В этой статье я хочу показать как можно использовать Kafka в дата-инженерии и как её "пощупать".

Я не хотел бы повторяться по важным моментам, которые касаются архитектуры Kafka, поэтому рекомендую ознакомиться с данным видео.

В нём хорошо рассказано про основные концепции, которые будут дальше использоваться в статье, такие как:

  • Что такое producer.

  • Что такое consumer.

  • Что такое topic.

  • Что такое offset.

  • Что такое commit.

  • Что такое partition .

  • Что такое replication .

Весь код, который будет использоваться в статье будет доступен в моём репозитории.

Разворачивание сервиса

Начнём с того, что развернем Kafka локально в Docker. Для этого создадим docker-compose.yaml со следующим кодом:

version: '3.8'  
  
services:  
  zookeeper:  
    image: 'confluentinc/cp-zookeeper:7.7.0'  
    hostname: zookeeper  
    container_name: zookeeper  
    environment:  
      ZOOKEEPER_CLIENT_PORT: 2181  
      ZOOKEEPER_TICK_TIME: 2000  
    ports:  
      - '2181:2181'  
  
  kafka:  
    image: 'confluentinc/cp-kafka:7.7.0'  
    hostname: kafka  
    container_name: kafka  
    depends_on:  
      - zookeeper  
    environment:  
      KAFKA_BROKER_ID: 1  
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092  
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:19092  
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT  
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  
    ports:  
      - '9092:9092'  
      - '19092:19092'  
  
  kafka-ui:  
    image: 'provectuslabs/kafka-ui:v0.7.2'  
    container_name: kafka-ui  
    ports:  
      - '8080:8080'  
    environment:  
      KAFKA_CLUSTERS_0_NAME: local  
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092  
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181  
    depends_on:  
      - kafka  
  
networks:  
  default:  
    name: kafka-network

Чтобы запустить все сервисы выполним команду:

docker-compose up -d

После этого у нас запустится Kafka, ZooKeeper и UI for Apache Kafka.

UI for Apache Kafka будет доступен по адресу http://localhost:8080/ через него можно будет: создавать topic, удалять topic, смотреть сообщения в topic и прочее. Очень удобный инструмент для работы с Kafka.

Создание и удаление topic

В данном разделе мы с вами попробуем создавать и удалять topic.

Создание и удаление topic через CLI

Чтобы создать topic нужно выполнить команды ниже.

Зайти в контейнер с Kafka:

docker exec -it kafka /bin/bash

Создание topic test в Kafka:

kafka-topics --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Просмотр всех доступных topic в Kafka:

kafka-topics --list --bootstrap-server kafka:9092

Удаление topic test в Kafka:

kafka-topics --delete --topic test --bootstrap-server kafka:9092

Создание и удаление topic через Python

Если вам удобнее взаимодействовать с Kafka через Python, то это не проблема.

Для работы с Kafka нам понадобится библиотека confluent-kafka. В примерах ниже я использую версию 2.5.0. Весь код и список всех зависимостей находится в моём репозитории.

Точно также эти операции можно произвести без подключения к контейнеру c Kafka, а через Python.

Чтобы создать topic через Kafka:

from confluent_kafka.admin import AdminClient, NewTopic  
  
admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})  
  
  
def example_create_topics(a: AdminClient = None, topics: list[str] = None) -> None:  
    """  
    Функция для создания `topic` в Kafka  
    :param a: AdminClient с параметрами инициализации. Default `None`.    :param topics: Список `topic` для создания. Default `None`.    :return: Ничего не возвращает  
    """  
    new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]  
        try:  
            f.result()  # The result itself is None  
            print("Topic {} created".format(topic))  
        except Exception as e:  
            print("Failed to create topic {}: {}".format(topic, e))  
  
  
example_create_topics(  
    a=admin_client,  
    topics=['test'],  
)

Важно: IDE может ругаться, что модуля NewTopic не существует, но он есть. Это официальный пакет. Это касается версии 2.5.0.

Чтобы удалить topic:

from confluent_kafka.admin import AdminClient  
  
admin_client = AdminClient({'bootstrap.servers': 'localhost:19092'})  
  
  
def example_delete_topics(a: AdminClient = None, topics: list[str] = None) -> None:  
    """  
    Функция для удаления `topic` в Kafka.  
    :param a: AdminClient с параметрами инициализации. Default `None`.    :param topics: Список `topic` для удаления. Default `None`.    :return: Ничего не возвращает.  
    """  
    fs = a.delete_topics(topics, operation_timeout=30)  
  
    # Wait for operation to finish.  
    for topic, f in fs.items():  
        try:  
            f.result()  # The result itself is None  
            print("Topic {} deleted".format(topic))  
        except Exception as e:  
            print("Failed to delete topic {}: {}".format(topic, e))  
  
  
example_delete_topics(  
    a=admin_client,  
    topics=['test'],  
)

Больше примеров использования библиотеки confluent_kafka в официальном GitHub проекта.

Kafka CLI

CLI является популярным вариантов для взаимодействия с Kafka. Изначально его нет на вашем устройстве, поэтому необходимо его скачать следующей командой:

wget https://archive.apache.org/dist/kafka/3.8.0/kafka_2.13-3.8.0.tgz

Затем распаковать:

tar -xzf kafka_2.13-3.8.0.tgz

После выполнения данных команд мы можем использовать CLI для взаимодействия с Kafka.

Важно: Все исполняемые файлы находятся в папке bin. Поэтому стоит обратить внимание, что все скрипты будут выполнять из неё.

Чтобы перейти в папку bin нужно выполнить команду:

cd kafka_2.13-3.8.0/bin/

Запись в Kafka через CLI

Чтобы произвести запись в Kafka выполним команду:

echo 'Hello, Kafka!' | sh kafka-console-producer.sh --broker-list localhost:19092 --topic test

Или так:

echo 'Hello, Kafka!' | ./kafka-console-producer.sh --broker-list localhost:19092 --topic test

Важно: Мне привычнее вызывать скрипт командой sh, но можно и через ./.

Ещё можно создать producer в интерактивном режиме командой:

sh kafka-console-producer.sh --broker-list localhost:19092 --topic test

После создания такого producer у нас появляется возможность писать все сообщения, которые хотим.

После выполнения команды у нас появится [> и после чего мы сможем вводить сообщения для Kafka.

Для выхода из интерактивного режима несколько раз нажмите CTRL + C.

Чтение из Kafka через CLI

Важно: topic в Kafka можно читать "с конца" и "с начала".

Чтобы начать читать с самого начала:

sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test --from-beginning

Чтобы начать читать с конца и получать только новые сообщения:

sh kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic test

Kafka Python

Как было описано выше мы можем взаимодействовать с Kafka через Python. Поэтому сейчас рассмотрим также операции записи и чтения с использованием Python.

Запись в Kafka через Python

Я приведу пример той записи, которая может появиться в вашей Kafka – это информация о пользователе.
Запись будет содержать: uuid, first_name, last_name, middle_name.

Вы можете запустить код ниже и в topic my_topic начнут записываться значения.

import json  
import time  
from confluent_kafka import Producer  
from faker import Faker  
import uuid_utils as uuid  
  
  
def generate_list_of_dict() -> dict[str, str]:  
  
    fake = Faker(locale='ru_RU')  
  
    return {  
        'uuid': str(uuid.uuid7()),  
        'first_name': fake.first_name(),  
        'last_name': fake.last_name(),  
        'middle_name': fake.middle_name(),  
    }  
  
  
# Define the Kafka configuration  
conf = {'bootstrap.servers': "localhost:19092"}  
  
# Create a Producer instance with the above configuration  
producer = Producer(conf)  
  
  
while True:  
    # Define some data to send to Kafka  
    data = generate_list_of_dict()  
  
    # Convert the data to a JSON string  
    data_str = json.dumps(data)  
  
    # Produce a message to the "my_topic" topic  
    producer.produce(topic="my_topic", value=data_str)  
  
    # Flush the producer to ensure all messages are sent  
    producer.flush()  
  
    # Sleep for a second before producing the next set of messages  
    time.sleep(3)

Важно: Если topic ранее не был создан, то он создастся при первой записи.

Чтение из Kafka через Python

Для того чтобы прочитать значения из Kafka нам необходимо создать consumer. Функция ниже имеет возможность прочитать topic с самого начала и с определённого offset.

from confluent_kafka import Consumer, KafkaError, TopicPartition  
  
  
def consume_messages(topic: str = None, offset: int = None) -> None:  
    conf = {  
        'bootstrap.servers': 'localhost:19092',  
        'group.id': 'mygroup',  
        'auto.offset.reset': 'earliest'  
    }  
  
    consumer = Consumer(conf)  
  
    if offset is not None:  
        partitions = consumer.list_topics(topic).topics[topic].partitions  
        for partition in partitions:  
            consumer.assign([TopicPartition(topic, partition, offset)])  
    else:  
        consumer.subscribe([topic])  
  
    try:  
        while True:  
            msg = consumer.poll(1.0)  
            if msg is None:  
                continue  
            if msg.error():  
                if msg.error().code() == KafkaError:  
                    print('Reached end of partition')  
                else:  
                    print(f'Error: {msg.error()}')  
            else:  
                print(f'Received message: {msg.value().decode("utf-8")}')  
    except KeyboardInterrupt:  
        pass  
    finally:  
        consumer.close()  
  
  
# Читать с начала  
consume_messages('test')  
  
# Читать с определенного offset  
# consume_messages('test', offset=5)

Ранее мы читали topic в Kafka без использования групп и поэтому атрибут --from-beginning срабатывал каждый раз при вызове (каждый раз создавалась новая группа).

Но при создании consumer через Python указание group.id является обязательным и поэтому мы можем столкнуться со следующей проблемой: если мы один раз прочитали topic, то при перезапуске кода мы начнем читать только новые сообщения и даже атрибут auto.offset.reset не поможет.

А всё это происходит, потому что мы произвели commit (фиксацию) offset для группы.

Чтобы проверить на каком сейчас offset находится группа необходимо выполнить команду в Kafka:

sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --describe

И мы увидим, что мы прочитали все сообщения. Поэтому offset стоит на последнем сообщении в topic.

  • CURRENT-OFFSET говорит о том на каком offset находится группа.

  • LOG-END-OFFSET текущий последний доступный offset для topic

Вообще, это не проблема, потому что данный offset можно "сбросить", для этого необходимо выполнить команду:

sh kafka-consumer-groups.sh --bootstrap-server localhost:19092 --group mygroup --to-earliest --reset-offsets --execute --topic test

Также можно прочитать topic заново изменив group.id. Но это делать не рекомендуется.

Использование Kafka в дата-инженерии

В дата-инженерии Kafka частый гость, потому что Kafka позволяет быстро и за дёшево покрыть множество бизнес-задач, таких как:

CDC

При реализации CDC вы можете встретиться с Kafka, потому что она является "стандартом" при работе с такого вида событиями.

Если вы хотите понять что такое CDC и какую роль там занимает Kafka вы можете изучить мою статью: CDC на примитивах.

Event-driven

Так как Kafka позволяет нам получать изменения "моментально". В этом определении есть определённые нюансы, но это тема для другого разговора.

Если вернуться к мысли выше, то получая все события "моментально" мы можем на них реагировать.

Для примера: покупатель заходит на сайт нашего интернет-магазина и при заходе в какую-то категорию или раздел мы можем сделать ему какое-то предложение или перестроить для него страницу, в зависимости от его предпочтений или условий заложенных ранее.

Real-time Analytics

Также довольно часто Kafka используется для аналитики в реальном времени. Если к нам сообщения о событиях приходят постоянно и "моментально", то мы можем реагировать на них и следить за своими метриками.

Для примера: маркетинговые акции. Мы запускаем какую-то акцию и сразу смотрим на важные для нас показатели. В зависимости от получаемых значений мы можем изменять условия акции, условия размещения и прочее.

Резюме

Kafka популярный инструмент, поэтому найти литературу, видео и примеры использования – не проблема. В данной статье я показал только верхушку айсберга, который можно изучать и изучать.

Если говорить про взаимодействие c Kafka, то CLI и Python – это не единственные инструменты, к ним можно добавить: PySpark, ClickHouse, Java и прочее.

Кстати, про то как читать из Kafka при помощи ClickHouse было описано в моей статье: CDC на примитивах.

Для более глубокого изучения инструмента рекомендую ознакомиться с книгой: Apache Kafka. Потоковая обработка и анализ данных" (авторы - Нархид Н., Шапира Г., Палино Т., год издания - 2019). В ней описывается много тонкостей и подводных камней при работе с Kafka. Уже вышло второе издание, я его не читал, но судя по наполнению; учтены новые моменты, поэтому порекомендовал бы изучать второе издание.

Ну и самое главное – Теория без практики мертва, практика без теории слепа. Поэтому попробуйте Kafka, даже на pet-проектах или в рамках данной статьи.


Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.

Комментарии (0)