Apache Kafka — популярный распределенный брокер сообщений, который собирает и сохраняет сообщения (данные) со всех источников, а после предоставляет их программам и сервисам-получателям. Благодаря своей производительности и архитектуре инструмент довольно активно используется в проектах, где нужно гонять большие объемы данных. Но даже возможностей Kafka не всегда достаточно — иногда системам нужен буст по скорости и надежности. И получить его можно с помощью кэширования данных в Tarantool.

Меня зовут Никита Молоствов. Я старший программист в команде разработки VK Tech. В этой статье я расскажу о том, как можно наладить взаимодействие Kafka и Tarantool, зачем может понадобиться кэширование потока из Kafka в Tarantool, и дам подробную пошаговую инструкцию, с которой каждый сможет применить мои наработки в собственной системе.

Немного о Kafka

Apache Kafka — распределенная потоковая платформа для обработки данных в реальном времени. Инструмент ориентирован на сценарии, где данные передаются между компонентами в виде непрерывных потоков событий, а также для аналитики в реальном времени. Kafka сохраняет данные вдолгую, что позволяет обрабатывать и анализировать их в любой момент, даже после передачи целевым системам.

Kafka можно использовать в разных кейсах. В том числе:

  • для агрегации событий или логов;

  • для доставки событий многим потребителям;

  • для систем уведомлений и событий;

  • для проектирования event-driven-систем и не только.

Этому способствует ряд особенностей инструмента. Так, Kafka:

  • позволяет приложениям публиковать потоки записей (сообщений), а другим приложениям — подписываться на эти записи;

  • поддерживает горизонтальное масштабирование за счет распределения нагрузки между несколькими брокерами;

  • обеспечивает высокую пропускную способность благодаря асинхронной обработке сообщений;

  • обеспечивает надежное хранение и воспроизведение сообщений за счет хранения данных в виде журналов;

  • легко интегрируется с другими системами через API и различные коннекторы.

Вместе с тем Kafka больше ориентирована на обработку потоков данных в режиме реального времени, но не всегда может обеспечить быстрый доступ к часто запрашиваемым данным. Поэтому в кейсах, где такая возможность критически важна, потоки из Kafka рационально кэшировать в другие системы.

Предпосылки кэширования данных из Kafka

Есть несколько сценариев, когда кэширование потоков данных из Kafka может быть полезным. 

  • Если потребителям нужно часто обращаться к одним и тем же данным, кэширование поможет избежать повторных запросов к Kafka и ускорит доступ к этим данным. Например, если приложение часто запрашивает последние сообщения из конкретного топика, кэшированные данные ускорят получение нужных сведений.

  • Кэширование уменьшает количество обращений к Kafka, особенно если запросы происходят очень часто. Это помогает разгрузить систему и предотвратить ее перегрузку.

  • В отдельных кейсах кэш может использоваться для поддержания консистентности данных между разными компонентами системы. Например, можно использовать кэш для временного хранения промежуточных результатов до завершения транзакции.

  • Кэшируя данные, можно исключить необходимость повторного получения данных и, как результат, кратно сократить время ожидания и ускорить обработку.

При этом получение профита от кэширования потоков во многом зависит от выбранного целевого продукта, который эти данные будет принимать.

  • Инструмент должен обеспечивать быструю запись данных, поступающих из Kafka, чтобы минимизировать задержки между моментом поступления сообщения и его доступностью в кэше.

  • Кэш должен поддерживать быстрые операции чтения, особенно если данные используются в реальном времени.

  • Важна возможность добавления новых узлов для увеличения емкости хранилища и производительности системы.

  • Нужна поддержка работы в распределенной среде, где несколько узлов могут работать совместно для обработки больших объемов данных.

  • Важны механизмы обеспечения корректной обработки транзакций и исключения дублирования или пропуска данных.

  • В решении должны быть доступны механизмы репликации и восстановления, которые смогут гарантировать сохранность данных даже в случае сбоя одного или нескольких узлов.

Этим требованиям полностью соответствует Tarantool. Причем, что важно, в отличие от Kafka, которая ориентирована на хранение и передачу больших объемов данных, Tarantool заточен и под предоставление быстрого доступа к ним, то есть подходит под задачи построения кэша.

Типовые этапы кэширования данных между системами

Базовый пайплайн кэширования потоков данных из одной системы в другую (в нашем случае — из Kafka в Tarantool) упрощенно можно свести к нескольким этапам.

  1. Сбор данных из Kafka. В первую очередь данные собираются из топиков Kafka. Для этого используется потребитель (consumer), который подписывается на нужные топики и получает сообщения. Потребитель может быть настроен таким образом, чтобы получать данные пакетами или индивидуально, в зависимости от требований приложения.

  2. Обработка сообщений. Полученные данные могут требовать предварительной обработки перед сохранением в Tarantool. Например, нужно извлечь полезную информацию из сообщений, преобразовать ее в нужный формат или агрегировать данные.

  3. Сохранение данных в Tarantool. После обработки данные сохраняются в Tarantool. Tarantool поддерживает различные типы данных, включая строки, числа, массивы и объекты. Данные могут храниться в виде записей в пространстве (space), которое является аналогом таблицы в реляционных базах данных.

Наряду с ними есть и этапы администрирования.

  1. Управление кэшированием. Важно правильно настроить размер кэша, время жизни данных и стратегию вытеснения устаревших данных. Это нужно, чтобы оптимизировать использование памяти и гарантировать актуальность данных.

  2. Синхронизация данных. Необходимо следить за тем, чтобы данные в Tarantool были синхронизированы с данными в Kafka.

  3. Мониторинг и управление ошибками. Например, надо отслеживать возможные сбои в соединении с Kafka или Tarantool, а также реагировать на ситуации, когда данные не могут быть корректно обработаны или сохранены.

От теории к практике: пример кэширования потока данных из Kafka в Tarantool

Теперь перейдем от теории к реальному кейсу реализации кэширования потоков данных из Kafka в Tarantool. Далее я подробно остановлюсь на всех ключевых моментах и этапах реализации.

Примечание: Описанный мной метод универсален и может пригодиться, если и в вашей системе потребуется значительно увеличить скорость и эффективность работы. Для удобства все конкретные примеры кода я оставил в репозитории — пользуйтесь.

Подготовка инфраструктуры

Для начала пишем небольшой docker-compose-файл для поднятия необходимых систем (ZooKeeper-server, Kafka-broker, Prometheus, Grafana, etcd). 

Примечание: Файлы настройки мониторинга есть в репозитории.

Код
services:
  zookeeper-server:
    container_name: zookeeper-server
    hostname: zookeeper-server
    image: bitnami/zookeeper:3.9.3-debian-12-r0
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    ports:
      - "2181:2181"
      - "2888:2888"
      - "8080:8080"
    networks:
      - cache

  kafka-broker:
    container_name: kafka-broker
    hostname: kafka-broker
    image: bitnami/kafka:3.8.1-debian-12-r0
    ports:
      - "29092:29092"
      - "9092:9092"
    depends_on:
      - zookeeper-server
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ZOOKEEPER_PROTOCOL=PLAINTEXT
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://:9092,EXTERNAL://127.0.0.1:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT

    networks:
      - cache

  prometheus:
    container_name: prometheus
    hostname: prometheus
    image: prom/prometheus:v2.55.0
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - ./monitoring/prometheus/configs:/etc/prometheus/configs:ro
    networks:
      - cache

  grafana:
    container_name: grafana
    hostname: grafana
    image: grafana/grafana:11.3.0-ubuntu
    ports:
      - "3000:3000"
    volumes:
      - ./monitoring/grafana/provisioning:/etc/grafana/provisioning
      - ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards:rw
    networks:
      - cache

networks:
  cache:
    external: true

Создаем сеть и поднимаем необходимые образы:

docker network create cache
docker compose up

Создаем топик с 10 партициями:

docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-topics.sh --create
--topic invalidation --bootstrap-server localhost:9092 --partitions 10

Для наглядности кейса вносим в Kafka 10 млн событий:

docker exec -i kafka-broker bash -c ' \

for i in {1..1000000}; do \

echo $i:{\"id\":$i\,\"time\":$EPOCHSECONDS} >> /tmp/explekafka ;\

done; \

for j in {1..10}; do \

sort -R /tmp/explekafka | /opt/bitnami/kafka/bin/kafka-console-

producer.sh --bootstrap-server localhost:9092 --topic invalidation  --

property "parse.key=true" --property "key.separator=:" ; \

done; \

rm -rf /tmp/explekafka'

Подключаемся через set explorer и просматриваем вставленные сообщения:

Примечание: Базовый дашборд для мониторинга Tarantool доступен по ссылке.

Готовим контейнеры, в которых будут запущены наши приложения:

cd ./dockers/

docker build -t tarantool.local -f ./Dockerfile.tarantool .

docker build -t go.local -f ./Dockerfile.go .

На этом будем считать, что базовая настройка закончена, и приступим к написанию кода.

Решение «в лоб»

Собираем приложение на Golang для чтения потока из Kafka. Посмотрим, какие показатели будут в нем. 

Примечание: Файл в репозитории — example0/main.go.

Код
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"regexp"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
	sendBatchesTimeout = 10 * time.Millisecond
	readTimeout        = 5 * time.Millisecond
)

type message struct {
	Id   int   `json:"id"`
	Time int64 `json:"Time"`
}

type mainApp struct {
	requestDurations       *prometheus.SummaryVec
	conKafka               *kafka.Consumer
	startTimeRPS           int64
	countMessage           int64
	allCountMessage        int64
	reteryCount            int
	tickerSendBatchTimeout *time.Ticker
	startAllRpsTime        int64
}

func Init() (*mainApp, error) {
	reg := prometheus.NewRegistry()
	requestDurations := prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "request_durations_seconds",
			Help:       "latency distributions.",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		},
		[]string{"service"},
	)

	reg.MustRegister(requestDurations)
	reg.MustRegister(
		collectors.NewGoCollector(
			collectors.WithGoCollectorRuntimeMetrics(
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/idle:cpu-seconds")},
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/total:cpu-seconds")},
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/user:cpu-seconds")},
			),
		),
	)

	http.Handle("/metrics", promhttp.HandlerFor(
		reg,
		promhttp.HandlerOpts{
			// Opt into OpenMetrics to support exemplars.
			EnableOpenMetrics: true,
		},
	))

	conKafka, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "kafka-broker:9092",
		"group.id":          "example0",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		return nil, err
	}
	err = conKafka.SubscribeTopics([]string{"invalidation"}, nil)
	if err != nil {
		return nil, err
	}

	return &mainApp{
		requestDurations:       requestDurations,
		conKafka:               conKafka,
		startTimeRPS:           time.Now().UnixNano(),
		countMessage:           0,
		allCountMessage:        0,
		startAllRpsTime:        0,
		tickerSendBatchTimeout: time.NewTicker(sendBatchesTimeout),
	}, nil
}

func (self *mainApp) readMessage() (*kafka.Message, error) {
	startTimeGetMessage := time.Now().UnixNano()
	msg, err := self.conKafka.ReadMessage(readTimeout)
	self.requestDurations.WithLabelValues("ReadMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
	return msg, err
}

func (self *mainApp) parseMesage(msg *kafka.Message) ([]interface{}, error) {
	var data message
	err := json.Unmarshal(msg.Value, &data)
	if err != nil {
		panic(fmt.Sprintf("Json parser error: %v (%v)\n", err, msg))
	}
	id, err := strconv.Atoi(string(msg.Key))
	if err != nil {
		panic(fmt.Sprintf("Kafka key parser error: %v (%v)\n", err, msg))
	}
	if id != data.Id {
		panic(fmt.Sprintf("Kafka key != message id: %v (%v)\n", err, msg))
	}

	value := string(msg.Value)
	tuple := []interface{}{id, nil, value}
	self.countMessage++
	self.allCountMessage++
	return tuple, nil
}

func (self *mainApp) printRPS() {
	stopTimeRPS := time.Now().UnixNano()
	if self.countMessage > 0 {
		time := stopTimeRPS - self.startTimeRPS
		log.Printf("time: %d count: %d rps: %f\n",
			time, self.countMessage, float64(self.countMessage*1e9)/float64(time))
		self.countMessage = 0
	} else {
		log.Printf("No message")
	}
	self.startTimeRPS = stopTimeRPS
}

func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)

	app, err := Init()
	if err != nil {
		log.Fatalf("Error init app: %s\n", err)
		os.Exit(1)
	}
	defer app.conKafka.Close()

	go func() {
		log.Println("Start http server for metrics!")
		err = http.ListenAndServe(":7081", nil)
		if err != nil {
			log.Fatalf("Error start http server: %s", err)
			os.Exit(1)
		}
	}()

	log.Println("Run read message!")
	run := true
	tickerRPS := time.NewTicker(time.Second)

	for run {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: terminating\n", sig)
			run = false
		case <-tickerRPS.C:
			app.printRPS()
		default:
			msg, err := app.readMessage()
			if err != nil {
				if !err.(kafka.Error).IsTimeout() {
					log.Fatalf("Kafka error read message: %s\n", err)
					run = false
				}
				continue
			}
			if app.allCountMessage == 0 { // Игнорим время первого сообшения
				app.startAllRpsTime = time.Now().UnixNano()
			}
			_, err = app.parseMesage(msg)
			if err != nil {
				log.Fatalf("Error parseMesage message: %s\n", err)
				continue
			}

			if app.allCountMessage == 1e7 {
				stopRpsTime := time.Now().UnixNano()
				fmt.Printf("Processed 1e7 messages time: %d count: %d rps: %f\n",
					(stopRpsTime - app.startAllRpsTime), app.allCountMessage, float64((app.allCountMessage-1)*1e9)/float64(stopRpsTime-app.startAllRpsTime))
				run = false
			}
		}
	}

	os.Exit(0)
}

Собираем предложенный файл:

cd example0

go mod init example0

go get -u github.com/tarantool/go-tarantool/v2

go get github.com/confluentinc/confluent-kafka-go/v2/kafka

go get github.com/prometheus/client_golang

go mod tidy

go build .

Запускаем чтение репликационного потока из Kafka нашим GO-приложением:

docker run -it --rm --name go001 --net cache \

-v $PWD:/opt/go \

go.local ./example0

Получаем теоретический максимум для заданных условий на уровне 108533.100017 RPS.

В процессе работы можем посмотреть, что оффсет по партициям смещается:

watch docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-consumer-

groups.sh --bootstrap-server localhost:9092 --group example0 --describe

Для сброса оффсета используем команду:

docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-consumer-groups.sh

\

--bootstrap-server localhost:9092 \

--group example0 \

--topic invalidation \

--reset-offsets --to-earliest --execute

Синхронное чтение сообщений

Теперь попробуем вариант с синхронным чтением сообщений и коммитом Kafka после загрузки данных в Tarantool. Для этого примера используем Tarantool Community Edition 3.2.1 — она актуальна на момент написания статьи. 

Примечание: Код приложения для Tarantool — в файле example1/init.lua.

Код
local log = require('log')
local fiber = require('fiber')
local metrics = require('metrics')
local http_server = require('http.server')
local prometheus = require('metrics.plugins.prometheus')

local server,err = http_server.new('0.0.0.0', 6081)
if err then
    log.error("Failed to create HTTP server: %s", err)
    os.exit(1)
end
server:start()

metrics.enable_default_metrics()

server:route({path = '/metrics'}, prometheus.collect_http)

box.cfg {listen = 3301}

box.once("bootstrap", function()
    -- Create a new space named "test"
    box.schema.create_space('test', { if_not_exists = true })

    -- Define the space format
    box.space.test:format({
        {name = 'id', type = 'unsigned'},
        {name = 'value', type = 'string'}
    })

    -- Create a primary key for the space
    box.space.test:create_index('primary', {
        parts = {'id'}
    })
    
    log.info("Space 'test' created")
end)

fiber.create(function() 
    while true do
        log.info("spase test count: %s", box.space.test:count())
        fiber.sleep(5)
    end
end)

Готовим конфигурационный файл.

Примечание: В репозитории — example1/con g.yml.

Код
app:
  file: 'init.lua'

credentials:
  users:
    replicator:
      password: 'topsecret'
      roles: [replication]

    example_user:
      password: 'example_password'
      roles: [super]

iproto:
  advertise:
    peer:
      login: replicator

replication:
  failover: manual

groups:
  group001:
    replicasets:
      replicaset001:
        leader: instance001
        instances:
          instance001:
            iproto:
              listen:
              - uri: 'instance001:3301'

Запускаем одиночный инстанс:

docker run -it --rm --name instance001 --net cache \

-p 3301:3301 -p 6081:6081 \

-e TT_INSTANCE_NAME=instance001 \

-v $PWD/config.yml:/opt/tarantool/config.yml \

-v $PWD/files:/var/lib/tarantool/sys_env/default:rw \

-v $PWD/init.lua:/opt/tarantool/init.lua \

tarantool.local

Убиваем производительность синхронно и однопоточно

Примечание: Код приложения для чтения потока из Kafka — в файле example1/main.go.

Код
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"regexp"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tarantool/go-tarantool/v2"
)

var (
	tarantoolUser      = "example_user"
	tarantoolPassword  = "example_password"
	tarantoolUri       = "instance001:3301"
	batchSize          = 1000
	sendBatchesTimeout = 10 * time.Millisecond
	readTimeout        = 5 * time.Millisecond
)

type message struct {
	Id   int   `json:"id"`
	Time int64 `json:"Time"`
}

type mainApp struct {
	requestDurations       *prometheus.SummaryVec
	conKafka               *kafka.Consumer
	tarantoolConnect       *tarantool.Connection
	startTimeRPS           int64
	countMessage           int64
	allCountMessage        int64
	reteryCount            int
	tickerSendBatchTimeout *time.Ticker
	startAllRpsTime        int64
}

func Init(ctx context.Context) (*mainApp, error) {
	reg := prometheus.NewRegistry()
	requestDurations := prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "request_durations_seconds",
			Help:       "latency distributions.",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		},
		[]string{"service"},
	)

	reg.MustRegister(requestDurations)
	reg.MustRegister(
		collectors.NewGoCollector(
			collectors.WithGoCollectorRuntimeMetrics(
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/idle:cpu-seconds")},
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/total:cpu-seconds")},
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/user:cpu-seconds")},
			),
		),
	)

	http.Handle("/metrics", promhttp.HandlerFor(
		reg,
		promhttp.HandlerOpts{
			// Opt into OpenMetrics to support exemplars.
			EnableOpenMetrics: true,
		},
	))

	conKafka, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "kafka-broker:9092",
		"group.id":           "example1",
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false,
	})
	if err != nil {
		return nil, err
	}
	err = conKafka.SubscribeTopics([]string{"invalidation"}, nil)
	if err != nil {
		return nil, err
	}

	dialer := tarantool.NetDialer{
		Address:  tarantoolUri,
		User:     tarantoolUser,
		Password: tarantoolPassword,
	}
	opts := tarantool.Opts{}

	conn, err := tarantool.Connect(ctx, dialer, opts)
	if err != nil || conn == nil {
		return nil, err
	}
	return &mainApp{
		requestDurations:       requestDurations,
		tarantoolConnect:       conn,
		conKafka:               conKafka,
		startTimeRPS:           time.Now().UnixNano(),
		countMessage:           0,
		allCountMessage:        0,
		startAllRpsTime:        0,
		tickerSendBatchTimeout: time.NewTicker(sendBatchesTimeout),
	}, nil
}

func (self *mainApp) readMessage() (*kafka.Message, error) {
	startTimeGetMessage := time.Now().UnixNano()
	msg, err := self.conKafka.ReadMessage(readTimeout)
	self.requestDurations.WithLabelValues("ReadMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
	return msg, err
}

func (self *mainApp) parseMesage(msg *kafka.Message) ([]interface{}, error) {
	var data message
	err := json.Unmarshal(msg.Value, &data)
	if err != nil {
		log.Fatalf("Json parser error: %v (%v)\n", err, msg)
		return nil, err
	}
	id, err := strconv.Atoi(string(msg.Key))
	if err != nil {
		log.Fatalf("Kafka key parser error: %v (%v)\n", err, msg)
		return nil, err
	}
	if id != data.Id {
		log.Fatalf("Kafka key != message id: %v (%v)\n", err, msg)
		return nil, err
	}

	value := string(msg.Value)
	tuple := []interface{}{id, value}
	return tuple, nil
}
func (self *mainApp) processed(tuple []interface{}) error {
	self.countMessage++
	self.allCountMessage++

	return self.send(tuple)
}
func (self *mainApp) send(tuple []interface{}) error {

	startTimeGetMessage := time.Now().UnixNano()
	startTimeGetMessage = time.Now().UnixNano()
	req := tarantool.NewReplaceRequest("test").Tuple(tuple)
	_, err := self.tarantoolConnect.Do(req).Get()
	self.requestDurations.WithLabelValues("sendMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
	if err != nil {
		return err
	}

	self.conKafka.Commit()
	self.tickerSendBatchTimeout.Reset(sendBatchesTimeout)
	return nil
}

func (self *mainApp) printRPS() {
	stopTimeRPS := time.Now().UnixNano()
	if self.countMessage > 0 {
		time := stopTimeRPS - self.startTimeRPS
		log.Printf("time: %d count: %d allCount: %d rps: %f\n",
			time, self.countMessage, self.allCountMessage, float64(self.countMessage*1e9)/float64(time))
		self.countMessage = 0
	} else {
		log.Printf("No message")
	}
	self.startTimeRPS = stopTimeRPS
}

func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	app, err := Init(ctx)
	if err != nil {
		log.Fatalf("Error init app: %s\n", err)
		os.Exit(1)
	}
	defer app.conKafka.Close()
	defer app.tarantoolConnect.Close()

	go func() {
		log.Println("Start http server for metrics!")
		err = http.ListenAndServe(":7081", nil)
		if err != nil {
			log.Fatalf("Error start http server: %s", err)
			os.Exit(1)
		}
	}()

	log.Println("Run read message!")
	run := true
	tickerRPS := time.NewTicker(time.Second)

	for run {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: terminating\n", sig)
			run = false
		case <-tickerRPS.C:
			app.printRPS()
		default:
			msg, err := app.readMessage()
			if err != nil {
				if !err.(kafka.Error).IsTimeout() {
					log.Fatalf("Kafka error read message: %s\n", err)
					run = false
				}
				continue
			}
			if app.allCountMessage == 0 { // Игнорим время первого сообшения
				app.startAllRpsTime = time.Now().UnixNano()
			}
			tuple, err := app.parseMesage(msg)
			if err != nil {
				log.Fatalf("Error parseMesage message: %s\n", err)
				continue
			}
			err = app.processed(tuple)
			if err != nil {
				log.Fatalf("Error send to tarantool: %s\n", err)
				run = false
			}
			if app.allCountMessage == 1e5 {
				stopRpsTime := time.Now().UnixNano()
				fmt.Printf("Processed 1e7 messages time: %d count: %d rps: %f\n",
					(stopRpsTime - app.startAllRpsTime), app.allCountMessage, float64((app.allCountMessage-1)*1e9)/float64(stopRpsTime-app.startAllRpsTime))
				run = false
			}
		}
	}

	os.Exit(0)
}

Запустим чтение репликационного потока из Kafka нашим GO-приложением. 

Примечание: Далее я не буду давать инструкции по сборке исполняемого GO-файла — они аналогичны.

docker run -it --rm --name go001 --net cache \

-v $PWD:/opt/go \

go.local ./example1

Мы получили средний RPS для синхронного одногопоточного режима, равный 1127.402242.

Здесь довольно закономерно, что с последовательным чтением и записью в пределах одной транзакции производительность снижается практически в 100 раз. Чуть позже мы попробуем улучшить этот показатель.

Загрузка Tarantool операциями

Если внимательно посмотреть на график CPU Tarantool, то можно увидеть, что он не нагружен. Поэтому загрузим его операциями чтения и посмотрим, что будет с синхронными RPS. 

Для этого запустим нагрузочное тестирование с помощью k6, оптимизированного для работы с Tarantool. Загрузить его можно из GitHub

Примечание: Инструкция по сборке k6 — в репозитории.

Рассмотрим файл example1/testmaster.js:

Код
import tarantool from "k6/x/tarantool";
import { check } from "k6";

const conn = tarantool.connect(["localhost:3301"],{
    user: "example_user",
    pass: "example_password",
});

export default () => {
    const id = 1 + Math.floor(Math.random() * (1e6 - 1));
    const result = tarantool.call(conn, "box.space.test:get", [id]);
    check(result, {
        "code is 0": (r) => r.code === 0,
        "id is correct": (r) => {
            if (r.data && r.data[0] && r.data[0][0] === id) 
                return true;
            else {
                console.log(JSON.stringify({id: id, result: r})); 
                return false;
            }
        },
        "id in value is correct": (r) => r.data && r.data[0] && r.data[0][1] && JSON.parse(r.data[0][1]).id === id,
    });
};

Сбросим оффсет в Kafka и перезапустим наливку данных в Tarantool. Параллельно запустим нагрузку:

docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-consumer-groups.sh\

--bootstrap-server localhost:9092 \

--group example1 \

--topic invalidation \

--reset-offsets --to-earliest --execute

docker run -it --rm --name go001 --net cache \

-v $PWD:/opt/go \

go.local ./example1

k6-linux-amd64 run --vus 100 --duration 100s testmaster.js

Получаем 51139.576561 RPS с одного инстанса. Результат приемлемый. Вместе с тем примерно 430 операций на запись — уже почти в 3 раза хуже.

Для решения проблемы с записью во время чтения добавим реплику Tarantool и снова запустим цикл испытаний.

Примечание: Описание конфигурационного файла example1/config_replica.yml.

Код
app:
  file: 'init.lua'

credentials:
  users:
    replicator:
      password: 'topsecret'
      roles: [replication]

    example_user:
      password: 'example_password'
      roles: [super]

iproto:
  advertise:
    peer:
      login: replicator

replication:
  failover: manual

groups:
  group001:
    replicasets:
      replicaset001:
        leader: instance001
        instances:
          instance001:
            iproto:
              listen:
              - uri: 'instance001:3301'
          instance002:
            iproto:
              listen:
              - uri: 'instance002:3301'

Производительность снизилась до 1154.198970 RPS, но это вполне допустимо.

app:
  file: 'init.lua'

credentials:
  users:
    replicator:
      password: 'topsecret'
      roles: [replication]

    example_user:
      password: 'example_password'
      roles: [super]

iproto:
  advertise:
    peer:
      login: replicator

replication:
  failover: manual

groups:
  group001:
    replicasets:
      replicaset001:
        leader: instance001
        instances:
          instance001:
            iproto:
              listen:
              - uri: 'instance001:3301'
          instance002:
            iproto:
              listen:
              - uri: 'instance002:3301'

Примечание: Важно не забыть поменять порт подключения в тестовом скрипте.

Таким образом, под нагрузкой получаем следующие результаты:

  • чтение — 51020.952949 RPS;

  • запись — 1014.127257023 RPS.

Шардированный кластер и батчи

Для ускорения работы системы можно попробовать реализацию с шардированным кластером и батчами.

Для разворачивания шардированного кластера Tarantool можно использовать утилиту tt.

Я подготовил конфигурационный файл на 4 роутера и 4 реплики (example2/config.yml в репозитории):

Код
credentials:
  users:
    replicator:
      password: 'topsecret'
      roles: [replication, sharding, super]
    example_user:
      password: 'example_password'
      roles: [super]

iproto:
  advertise:
    peer:
      login: replicator

replication:
  failover: election

sharding:
  bucket_count: 10000

groups:
  routers:
    sharding:
      roles:
        - router
    roles:
      - roles.crud-router
      - router
    replicasets:
      route-1:
        instances:
          route-1-1:
            iproto:
              listen:
              - uri: 'route-1-1:3301'
          route-1-2:
            iproto:
              listen:
              - uri: 'route-1-2:3301'
      route-2:
        instances:
          route-2-1:
            iproto:
              listen:
              - uri: 'route-2-1:3301'
          route-2-2:
            iproto:
              listen:
              - uri: 'route-2-2:3301'
      route-3:
        instances:
          route-3-1:
            iproto:
              listen:
              - uri: 'route-3-1:3301'
          route-3-2:
            iproto:
              listen:
              - uri: 'route-3-2:3301'
      route-4:
        instances:
          route-4-1:
            iproto:
              listen:
              - uri: 'route-4-1:3301'
          route-4-2:
            iproto:
              listen:
              - uri: 'route-4-2:3301'
  storages:
    sharding:
      roles:
        - storage
    roles:
      - roles.crud-storage
      - storage
    replicasets:
      storage-1:
        instances:
          storage-1-1:
            iproto:
              listen:
              - uri: 'storage-1-1:3301'
          storage-1-2:
            iproto:
              listen:
              - uri: 'storage-1-2:3301'
      storage-2:
        instances:
          storage-2-1:
            iproto:
              listen:
              - uri: 'storage-2-1:3301'
          storage-2-2:
            iproto:
              listen:
              - uri: 'storage-2-2:3301'

Также я прописал инициализацию для роутера в router.lua (файл example2/router.lua в репозитории):

Код
local log = require('log')
local fiber = require('fiber')
local metrics = require('metrics')
local http_server = require('http.server')
local prometheus = require('metrics.plugins.prometheus')
local os = require('os')
local crud = require('crud')
local vshard = require('vshard')

local server,err = http_server.new('0.0.0.0', 6081)
if err then
    log.error("Failed to create HTTP server: %s", err)
    os.exit(1)
end
server:start()

metrics.enable_default_metrics()

server:route({path = '/metrics'}, prometheus.collect_http)

local function apply()
    while true do
        local ok, err = vshard.router.bootstrap({
            if_not_bootstrapped = true,
        })
        if ok then
            break
        end
        log.info(('Router bootstrap error: %s'):format(err))
        fiber.sleep(1)
    end
    crud.cfg({stats = true, stats_driver = 'metrics'})
    rawset(_G, "crud_get", function(space, index)
        return crud.get(space, index).rows
    end)

end

return {
    validate = function()end,
    apply = apply,
    stop = function()end,
    depends = {'roles.crud-router'},
}

Также подготовил инициализацию для стораджа в storage.lua (файл в репозитории example2/storage.lua):

Код
local log = require('log')
local fiber = require('fiber')
local metrics = require('metrics')
local http_server = require('http.server')
local prometheus = require('metrics.plugins.prometheus')

local server,err = http_server.new('0.0.0.0', 6081)
if err then
    log.error("Failed to create HTTP server: %s", err)
    os.exit(1)
end
server:start()

metrics.enable_default_metrics()

server:route({path = '/metrics'}, prometheus.collect_http)

local function apply()
    box.once("bootstrap", function()
        -- Create a sharding_space
        local sharding_space = box.schema.space.create('_ddl_sharding_key', {
            format = {
                {name = 'space_name', type = 'string', is_nullable = false},
                {name = 'sharding_key', type = 'array', is_nullable = false},
            },
            if_not_exists = true,
        })

        -- Create a primary key for the sharding_space
        sharding_space:create_index('space_name', {
            type = 'TREE',
            unique = true,
            parts = {{'space_name', 'string', is_nullable = false}},
            if_not_exists = true,
        })

        -- Create a new space named "test"
        box.schema.create_space('test', { if_not_exists = true })

        -- Define the space format
        box.space.test:format({
            {name = 'id', type = 'unsigned'},
            {name = 'bucket_id', type = 'unsigned'},
            {name = 'value', type = 'string'}
        })

        -- Create a primary key for the space
        box.space.test:create_index('primary', { parts = {'id'}})
        -- Create a bucket_id key for the space
        box.space.test:create_index('bucket_id', { parts = {'bucket_id'}, unique = false, if_not_exists = true})

        -- Add sharding key
        box.space._ddl_sharding_key:replace({'test', {'id'}})

        log.info("Space 'test' created")
    end)
end

return {
    validate = function()end,
    apply = apply,
    stop = function()end,
    depends = {'roles.crud-storage'},
}

Также нужен docker-compose-файл для запуска кластера:

Код
services:
  route-1-1:
    container_name: route-1-1
    hostname: route-1-1
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-1-1
    ports:
      -  3201:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  route-1-2:
    container_name: route-1-2
    hostname: route-1-2
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-1-2
    ports:
      -  3202:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  route-2-1:
    container_name: route-2-1
    hostname: route-2-1
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-2-1
    ports:
      -  3203:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  route-2-2:
    container_name: route-2-2
    hostname: route-2-2
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-2-2
    ports:
      -  3204:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  route-3-1:
    container_name: route-3-1
    hostname: route-3-1
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-3-1
    ports:
      -  3205:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  route-3-2:
    container_name: route-3-2
    hostname: route-3-2
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-3-2
    ports:
      -  3206:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  route-4-1:
    container_name: route-4-1
    hostname: route-4-1
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-4-1
    ports:
      -  3207:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  route-4-2:
    container_name: route-4-2
    hostname: route-4-2
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=route-4-2
    ports:
      -  3208:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./router.lua:/opt/tarantool/router.lua
    networks:
      - cache

  storage-1-1:
    container_name: storage-1-1
    hostname: storage-1-1
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=storage-1-1
    ports:
      -  3401:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./storage.lua:/opt/tarantool/storage.lua
    networks:
      - cache


  storage-1-2:
    container_name: storage-1-2
    hostname: storage-1-2
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=storage-1-2
    ports:
      -  3402:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./storage.lua:/opt/tarantool/storage.lua
    networks:
      - cache


  storage-2-1:
    container_name: storage-2-1
    hostname: storage-2-1
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=storage-2-1
    ports:
      -  3403:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./storage.lua:/opt/tarantool/storage.lua
    networks:
      - cache


  storage-2-2:
    container_name: storage-2-2
    hostname: storage-2-2
    image: tarantool.local
    environment:
      - TT_INSTANCE_NAME=storage-2-2
    ports:
      -  3404:3301
    volumes:
      - ./config.yml:/opt/tarantool/config.yml
      - ./files:/var/lib/tarantool/sys_env/default:rw
      - ./storage.lua:/opt/tarantool/storage.lua
    networks:
      - cache

networks:
  cache:
    external: true

Запускаем его командой:

docker compose up

Для работы с батчами и вставкой через crud.replace_many используем GО-приложение следующего вида (файл example2/main.go в репозитории):

Код
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"regexp"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tarantool/go-tarantool/v2"
	"github.com/tarantool/go-tarantool/v2/crud"
	"github.com/tarantool/go-tarantool/v2/pool"
)

var (
	tarantoolUser      = "example_user"
	tarantoolPassword  = "example_password"
	routerUriList      = []string{"route-1-1:3301", "route-1-2:3301"}
	batchSize          = 1000
	sendBatchesTimeout = 10 * time.Millisecond
	readTimeout        = 5 * time.Millisecond
)

type message struct {
	Id   int   `json:"id"`
	Time int64 `json:"Time"`
}

type mainApp struct {
	requestDurations       *prometheus.SummaryVec
	conKafka               *kafka.Consumer
	routerPool             *pool.ConnectionPool
	tuples                 []crud.Tuple
	startTimeRPS           int64
	countMessage           int64
	allCountMessage        int64
	reteryCount            int
	tickerSendBatchTimeout *time.Ticker
	startAllRpsTime        int64
}

func Init(ctx context.Context) (*mainApp, error) {
	reg := prometheus.NewRegistry()
	requestDurations := prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "request_durations_seconds",
			Help:       "latency distributions.",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		},
		[]string{"service"},
	)

	reg.MustRegister(requestDurations)
	reg.MustRegister(
		collectors.NewGoCollector(
			collectors.WithGoCollectorRuntimeMetrics(
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/idle:cpu-seconds")},
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/total:cpu-seconds")},
				collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/user:cpu-seconds")},
			),
		),
	)

	http.Handle("/metrics", promhttp.HandlerFor(
		reg,
		promhttp.HandlerOpts{
			// Opt into OpenMetrics to support exemplars.
			EnableOpenMetrics: true,
		},
	))
	pools := []pool.Instance{}
	for _, uri := range routerUriList {
		iface := pool.Instance{
			Dialer: tarantool.NetDialer{
				Address:  uri,
				User:     tarantoolUser,
				Password: tarantoolPassword,
			},
			Name: uri,
		}

		pools = append(pools, iface)
	}
	conKafka, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "kafka-broker:9092",
		"group.id":           "example2",
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": false,
	})
	if err != nil {
		return nil, err
	}
	err = conKafka.SubscribeTopics([]string{"invalidation"}, nil)
	if err != nil {
		return nil, err
	}

	routerPool, err := pool.Connect(ctx, pools)
	if err != nil || routerPool == nil {
		return nil, err
	}
	return &mainApp{
		requestDurations:       requestDurations,
		routerPool:             routerPool,
		conKafka:               conKafka,
		startTimeRPS:           time.Now().UnixNano(),
		countMessage:           0,
		allCountMessage:        0,
		startAllRpsTime:        0,
		tickerSendBatchTimeout: time.NewTicker(sendBatchesTimeout),
	}, nil
}

func (self *mainApp) readMessage() (*kafka.Message, error) {
	startTimeGetMessage := time.Now().UnixNano()
	msg, err := self.conKafka.ReadMessage(readTimeout)
	self.requestDurations.WithLabelValues("ReadMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
	return msg, err
}

func (self *mainApp) parseMesage(msg *kafka.Message) ([]interface{}, error) {
	var data message
	err := json.Unmarshal(msg.Value, &data)
	if err != nil {
		log.Fatalf("Json parser error: %v (%v)\n", err, msg)
		return nil, err
	}
	id, err := strconv.Atoi(string(msg.Key))
	if err != nil {
		log.Fatalf("Kafka key parser error: %v (%v)\n", err, msg)
		return nil, err
	}
	if id != data.Id {
		log.Fatalf("Kafka key != message id: %v (%v)\n", err, msg)
		return nil, err
	}

	value := string(msg.Value)
	tuple := []interface{}{id, nil, value}
	return tuple, nil
}
func (self *mainApp) processed(tuple []interface{}) error {
	self.tuples = append(self.tuples, tuple)
	self.countMessage++
	self.allCountMessage++
	if len(self.tuples) >= batchSize {
		return self.sendBatch()
	}
	return nil
}
func (self *mainApp) sendBatch() error {
	if len(self.tuples) == 0 {
		return nil
	}
	startTimeGetMessage := time.Now().UnixNano()
	req := crud.MakeReplaceManyRequest("test").Tuples(self.tuples)
	ret := crud.Result{}
	err := self.routerPool.Do(req, pool.ANY).GetTyped(&ret)
	self.requestDurations.WithLabelValues("sendMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
	if err != nil {
		return err
	}
	self.tuples = []crud.Tuple{}
	self.conKafka.Commit()
	self.tickerSendBatchTimeout.Reset(sendBatchesTimeout)
	return nil
}

func (self *mainApp) printRPS() {
	stopTimeRPS := time.Now().UnixNano()
	if self.countMessage > 0 {
		time := stopTimeRPS - self.startTimeRPS
		log.Printf("time: %d count: %d rps: %f\n",
			time, self.countMessage, float64(self.countMessage*1e9)/float64(time))
		self.countMessage = 0
	} else {
		log.Printf("No message")
	}
	self.startTimeRPS = stopTimeRPS
}

func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, os.Interrupt)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	app, err := Init(ctx)
	if err != nil {
		log.Fatalf("Error init app: %s\n", err)
		os.Exit(1)
	}
	defer app.conKafka.Close()
	defer app.routerPool.Close()

	go func() {
		log.Println("Start http server for metrics!")
		err = http.ListenAndServe(":7081", nil)
		if err != nil {
			log.Fatalf("Error start http server: %s", err)
			os.Exit(1)
		}
	}()

	log.Println("Run read message!")
	run := true
	tickerRPS := time.NewTicker(time.Second)

	for run {
		select {
		case sig := <-sigchan:
			log.Printf("Caught signal %v: terminating\n", sig)
			run = false
		case <-tickerRPS.C:
			app.printRPS()
		case <-app.tickerSendBatchTimeout.C:
			err = app.sendBatch()
			if err != nil {
				log.Fatalf("Error send to tarantool: %s\n", err)
				run = false
			}
		default:
			msg, err := app.readMessage()
			if err != nil {
				if !err.(kafka.Error).IsTimeout() {
					log.Fatalf("Kafka error read message: %s\n", err)
					run = false
				}
				continue
			}
			if app.allCountMessage == 0 { // Игнорим время первого сообшения
				app.startAllRpsTime = time.Now().UnixNano()
			}
			tuple, err := app.parseMesage(msg)
			if err != nil {
				log.Fatalf("Error parseMesage message: %s\n", err)
				continue
			}
			err = app.processed(tuple)
			if err != nil {
				log.Fatalf("Error send to tarantool: %s\n", err)
				run = false
			}
			if app.allCountMessage == 1e7 {
				app.sendBatch()
				stopRpsTime := time.Now().UnixNano()
				fmt.Printf("Processed 1e7 messages time: %d count: %d rps: %f\n",
					(stopRpsTime - app.startAllRpsTime), app.allCountMessage, float64((app.allCountMessage-1)*1e9)/float64(stopRpsTime-app.startAllRpsTime))
				run = false
			}
		}
	}

	os.Exit(0)
}

Соберем и запустим его:

docker run -it --rm --name go001 --net cache \

-v $PWD:/opt/go \

go.local ./example2

Для батчевого режима без нагрузки мы получаем средний RPS на уровне 24030.055023.

Поменяем в скрипте тестирования эндпоинты на 2 свободных роутера и вызов на CRUD. После этого посмотрим на результат под нагрузкой (файл single/example2/testmaster.js в репозитории):

Под нагрузкой получаем:

  • ~32 400 RPS на чтение;

  • ~22 000 RPS на запись.

В моем случае это предел, ограниченный возможностями машины, — для наглядности кейса достаточно и этого. Вместе с тем эти показатели легко улучшить, просто добавив больше серверов.

Вместо выводов

Кэширование потоков из Apache Kafka в Tarantool — эффективный способ оптимизации работы с большими объемами данных в реальном времени. Подход помогает существенно снизить нагрузку на систему за счет уменьшения количества обращений к источнику данных и ускорения обработки запросов. Внедрение подобного решения может повысить производительность приложений, работающих с потоковыми данными, а также улучшить их надежность и масштабируемость.

Безусловно, реализация кэширования предполагает понимание особенностей инструментов, способов их синхронизации, учет требований к инфраструктуре и стратегии развертывания кэша. Вместе с тем приведенный мной пример наглядно показывает, что потенциальный профит от кэширования данных в Tarantool полностью оправдывает как вложенные ресурсы, так и затраченное время.

Комментарии (5)


  1. alexanderfedyukov
    05.12.2024 06:05

    По-моему кейс чересчур надуманный: при выполнении потоковой обработки сообщений нужно "оставаться в потоке" и все необходимые данные хранить под ногами у сервиса-обработчика ровно столько времени, сколько это ему необходимо. Кафка, в частности ее клиент, позволяет решить эти задачи из коробки, причем позволяет хранить только данные необходимые конкретному обработчику в данный момент времени для обработки конкретных партиций топика, а не все подряд, масштабироваться примерно по щелчку пальцев по сравнению с тарантулом. В случае включения в эту историю Тарантула появляются проблемы и накладные расходы, которые перевешивают весь профит от подключения: дополнительные инфраструктурные компоненты тарантула, которые требуют большее количества железа и отдельной команды сопровождения, проблемы синхронизации топиков кафки и структур тарантула, сложность или невозможность обеспечения транзакционности обработки сообщений, обеспечение fault tolerance и т.д.. А плюсы от включения далеко не очевидны. Тарантул был бы интересен как конкурент Redis/Ignite, в том числе при взаимодействии с БД. Но, например, реализации JSR 107 в нем нет, и только его подключение в проект будет нести дополнительные сложности.


    1. nikosias Автор
      05.12.2024 06:05

      По-моему кейс чересчур надуманный: при выполнении потоковой обработки сообщений нужно "оставаться в потоке" и все необходимые данные хранить под ногами у сервиса-обработчика ровно столько времени, сколько это ему необходимо.

      А что лучше монолит или микросервис?

      Кафка, в частности ее клиент, позволяет решить эти задачи из коробки, причем позволяет хранить только данные необходимые конкретному обработчику в данный момент времени для обработки конкретных партиций топика, а не все подряд, масштабироваться примерно по щелчку пальцев по сравнению с тарантулом.

      В статье как раз и показано как можно маштабировиться с одного инстанса до шардированного отказоустойчивого кластера.

      В случае включения в эту историю Тарантула появляются проблемы и накладные расходы, которые перевешивают весь профит от подключения: дополнительные инфраструктурные компоненты тарантула, которые требуют большее количества железа и отдельной команды сопровождения, проблемы синхронизации топиков кафки и структур тарантула, сложность или невозможность обеспечения транзакционности обработки сообщений, обеспечение fault tolerance и т.д.. А плюсы от включения далеко не очевидны.

      То есть вы предлагаете все это перенести на клиента? А если клиентских сервисов несколько? А при динамическом маштабировании? А когда количество данных в каждом топике по несколько гигабайт? А когда часть данных уже ушла из кафки?

      Мне кажется что тут стоит остановиться, так как данная тема сама по себе достойна отдельной статьи, и если вы ее напишите, добавьте плиз в комментариях ссылку мне очень интересна данная тема и пути их решения в кешах и витринах данных.

      Тарантул был бы интересен как конкурент Redis/Ignite, в том числе при взаимодействии с БД. Но, например, реализации JSR 107 в нем нет, и только его подключение в проект будет нести дополнительные сложности.

      В целом редис или тарантул это, в узких кругах довольно холиварная тема. Ну и существует tarantooldb где частично имплементирован протокол редиса.


  1. ris58h
    05.12.2024 06:05

    Вы про форматирование кода не слышали или это Хабр сломался?


    1. nikosias Автор
      05.12.2024 06:05

      Очень странно, поправил.


  1. alexanderfedyukov
    05.12.2024 06:05

    То есть вы предлагаете все это перенести на клиента? А если клиентских сервисов несколько? А при динамическом маштабировании? А когда количество данных в каждом топике по несколько гигабайт? А когда часть данных уже ушла из кафки?

    Таких проблем нет. Клиент - метка условная, "клиентский сервис" - такой же участник обработки, как и другие компоненты. И перенести в него данные можно и отмасштабироваться и отшардировать потоки. И как показала практика это сделать можно сильно проще, чем при использовании Тарантула. Самое важное - можно сделать так, чтобы клиент обрабатывал только свою часть данных, не обращаясь ко всему спейсу тарантула через балансировщики, роутеры, вычисляя нужные шарды и т.д.. И еще важнее - надежнее.