В данной статье представлен простой способ реализации микросервисной архитектуры с использованием Kafka, Golang и Docker.

Если вы сразу хотите перейти к рабочему коду, вот ссылка на репозиторий

⚠️ Важно: Не для Продакшена!

Этот материал представляет собой исключительно образовательный пример и демонстрирует техническую возможность реализации подхода запрос-ответ через брокера. Следует подчеркнуть, что в реальных средах подход запрос-ответ через брокер сообщений часто считается дурной практикой. В общепринятых паттернах архитектуры брокеры сообщений обычно используются для того, чтобы "когда-нибудь" обработать сообщение, а не для моментального ответа на запрос. Если в вашей системе необходимо получать моментальные ответы на запросы, рассмотрите возможность размещения сервисов за балансировщиком нагрузки.

Общий процесс работы

  1. Клиент отправляет HTTP-запрос на первый микросервис (API Gateway), используя, например, Postman.

  2. API Gateway передает запрос в Kafka, откуда его принимает второй микросервис.

  3. Второй микросервис обрабатывает запрос и отправляет ответ обратно в Kafka.

  4. API Gateway извлекает ответ из Kafka и возвращает его клиенту.

Примечание: Важным аспектом является возможность сопоставления запросов и ответов, чтобы при параллельной обработке множества запросов каждый ответ был возвращен соответствующему клиенту.

Краткий обзор кода

Конфигурация API Gateway:

Файл api-gateway/main.go:

package main

import (
	"encoding/json"
	"log"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
)

// MyMessage - структура для нашего сообщения
type MyMessage struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Value string `json:"value"`
}

// responseChannels - словарь для хранения каналов ответов, индексированных по ID запроса
// mu - мьютекс для обеспечения синхронизации доступа к словарю responseChannels
var responseChannels map[string]chan *sarama.ConsumerMessage
var mu sync.Mutex

func main() {
	responseChannels = make(map[string]chan *sarama.ConsumerMessage)

	// Создание продюсера Kafka
	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	// Создание консьюмера Kafka
	consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	// Подписка на партицию "pong" в Kafka
	partConsumer, err := consumer.ConsumePartition("pong", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Failed to consume partition: %v", err)
	}
	defer partConsumer.Close()

	// Горутина для обработки входящих сообщений от Kafka
	go func() {
		for {
			select {
			// Чтение сообщения из Kafka
			case msg, ok := <-partConsumer.Messages():
				if !ok {
					log.Println("Channel closed, exiting goroutine")
					return
				}
				responseID := string(msg.Key)
				mu.Lock()
				ch, exists := responseChannels[responseID]
				if exists {
					ch <- msg
					delete(responseChannels, responseID)
				}
				mu.Unlock()
			}
		}
	}()

	// Инициализация роутера Gin
	router := gin.Default()
	router.GET("/ping", func(c *gin.Context) {
		requestID := uuid.New().String()

		message := MyMessage{
			ID:    requestID,
			Name:  "Ping",
			Value: "Pong",
		}

		// Преобразование сообщения в JSON что бы потом отправить через kafka
		bytes, err := json.Marshal(message)
		if err != nil {
			c.JSON(500, gin.H{"error": "failed to marshal JSON"})
			return
		}

		msg := &sarama.ProducerMessage{
			Topic: "ping",
			Key:   sarama.StringEncoder(requestID),
			Value: sarama.ByteEncoder(bytes),
		}

		// отправка сообщения в Kafka
		_, _, err = producer.SendMessage(msg)
		if err != nil {
			log.Printf("Failed to send message to Kafka: %v", err)
			c.JSON(500, gin.H{"error": "failed to send message to Kafka"})
			return
		}

		responseCh := make(chan *sarama.ConsumerMessage)
		mu.Lock()
		responseChannels[requestID] = responseCh
		mu.Unlock()

		select {
		case responseMsg := <-responseCh:
			c.JSON(200, gin.H{"message": string(responseMsg.Value)})
		case <-time.After(10 * time.Second):
			mu.Lock()
			delete(responseChannels, requestID)
			mu.Unlock()
			c.JSON(500, gin.H{"error": "timeout waiting for response"})
		}
	})

	if err := router.Run(":8080"); err != nil {
		log.Fatalf("Failed to run server: %v", err)
	}
}

Второй микросервис:

Файл second-microservice/main.go:

package main

import (
	"encoding/json"
	"log"

	"github.com/IBM/sarama"
)

// Наша структура для сообщения
type MyMessage struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Value string `json:"value"`
}

func main() {
	// Создание продюсера Kafka
	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	// Создание консьюмера Kafka
	consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}
	defer consumer.Close()

	// Подписка на партицию "ping" в Kafka
	partConsumer, err := consumer.ConsumePartition("ping", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalf("Failed to consume partition: %v", err)
	}
	defer partConsumer.Close()

	for {
		select {
		// (обработка входящего сообщения и отправка ответа в Kafka)
		case msg, ok := <-partConsumer.Messages():
			if !ok {
				log.Println("Channel closed, exiting")
				return
			}

			// Десериализация входящего сообщения из JSON
			var receivedMessage MyMessage
			err := json.Unmarshal(msg.Value, &receivedMessage)

			if err != nil {
				log.Printf("Error unmarshaling JSON: %v\n", err)
				continue
			}

			log.Printf("Received message: %+v\n", receivedMessage)

			responseText := receivedMessage.Name + " " + receivedMessage.Value + " ( " + receivedMessage.ID + " ) "

			// Формируем ответное сообщение
			resp := &sarama.ProducerMessage{
				Topic: "pong",
				Key:   sarama.StringEncoder(receivedMessage.ID),
				Value: sarama.StringEncoder(responseText),
			}
			// Отпровляем ответ в gateway
			_, _, err = producer.SendMessage(resp)
			if err != nil {
				log.Printf("Failed to send message to Kafka: %v", err)
			}
		}
	}
}

Конфигурация docker-compose ( для 3 версии )

Файл docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    networks:
      - kafka-network

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
    networks:
      - kafka-network

  api-gateway:
    build:
      context: ./api-gateway
      dockerfile: Dockerfile
    depends_on:
      - kafka
    networks:
      - kafka-network
    ports:
      - "8080:8080"


  second-microservice:
    build:
      context: ./second-microservice
      dockerfile: Dockerfile
    depends_on:
      - kafka
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

Примечание: в 3-й версии Docker Compose функция "depends_on" работает несколько иначе, чем во 2-й версии. Она по-прежнему контролирует порядок запуска контейнеров, но не гарантирует, что зависимый сервис полностью готов к работе, прежде чем запускать зависящий от него сервис. В частности, эта функциональность не будет ожидать готовности зависимых сервисов в рамках кластера при использовании Docker Swarm.

Из-за этой особенности в Docker Compose версии 3 мы используем скрипт wait-for-it в каждом контейнере (Dockerfile, пример которого можно увидеть в репозитории), чтобы гарантировать, что все необходимые сервисы доступны и полностью функционируют, прежде чем начнется выполнение основной программы.Скрипт wait-for-it обеспечивает простой и эффективный способ ожидания доступности TCP-хоста и порта.

Это позволяет нам контролировать порядок запуска сервисов и гарантировать, что все необходимые зависимости удовлетворены, прежде чем сервис начнет свою работу.

Рабочий код в репозитории

Комментарии (4)


  1. icecube092
    30.09.2023 16:57
    +12

    Подход запрос-ответ через брокер считается дурной практикой, если вам нужно чтобы был ответ на запрос то лучше ставить сервисы за балансировщик. Брокеры нужны чтобы "когда-нибудь" обрабатывать сообщения, а не моментально отвечать


    1. m68k
      30.09.2023 16:57
      +3

      статья похожа на набор вредных советов. Тут кафка как собаке 5 нога:
      * api gateway невозможно масштабировать, при падении он всё потеряет
      * ключи(поле key) в кафке для выбора партиций используют, здесь партиция только 0, кафку тоже получается не замасштабируешь. Нужно коньюмер группу для этого использовать
      * second microservice тоже не масштабирутся потому что 1 инстанс сервиса == 1 партиция кафки.
      ????


  1. hello_my_name_is_dany
    30.09.2023 16:57
    +4

    В 3-й версии Docker Compose была удалена функция "depends_on" для контроля за зависимостями сервисами, которая была доступна в 2-й версии. Эта функция позволяла автоматически ожидать, пока зависимые сервисы будут полностью запущены, прежде чем запускать зависящий сервис.

    Вы тут немного путаете, в самом docker compose depends_on остаётся и работает, а вот в docker swarm он игнорируется


  1. sandryunin
    30.09.2023 16:57
    +2

    Я еще я заметил сразу не читая весь код целиком, что вы любите паниковать, тем самым нарушая один из принципов самого GO.

    if err != nil {
    		panic(err)
    	}

    В таких местах лучше заменить на Fatal, а лучше вообще обработать ошибку.

    go func() {
    		for {
    			select {
    			case msg := <-partConsumer.Messages():
    				responseID := string(msg.Key)
    				mu.Lock()
    				ch, exists := responseChannels[responseID]
    				if exists {
    					ch <- msg
    					delete(responseChannels, responseID)
    				}
    				mu.Unlock()
    			}
    		}
    	}()

    А вот в этой горутине вы читаете из канала, при этом никак не обрабатывая дефолт, например на случай если канал уже закрыт.

    requestID := fmt.Sprintf("%d", time.Now().UnixNano())
    
    responseChannels[requestID] = responseCh

    А вот таким кодом есть риск поломаться, если в хендлер придет нагрузка, вы конечно можете сказать ято это прекрасный вариант схлопнуть запросу, скажу что нет. Для этого используйте sync и SingleFlightGroup.

    И это я глянул только 1 сервис, не смотря что там дальше, при этом я не го разработчик, а девопс. Но даже я на своем уровне знания код так не пишу, потому что он уж очень новичковый, и напоминает код который пишут ютуберы, которые пытаются учить гошке.

    Если вы беретесь за то чтобы показать как надо, то делайте это правильно, представьте сколько не окрепших умов попытаются протащить такое в продакшен))) Ну и на последок, тащить gin ради 1 ручки, можно было бы взять чего попроще, родной http или fastHttp ну на крайняк mux, который вроде до сих пор в архиве.

    У меня все, коллеги разработчики если что еще поправят.