В этой статье мы разработаем и реализуем событийно-ориентированное приложение с использованием Kafka в Python. Для примера мы возьмем заказ мебели в приложении типа IKEA. Это просто пример, а не то, что происходит на самом деле в IKEA.

Мы будем делать приложение на локальном компьютере, но для производственных сред вы можете использовать облачный провайдер, такой как AWS, GCP или Azure.

Давайте посмотрим, что у нас есть в приведенной выше архитектуре:

  • Frontend (внешний интерфейс). Это может быть мобильное или веб-приложение, где пользователь заказывает товар. Когда пользователь выбирает и заказывает мебель с помощью приложения, внешний интерфейс обращается к бэкэнду.

  • Orders Backend (бэкэнд заказов). Он принимает заказ из внешнего интерфейса со всеми данными, связанными с этим заказом, а затем записывает в Kafka тему под названием «order_details». Тема «order_details» будет содержать всю информацию, относящуюся к одному отдельному заказу. Это будет простой Python-файл. Вы можете задеплоить этот сервис вместе с остальными, как микросервисы в облако, используя, например, облачный запуск на GCP или Lambda на AWS.

  • Transactions Backend (бэкэнд транзакций). Подписывается на тему «order_details» в Kafka, поэтому всякий раз, когда кто-то пишет в тему, бэкэнд транзакций будет читать сообщение и обрабатывать его в режиме реального времени. Бэкэнд транзакций будет выполнять обработку кредитных карт и некоторые другие проверки, чтобы убедиться, что заказ подтвержден. Как только заказ будет подтвержден, он отправит ответ в другую тему Kafka под названием «order_confirmed». Эта тема нужна, чтобы собирать все данные, которые относятся к подтвержденному заказу.

  • Email Backend (бэкэнд электронной почты). Подписывается на тему «order_confirmed» и отправляет пользователю электронное письмо с подтверждением, когда заказ подтвержден. Он также может отправить сообщение в тему, например «order_email_sent».

  • Analytics Backend (бэкэнд аналитики). Он подписывается на тему «order_confirmed» и выполняет по ней какую-то аналитику. Например, он может агрегировать общее количество заказов в этот день и общее количество доходов, полученных от разных заказов. Затем мы можем отправить результат аналитики по теме «analytics_result».

  • Dashboard. У нас может быть служба для получения каких-либо данных из разных тем и отправки их на панель инструментов для визуализации. Здесь мы просто используем один сервис для них обоих в Python для простоты, но вы можете легко их разделить.

В этом посте нам понадобятся следующее:

kafka-python
flask

Чтобы запустить Kafka локально, можно использовать следующий компоновочный файл с кластером Kafka с одним брокером, а также одним zookeeper'ом и некоторыми другими компонентами Kafka, такими как центр управления UI (UI control-center), реестр схем (schema-registry) и т.д.

## docker-compose-kafka.yml

version: "3"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29093:29093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka-tools
    container_name: kafka-tools
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

Также можно использовать Kafka-UI и Conduktor. UI-центр управления просто показывает сообщения, которые отправляются, когда UI-страница темы открыта. Для Kafka-UI вы можете добавить следующие коды в свой файл компоновки вместо части центра управления:

kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=broker:29092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
- KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT
- KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://schema-registry:8081

Затем нам нужно запустить docker-compose -f docker-compose-kafka.yml up -d, чтобы запустить Kafka со всеми компонентами. Обратите внимание, что Kafka должна быть запущена, когда мы хотим протестировать внешние и внутренние службы, которым необходимо отправлять или получать данные из тем Kafka.

Мы можем проверить, все ли работает, с помощью docker-compose -f docker-compose-kafka.yml ps команды:

NAME COMMAND SERVICE STATUS PORTS
broker "/etc/confluent/dock…" broker running 0.0.0.0:9092->9092/tcp, 0.0.0.0:29093->29093/tcp
kafka-tools "tail -f /dev/null" kafka-tools running
kafka-ui "/bin/sh -c 'java $J…" kafka-ui running 0.0.0.0:8080->8080/tcp
schema-registry "/etc/confluent/dock…" schema-registry running 0.0.0.0:8081->8081/tcp
zookeeper "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

В дополнение к вышеупомянутому файлу docker-compose для Kafka у нас будет еще один для наших микросервисов. Вы можете добавлять сервисы один за другим в файл docker-compose и тестировать их.

Обратите внимание, что нужно для подключения ваших producers и consumers к брокеру Kafka: 

  • Если вы используете ту же сеть, поместив микросервис в тот же файл docker-compose, что и Kafka. Или используя отдельный файл docker-compose и настроив сеть, как сеть Kafka, вы можете использовать broker:29092.

  • Если вы запускаете свой сервис локально на том же компьютере без его докеризации, вы можете использовать в своем коде его localhost:9092.

  • Если вы хотите запустить Kafka на одной машине, а свои службы на другой машине, вам нужно использовать в своем коде <kafka machine ip>:29093.

Начнем с backend-сервисов. Интерфейс мы добавим позже.

Бэкэнд заказов

Теперь давайте перейдем к backend'у заказов. Мы докеризуем приложение и помещаем его в отдельный файл docker-compose с именем docker-compose-services.yml и настраиваем сеть так же, как сеть Kafka.  orders_backend.py — это flask-приложение и выглядит оно следующим образом:

# orders_backend.py

import json
import time

from kafka import KafkaProducer
from flask import Flask, jsonify, request

ORDER_KAFKA_TOPIC = 'order_details'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`

app = Flask(__name__)

## from inside docker compose network - when add the service to compose file -> orders_backend:v1
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
                              value_serializer=lambda x: json.dumps(x).encode('utf-8'))

# post endpoint to get user id , order id, user email, and order details
@app.route('/order', methods=['POST'])
def order():
    user_id = request.json['user_id']
    order_id = request.json['order_id']
    user_email = request.json['user_email']
    order_details = request.json['order_details']
    order = {}
    order['user_id'] = user_id
    order['order_id'] = order_id
    order['user_email'] = user_email
    order['order_details'] = order_details
    order['time'] = time.time()
    producer.send(ORDER_KAFKA_TOPIC, order)
    print("Sent order details {} to kafka topic: {}".format(order, ORDER_KAFKA_TOPIC))
    return jsonify(order)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5002, debug=True)

У него есть конечная точка публикации для получения заказа и публикации его в теме Kafka с именем «order_details».

Затем вы можете легко докеризовать этот сервис. Вот докерфайл:

FROM python:3.9.7-slim
RUN pip install -U pip
RUN pip install pipenv
WORKDIR /app
COPY [ "Pipfile", "Pipfile.lock", "./" ]
RUN pipenv install - system - deploy
COPY [ "orders_backend.py", "./" ]
EXPOSE 5002
ENTRYPOINT ["python", "orders_backend.py"]

Затем мы можем создать образ, используя:

docker build -t orders_backend:v1 .

Файл docker-compose будет выглядеть следующим образом:

# docker-compose-services.yml

version: "1"

services:
  orders_backend:
    restart: always
    image: orders_backend:v1
    ports:
      - "5002:5002"
    networks:
      - ikea-ordering-kafka_default

networks:
  ikea-ordering-kafka_default:
    external: true

Мы можем использовать postman для проверки:

Мы также можем видеть сообщения по теме в пользовательском интерфейсе:

Бэкэнд транзакций

Это простой сервис для прослушивания темы «order_details», выполнения какой-либо обработки данных и отправки подтвержденного сообщения в тему «order_confirmed»:

# transactions_backend.py

import json
import time
from kafka import KafkaConsumer, KafkaProducer

OERDER_KAFKA_TOPIC = 'order_details'
ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`

consumer = KafkaConsumer(OERDER_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
                            value_deserializer=lambda x: json.loads(x.decode('utf-8')))
producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
                            value_serializer=lambda x: json.dumps(x).encode('utf-8'))

while True:
    for message in consumer:
        print("Received order details: {}".format(message.value))
        user_id = message.value['user_id']
        order_id = message.value['order_id']
        user_email = message.value['user_email']
        order_details = message.value['order_details']
        time = message.value['time']
        ## do some suff on the order and check the confirmation
        order_confirmed = {}
        order_confirmed['user_id'] = user_id
        order_confirmed['order_id'] = order_id
        order_confirmed['user_email'] = user_email
        order_confirmed['order_details'] = order_details
        order_confirmed['time'] = time
        order_confirmed['status'] = 'confirmed'
        producer.send(ORDER_CONFIRMED_KAFKA_TOPIC, order_confirmed)
        print("Sent order details {} to kafka topic: {}".format(order_confirmed, ORDER_CONFIRMED_KAFKA_TOPIC))

Докер-файл выглядит следующим образом:

FROM python:3.9.7-slim
RUN pip install -U pip
RUN pip install pipenv
WORKDIR /app
COPY [ "Pipfile", "Pipfile.lock", "./" ]
RUN pipenv install - system - deploy
COPY [ "transactions_backend.py", "./" ]
ENTRYPOINT ["python", "transactions_backend.py"]

Вы можете создать этот образ и обновить docker-compose-services.yml файл:

# docker-compose-services.yml

version: "1"

services:
  orders_backend:
    restart: always
    image: orders_backend:v1
    ports:
      - "5002:5002"
    networks:
      - ikea-ordering-kafka_default
  
  transactions_backend:
    restart: always
    image: transactions_backend:v1
    ports:
      - "5003:5003"
    networks:
      - ikea-ordering-kafka_default

networks:
  ikea-ordering-kafka_default:
    external: true

Затем, ещё раз протестировав сервисы с помощью postman, мы можем увидеть сообщения, приходящие в тему:

Бэкэнд электронной почты

Код этой службы выглядит следующим образом:

# email_backend.py

import json
import time

from kafka import KafkaConsumer, KafkaProducer
# from flask import Flask, jsonify, request

ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
EMAIL_SENT_KAFKA_TOPIC = 'order_email_sent'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`

producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
                            value_serializer=lambda x: json.dumps(x).encode('utf-8'))

consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
                            value_deserializer=lambda x: json.loads(x.decode('utf-8')))


def send_email(user_id, order_id, user_email, order_details, time, status):
    print("Sending email to user: {} with order details: {}".format(user_email, order_details))
    # send email to user
    # ...
    # ...
    # ...
    # ...   
    return True


while True:
    for message in consumer:
        # read data from consumer and call the send_email() function
        print("Received order details: {}".format(message.value))
        user_id = message.value['user_id']
        order_id = message.value['order_id']
        user_email = message.value['user_email']
        order_details = message.value['order_details']
        time = message.value['time']
        status = message.value['status']
        email_send_status = send_email(user_id, order_id, user_email, order_details, time, status)
        email_sent = {}
        email_sent['user_id'] = user_id
        email_sent['order_id'] = order_id
        email_sent['user_email'] = user_email
        email_sent['order_details'] = order_details
        email_sent['time'] = time
        email_sent['status'] = email_send_status
        producer.send(EMAIL_SENT_KAFKA_TOPIC, email_sent)
        print("Sent email details {} to kafka topic: {}".format(email_sent, EMAIL_SENT_KAFKA_TOPIC))

Докер-файл также похож на предыдущие с небольшими изменениями имени python-файла. Затем вы можете создать образ, обновить файл docker-compose и запустить его. После отправки нескольких новых сообщений через postman мы можем увидеть сообщения в теме в пользовательском интерфейсе:

Бэкэнд аналитики

Код следующего сервиса получает подтвержденный заказ и рассчитывает общее количество заказов и общий доход:

# analytics_backend.py

import json
import time

from kafka import KafkaConsumer, KafkaProducer

ORDER_CONFIRMED_KAFKA_TOPIC = 'order_confirmed'
ANALYTICS_KAFKA_TOPIC = 'analytics_result'
# KAFKA_SERVER_ADDRESS = 'localhost:9092'
KAFKA_SERVER_ADDRESS = 'broker:29092'
# KAFKA_SERVER_ADDRESS = '47.93.191.241:29093`

producer = KafkaProducer(bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
                            value_serializer=lambda x: json.dumps(x).encode('utf-8'))

consumer = KafkaConsumer(ORDER_CONFIRMED_KAFKA_TOPIC, bootstrap_servers=[KAFKA_SERVER_ADDRESS], security_protocol="PLAINTEXT",
                            value_deserializer=lambda x: json.loads(x.decode('utf-8')))

total_revenue = 0
total_orders_count = 0

while True:
    for message in consumer:
        # read data from consumer and do some analytics on it
        print("Received order details: {}".format(message.value))
        order_details = message.value['order_details']
        total_revenue += int(order_details['price'])
        total_orders_count += 1
        analytics = {}
        analytics['total_revenue'] = total_revenue
        analytics['total_orders_count'] = total_orders_count
        producer.send(ANALYTICS_KAFKA_TOPIC, analytics)
        print("Sent analytics details {} to kafka topic: {}".format(analytics, ANALYTICS_KAFKA_TOPIC))

Докер-файл снова точно такой же, как и предыдущий, но с небольшой модификацией. Создайте образ, обновите файл компоновки и, наконец, запустите его.

Мы можем отправить несколько новых сообщений через postman и увидеть сообщения в теме в пользовательском интерфейсе:

Вот и все на этот раз. Надеемся, вы получили общее представление о том, как использовать Kafka в своих проектах.

https://slurm.club/3Musuk6

А чтобы еще больше узнать о том, как разработчикам можно использовать Kafka в работе, вы можете прийти на наш курс «Apache Kafka для разработчиков». Это углублённый курс с практикой на Java или Golang и платформой Spring+Docker+Postgres, который переведёт вас на новый уровень владения инструментом.

Комментарии (11)


  1. ivankudryavtsev
    15.05.2023 15:58
    +2

    Вот в чем проблема переводов - никто не понимает толком что происходит. Пакет kafka-python - это грусть-печаль. В итоге, миллионы "леммингов" прыгают в обрыв. Kafka - это тяжелое решение про пропускную способность, вот такой клиент убивает пропускную способность. Ну и зачем Kafka тогда? Юзайте RabbitMQ, да хоть Redis, MongoDB.


    1. alitenicole Автор
      15.05.2023 15:58
      +2

      Стало интересно, почему же клиент убивает пропускную способность?
      Сможете разъяснить, искренне интересно узнать)


      1. ivankudryavtsev
        15.05.2023 15:58
        +2

        А вы сравните с confluent-kafka и сами все поймете.


        1. alitenicole Автор
          15.05.2023 15:58

          Хорошо, поизучаем, спасибо за наводку)


        1. holodoz
          15.05.2023 15:58
          +2

          Если у вас уже есть опыт использования обоих решений, было бы очень интересно послушать, хотя бы в двух словах, в чем разница и насколько она заметна в использовании


          1. ivankudryavtsev
            15.05.2023 15:58

            Писать лень, но вот первая ссылка из Google. Статья старенькая, но по моим последним пробам, наверное, в 2021 все было ровно так.


    1. nin-jin
      15.05.2023 15:58

      А обеспечение консенсуса в логе событий разве не бьёт по пропускной способности?


      1. ivankudryavtsev
        15.05.2023 15:58

        Развейте свою мысль подробнее. По сравнению с чем? Кажется, что консенсус в Kafka никак не мешает пропускной способности, пока брокеров не начинает штормить.


        1. nin-jin
          15.05.2023 15:58

          По сравнению, например, с неупорядоченными очередями, которым не нужно сохранять глобальную упорядоченность. Насколько я понял, Кафка просто пускает весь трафик по одному топику через один узел - это узкое место, которое если рвётся, то капитально - постоянные перевыборы с падением очередного счастливчика.


          1. ivankudryavtsev
            15.05.2023 15:58

            Ну упали, перевыбрались, поднялись, в чем проблема?


            1. nin-jin
              15.05.2023 15:58

              А как всем известно, после перевыборов нагрузка ведь уменьшается, а не растёт..