В данной статье представлен простой способ реализации микросервисной архитектуры с использованием Kafka, Golang и Docker.
Если вы сразу хотите перейти к рабочему коду, вот ссылка на репозиторий
⚠️ Важно: Не для Продакшена!
Этот материал представляет собой исключительно образовательный пример и демонстрирует техническую возможность реализации подхода запрос-ответ через брокера. Следует подчеркнуть, что в реальных средах подход запрос-ответ через брокер сообщений часто считается дурной практикой. В общепринятых паттернах архитектуры брокеры сообщений обычно используются для того, чтобы "когда-нибудь" обработать сообщение, а не для моментального ответа на запрос. Если в вашей системе необходимо получать моментальные ответы на запросы, рассмотрите возможность размещения сервисов за балансировщиком нагрузки.
Общий процесс работы
Клиент отправляет HTTP-запрос на первый микросервис (API Gateway), используя, например, Postman.
API Gateway передает запрос в Kafka, откуда его принимает второй микросервис.
Второй микросервис обрабатывает запрос и отправляет ответ обратно в Kafka.
API Gateway извлекает ответ из Kafka и возвращает его клиенту.
Примечание: Важным аспектом является возможность сопоставления запросов и ответов, чтобы при параллельной обработке множества запросов каждый ответ был возвращен соответствующему клиенту.
Краткий обзор кода
Конфигурация API Gateway:
Файл api-gateway/main.go
:
package main
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/IBM/sarama"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// MyMessage - структура для нашего сообщения
type MyMessage struct {
ID string `json:"id"`
Name string `json:"name"`
Value string `json:"value"`
}
// responseChannels - словарь для хранения каналов ответов, индексированных по ID запроса
// mu - мьютекс для обеспечения синхронизации доступа к словарю responseChannels
var responseChannels map[string]chan *sarama.ConsumerMessage
var mu sync.Mutex
func main() {
responseChannels = make(map[string]chan *sarama.ConsumerMessage)
// Создание продюсера Kafka
producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
// Создание консьюмера Kafka
consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
// Подписка на партицию "pong" в Kafka
partConsumer, err := consumer.ConsumePartition("pong", 0, sarama.OffsetNewest)
if err != nil {
log.Fatalf("Failed to consume partition: %v", err)
}
defer partConsumer.Close()
// Горутина для обработки входящих сообщений от Kafka
go func() {
for {
select {
// Чтение сообщения из Kafka
case msg, ok := <-partConsumer.Messages():
if !ok {
log.Println("Channel closed, exiting goroutine")
return
}
responseID := string(msg.Key)
mu.Lock()
ch, exists := responseChannels[responseID]
if exists {
ch <- msg
delete(responseChannels, responseID)
}
mu.Unlock()
}
}
}()
// Инициализация роутера Gin
router := gin.Default()
router.GET("/ping", func(c *gin.Context) {
requestID := uuid.New().String()
message := MyMessage{
ID: requestID,
Name: "Ping",
Value: "Pong",
}
// Преобразование сообщения в JSON что бы потом отправить через kafka
bytes, err := json.Marshal(message)
if err != nil {
c.JSON(500, gin.H{"error": "failed to marshal JSON"})
return
}
msg := &sarama.ProducerMessage{
Topic: "ping",
Key: sarama.StringEncoder(requestID),
Value: sarama.ByteEncoder(bytes),
}
// отправка сообщения в Kafka
_, _, err = producer.SendMessage(msg)
if err != nil {
log.Printf("Failed to send message to Kafka: %v", err)
c.JSON(500, gin.H{"error": "failed to send message to Kafka"})
return
}
responseCh := make(chan *sarama.ConsumerMessage)
mu.Lock()
responseChannels[requestID] = responseCh
mu.Unlock()
select {
case responseMsg := <-responseCh:
c.JSON(200, gin.H{"message": string(responseMsg.Value)})
case <-time.After(10 * time.Second):
mu.Lock()
delete(responseChannels, requestID)
mu.Unlock()
c.JSON(500, gin.H{"error": "timeout waiting for response"})
}
})
if err := router.Run(":8080"); err != nil {
log.Fatalf("Failed to run server: %v", err)
}
}
Второй микросервис:
Файл second-microservice/main.go
:
package main
import (
"encoding/json"
"log"
"github.com/IBM/sarama"
)
// Наша структура для сообщения
type MyMessage struct {
ID string `json:"id"`
Name string `json:"name"`
Value string `json:"value"`
}
func main() {
// Создание продюсера Kafka
producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
// Создание консьюмера Kafka
consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
// Подписка на партицию "ping" в Kafka
partConsumer, err := consumer.ConsumePartition("ping", 0, sarama.OffsetNewest)
if err != nil {
log.Fatalf("Failed to consume partition: %v", err)
}
defer partConsumer.Close()
for {
select {
// (обработка входящего сообщения и отправка ответа в Kafka)
case msg, ok := <-partConsumer.Messages():
if !ok {
log.Println("Channel closed, exiting")
return
}
// Десериализация входящего сообщения из JSON
var receivedMessage MyMessage
err := json.Unmarshal(msg.Value, &receivedMessage)
if err != nil {
log.Printf("Error unmarshaling JSON: %v\n", err)
continue
}
log.Printf("Received message: %+v\n", receivedMessage)
responseText := receivedMessage.Name + " " + receivedMessage.Value + " ( " + receivedMessage.ID + " ) "
// Формируем ответное сообщение
resp := &sarama.ProducerMessage{
Topic: "pong",
Key: sarama.StringEncoder(receivedMessage.ID),
Value: sarama.StringEncoder(responseText),
}
// Отпровляем ответ в gateway
_, _, err = producer.SendMessage(resp)
if err != nil {
log.Printf("Failed to send message to Kafka: %v", err)
}
}
}
}
Конфигурация docker-compose ( для 3 версии )
Файл docker-compose.yml
:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
- ZOOKEEPER_CLIENT_PORT=2181
networks:
- kafka-network
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
networks:
- kafka-network
api-gateway:
build:
context: ./api-gateway
dockerfile: Dockerfile
depends_on:
- kafka
networks:
- kafka-network
ports:
- "8080:8080"
second-microservice:
build:
context: ./second-microservice
dockerfile: Dockerfile
depends_on:
- kafka
networks:
- kafka-network
networks:
kafka-network:
driver: bridge
Примечание: в 3-й версии Docker Compose функция "depends_on" работает несколько иначе, чем во 2-й версии. Она по-прежнему контролирует порядок запуска контейнеров, но не гарантирует, что зависимый сервис полностью готов к работе, прежде чем запускать зависящий от него сервис. В частности, эта функциональность не будет ожидать готовности зависимых сервисов в рамках кластера при использовании Docker Swarm.
Из-за этой особенности в Docker Compose версии 3 мы используем скрипт wait-for-it в каждом контейнере (Dockerfile, пример которого можно увидеть в репозитории), чтобы гарантировать, что все необходимые сервисы доступны и полностью функционируют, прежде чем начнется выполнение основной программы.Скрипт wait-for-it обеспечивает простой и эффективный способ ожидания доступности TCP-хоста и порта.
Это позволяет нам контролировать порядок запуска сервисов и гарантировать, что все необходимые зависимости удовлетворены, прежде чем сервис начнет свою работу.
Комментарии (4)
hello_my_name_is_dany
30.09.2023 16:57+4В 3-й версии Docker Compose была удалена функция "depends_on" для контроля за зависимостями сервисами, которая была доступна в 2-й версии. Эта функция позволяла автоматически ожидать, пока зависимые сервисы будут полностью запущены, прежде чем запускать зависящий сервис.
Вы тут немного путаете, в самом docker compose
depends_on
остаётся и работает, а вот в docker swarm он игнорируется
sandryunin
30.09.2023 16:57+2Я еще я заметил сразу не читая весь код целиком, что вы любите паниковать, тем самым нарушая один из принципов самого GO.
if err != nil { panic(err) }
В таких местах лучше заменить на Fatal, а лучше вообще обработать ошибку.
go func() { for { select { case msg := <-partConsumer.Messages(): responseID := string(msg.Key) mu.Lock() ch, exists := responseChannels[responseID] if exists { ch <- msg delete(responseChannels, responseID) } mu.Unlock() } } }()
А вот в этой горутине вы читаете из канала, при этом никак не обрабатывая дефолт, например на случай если канал уже закрыт.
requestID := fmt.Sprintf("%d", time.Now().UnixNano()) responseChannels[requestID] = responseCh
А вот таким кодом есть риск поломаться, если в хендлер придет нагрузка, вы конечно можете сказать ято это прекрасный вариант схлопнуть запросу, скажу что нет. Для этого используйте sync и SingleFlightGroup.
И это я глянул только 1 сервис, не смотря что там дальше, при этом я не го разработчик, а девопс. Но даже я на своем уровне знания код так не пишу, потому что он уж очень новичковый, и напоминает код который пишут ютуберы, которые пытаются учить гошке.
Если вы беретесь за то чтобы показать как надо, то делайте это правильно, представьте сколько не окрепших умов попытаются протащить такое в продакшен))) Ну и на последок, тащить gin ради 1 ручки, можно было бы взять чего попроще, родной http или fastHttp ну на крайняк mux, который вроде до сих пор в архиве.
У меня все, коллеги разработчики если что еще поправят.
icecube092
Подход запрос-ответ через брокер считается дурной практикой, если вам нужно чтобы был ответ на запрос то лучше ставить сервисы за балансировщик. Брокеры нужны чтобы "когда-нибудь" обрабатывать сообщения, а не моментально отвечать
m68k
статья похожа на набор вредных советов. Тут кафка как собаке 5 нога:
* api gateway невозможно масштабировать, при падении он всё потеряет
* ключи(поле key) в кафке для выбора партиций используют, здесь партиция только 0, кафку тоже получается не замасштабируешь. Нужно коньюмер группу для этого использовать
* second microservice тоже не масштабирутся потому что 1 инстанс сервиса == 1 партиция кафки.
????