Привет, Хабр!
В этой статье рассмотрим, как на примере магазина котиков — кейса, где каждый заказ превращается в событие — создать событийно‑ориентированную систему обработки заказов с использованием Python, Kafka и Django REST Framework. Создадим REST API для приёма заказов, настроим Kafka‑продюсеры, консьюмеры и реализуем компенсационные транзакции по принципу Saga.
Почему событийно-ориентированная архитектура?
Прежде всего, декуплирование сервисов позволяет нам:
Разделить ответственность. Заказ, оплата, доставка — каждый модуль работает независимо, не мешая другим.
Масштабировать по необходимости. Если нагрузка растёт, можно масштабировать только ту часть, которая испытывает затруднения.
Повысить отказоустойчивость. Если один сервис падает, остальные продолжают работать, а сообщения хранятся в Kafka до обработки.
Облегчить интеграцию. Новые микросервисы легко подключаются через подписку на нужные топики.
Разберёмся, какие компоненты понадобятся:
Django REST Framework. Это «фасад» для клиентов, через который принимаются заказы.
Kafka. Надёжный брокер сообщений, который гарантирует, что данные передадутся между сервисами.
Микросервис заказов. Сюда входит логика приёма заказа, сохранение его в базе данных (или где угодно) и отправка события.
Сервисы‑подписчики. Они отвечают за обработку событий: от подтверждения оплаты до выполнения компенсационных транзакций.
Kafka
Kafka позволяет обрабатывать миллионы сообщений, хранить их до подтверждения и масштабироваться горизонтально.
Пример Kafka Producer на Python:
# kafka_producer.py
import json
from kafka import KafkaProducer
import logging
from typing import Any, Dict
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def get_kafka_producer() -> KafkaProducer:
"""
Инициализируем KafkaProducer с базовыми настройками.
В продакшене нужно добавить обработку ошибок подключения, настройки безопасности и т.д.
"""
try:
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
retries=5, # Повторная отправка при сбоях
linger_ms=10, # Небольшая задержка перед отправкой
max_request_size=1048576 # Ограничение размера запроса (1MB)
)
logger.info("KafkaProducer успешно инициализирован!")
return producer
except Exception as e:
logger.error(f"Ошибка инициализации KafkaProducer: {e}")
raise
if __name__ == "__main__":
producer = get_kafka_producer()
order_event: Dict[str, Any] = {
"order_id": 1001,
"cat_id": 42,
"quantity": 1,
"customer_name": "Вася",
"status": "CREATED"
}
try:
future = producer.send('orders_topic', order_event)
result = future.get(timeout=10)
logger.info(f"Сообщение отправлено: топик {result.topic}, партиция {result.partition}")
except Exception as e:
logger.error(f"Ошибка отправки сообщения: {e}")
finally:
producer.flush()
Здесь понятно: создаём продюсера и отправляем сообщение. Конечно, для продакшена потребуется больше деталей — но это отличная отправная точка.
Django REST Framework
Теперь переходим к созданию микросервиса заказов. DRF‑приложение примет заказ, проведёт валидацию данных и отправит событие в Kafka.
Выполняем в терминале:
django-admin startproject catshop
cd catshop
python manage.py startapp orders
Создадим сериализатор заказа:
# orders/serializers.py
from rest_framework import serializers
class OrderSerializer(serializers.Serializer):
cat_id = serializers.IntegerField(min_value=1)
quantity = serializers.IntegerField(min_value=1)
customer_name = serializers.CharField(max_length=255)
def validate_cat_id(self, value: int) -> int:
if value <= 0:
raise serializers.ValidationError("ID котика должен быть положительным числом.")
return value
Представление для создания заказа:
# orders/views.py
import uuid
import logging
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .serializers import OrderSerializer
from catshop.kafka_producer import get_kafka_producer
logger = logging.getLogger(__name__)
# Инициализация Kafka Producer один раз при старте приложения
producer = get_kafka_producer()
class OrderCreateView(APIView):
"""
API для создания нового заказа.
"""
def post(self, request, *args, **kwargs):
serializer = OrderSerializer(data=request.data)
if serializer.is_valid():
order_data = serializer.validated_data
# Генерация уникального идентификатора заказа
order_data["order_id"] = str(uuid.uuid4())
order_data["status"] = "CREATED"
try:
future = producer.send('orders_topic', order_data)
result = future.get(timeout=10)
logger.info(f"Сообщение отправлено: топик {result.topic}, партиция {result.partition}")
except Exception as e:
logger.error(f"Ошибка отправки события в Kafka: {e}")
return Response(
{"error": "Не удалось обработать заказ, попробуйте позже."},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return Response(order_data, status=status.HTTP_201_CREATED)
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
Настроим маршруты:
# orders/urls.py
from django.urls import path
from .views import OrderCreateView
urlpatterns = [
path('orders/', OrderCreateView.as_view(), name='order-create'),
]
И подключаем маршруты в основном файле проекта:
# catshop/urls.py
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('orders.urls')),
]
Kafka Consumer – кто слушает события?
Чтобы система работала целиком, нужен компонент, который будет принимать сообщения из Kafka и обрабатывать их. Для этого реализуем Kafka Consumer с ручным подтверждением сообщений.
# kafka_consumer.py
import json
import logging
from kafka import KafkaConsumer
from typing import Any, Dict
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def get_kafka_consumer(topic: str, group_id: str = "order_consumers") -> KafkaConsumer:
"""
Инициализация KafkaConsumer с ручным подтверждением.
"""
try:
consumer = KafkaConsumer(
topic,
bootstrap_servers=['localhost:9092'],
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=False # Ручное подтверждение для большей надежности
)
logger.info("KafkaConsumer успешно инициализирован!")
return consumer
except Exception as e:
logger.error(f"Ошибка инициализации KafkaConsumer: {e}")
raise
def process_order_event(event: Dict[str, Any]) -> None:
"""
Логика обработки события заказа.
Здесь можно добавить вызовы платежной системы, обновление базы данных и прочие действия.
"""
try:
order_id = event.get("order_id")
status_event = event.get("status")
logger.info(f"Обрабатываю заказ {order_id} со статусом {status_event}")
if status_event == "CREATED":
logger.info(f"Заказ {order_id} подтвержден – начинаем обработку платежа.")
# Можно добавить вызов функции подтверждения заказа или уведомления другого сервиса
elif status_event == "CANCELLED":
logger.info(f"Заказ {order_id} отменён – запускаем компенсационные транзакции.")
# Здесь реализуйте логику отмены и компенсационных транзакций
else:
logger.warning(f"Заказ {order_id} имеет неизвестный статус: {status_event}")
except Exception as e:
logger.error(f"Ошибка при обработке заказа {event.get('order_id')}: {e}")
if __name__ == "__main__":
consumer = get_kafka_consumer("orders_topic")
try:
for message in consumer:
logger.info(f"Получено сообщение: {message.value}")
process_order_event(message.value)
consumer.commit() # Ручное подтверждение после успешной обработки
except KeyboardInterrupt:
logger.info("Остановка KafkaConsumer...")
finally:
consumer.close()
Вручную подтверждаем обработку каждого сообщения, чтобы не потерять ни один заказ.
Saga Pattern
В распределённых системах не избежать ошибок. Если, например, платеж не проходит, мы должны отменить заказ и выполнить необходимые компенсационные транзакции. Вот тут на помощь приходит Saga Pattern.
# saga_handler.py
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)
def handle_saga(event: Dict[str, Any]) -> None:
"""
Простейшая реализация Saga: если платеж не успешен, запускаем компенсационные транзакции.
"""
order_id = event.get("order_id")
payment_status = event.get("payment_status", "FAILED") # По умолчанию считаем, что платеж не прошёл
if payment_status != "SUCCESS":
logger.info(f"Платеж для заказа {order_id} не прошёл – запускаем компенсационные транзакции.")
cancel_order(order_id)
else:
logger.info(f"Платеж для заказа {order_id} успешен – заказ подтвержден.")
def cancel_order(order_id: str) -> None:
"""
Функция компенсации: отменяет заказ и выполняет необходимые действия (например, возврат средств).
"""
try:
logger.info(f"Заказ {order_id} отменён. Выполняются компенсационные транзакции...")
# Здесь можно вызвать другой микросервис или обновить статус заказа в БД
except Exception as e:
logger.error(f"Ошибка при отмене заказа {order_id}: {e}")
Так сейвим целостность данных даже в случае сбоев.
Тестирование
Пример тестов для API заказа, написанных с использованием DRF.
Тесты для API заказа:
# orders/tests.py
from django.urls import reverse
from rest_framework.test import APITestCase
class OrderAPITestCase(APITestCase):
def test_create_order_success(self):
url = reverse('order-create')
data = {
"cat_id": 10,
"quantity": 2,
"customer_name": "Мурка"
}
response = self.client.post(url, data, format='json')
self.assertEqual(response.status_code, 201)
self.assertIn("order_id", response.data)
self.assertEqual(response.data.get("status"), "CREATED")
def test_create_order_invalid_data(self):
url = reverse('order-create')
data = {
"cat_id": -1, # Недопустимое значение
"quantity": 0,
"customer_name": ""
}
response = self.client.post(url, data, format='json')
self.assertEqual(response.status_code, 400)
Тестируйте позитивные сценарии, а также ситуации с ошибками, таймаутами и сбоями в сети.
Был ли у вас опыт создания подобных систем? Делитесь своим опытом и идеями в комментариях.
20 февраля пройдёт открытый урок на тему «Практика аутентификации и авторизации в микросервисной архитектуре».
Разберём ключевые протоколы и стандарты, такие как OAuth2 и JWT, а также изучим практические прикладные инструменты для централизованного управления доступом. Если интересно, записыватесь на странице курса "Microservice Architecture".