Apache Kafka — популярный распределенный брокер сообщений, который собирает и сохраняет сообщения (данные) со всех источников, а после предоставляет их программам и сервисам-получателям. Благодаря своей производительности и архитектуре инструмент довольно активно используется в проектах, где нужно гонять большие объемы данных. Но даже возможностей Kafka не всегда достаточно — иногда системам нужен буст по скорости и надежности. И получить его можно с помощью кэширования данных в Tarantool.
Меня зовут Никита Молоствов. Я старший программист в команде разработки VK Tech. В этой статье я расскажу о том, как можно наладить взаимодействие Kafka и Tarantool, зачем может понадобиться кэширование потока из Kafka в Tarantool, и дам подробную пошаговую инструкцию, с которой каждый сможет применить мои наработки в собственной системе.
Немного о Kafka
Apache Kafka — распределенная потоковая платформа для обработки данных в реальном времени. Инструмент ориентирован на сценарии, где данные передаются между компонентами в виде непрерывных потоков событий, а также для аналитики в реальном времени. Kafka сохраняет данные вдолгую, что позволяет обрабатывать и анализировать их в любой момент, даже после передачи целевым системам.
Kafka можно использовать в разных кейсах. В том числе:
для агрегации событий или логов;
для доставки событий многим потребителям;
для систем уведомлений и событий;
для проектирования event-driven-систем и не только.
Этому способствует ряд особенностей инструмента. Так, Kafka:
позволяет приложениям публиковать потоки записей (сообщений), а другим приложениям — подписываться на эти записи;
поддерживает горизонтальное масштабирование за счет распределения нагрузки между несколькими брокерами;
обеспечивает высокую пропускную способность благодаря асинхронной обработке сообщений;
обеспечивает надежное хранение и воспроизведение сообщений за счет хранения данных в виде журналов;
легко интегрируется с другими системами через API и различные коннекторы.
Вместе с тем Kafka больше ориентирована на обработку потоков данных в режиме реального времени, но не всегда может обеспечить быстрый доступ к часто запрашиваемым данным. Поэтому в кейсах, где такая возможность критически важна, потоки из Kafka рационально кэшировать в другие системы.
Предпосылки кэширования данных из Kafka
Есть несколько сценариев, когда кэширование потоков данных из Kafka может быть полезным.
Если потребителям нужно часто обращаться к одним и тем же данным, кэширование поможет избежать повторных запросов к Kafka и ускорит доступ к этим данным. Например, если приложение часто запрашивает последние сообщения из конкретного топика, кэшированные данные ускорят получение нужных сведений.
Кэширование уменьшает количество обращений к Kafka, особенно если запросы происходят очень часто. Это помогает разгрузить систему и предотвратить ее перегрузку.
В отдельных кейсах кэш может использоваться для поддержания консистентности данных между разными компонентами системы. Например, можно использовать кэш для временного хранения промежуточных результатов до завершения транзакции.
Кэшируя данные, можно исключить необходимость повторного получения данных и, как результат, кратно сократить время ожидания и ускорить обработку.
При этом получение профита от кэширования потоков во многом зависит от выбранного целевого продукта, который эти данные будет принимать.
Инструмент должен обеспечивать быструю запись данных, поступающих из Kafka, чтобы минимизировать задержки между моментом поступления сообщения и его доступностью в кэше.
Кэш должен поддерживать быстрые операции чтения, особенно если данные используются в реальном времени.
Важна возможность добавления новых узлов для увеличения емкости хранилища и производительности системы.
Нужна поддержка работы в распределенной среде, где несколько узлов могут работать совместно для обработки больших объемов данных.
Важны механизмы обеспечения корректной обработки транзакций и исключения дублирования или пропуска данных.
В решении должны быть доступны механизмы репликации и восстановления, которые смогут гарантировать сохранность данных даже в случае сбоя одного или нескольких узлов.
Этим требованиям полностью соответствует Tarantool. Причем, что важно, в отличие от Kafka, которая ориентирована на хранение и передачу больших объемов данных, Tarantool заточен и под предоставление быстрого доступа к ним, то есть подходит под задачи построения кэша.
Типовые этапы кэширования данных между системами
Базовый пайплайн кэширования потоков данных из одной системы в другую (в нашем случае — из Kafka в Tarantool) упрощенно можно свести к нескольким этапам.
Сбор данных из Kafka. В первую очередь данные собираются из топиков Kafka. Для этого используется потребитель (consumer), который подписывается на нужные топики и получает сообщения. Потребитель может быть настроен таким образом, чтобы получать данные пакетами или индивидуально, в зависимости от требований приложения.
Обработка сообщений. Полученные данные могут требовать предварительной обработки перед сохранением в Tarantool. Например, нужно извлечь полезную информацию из сообщений, преобразовать ее в нужный формат или агрегировать данные.
Сохранение данных в Tarantool. После обработки данные сохраняются в Tarantool. Tarantool поддерживает различные типы данных, включая строки, числа, массивы и объекты. Данные могут храниться в виде записей в пространстве (space), которое является аналогом таблицы в реляционных базах данных.
Наряду с ними есть и этапы администрирования.
Управление кэшированием. Важно правильно настроить размер кэша, время жизни данных и стратегию вытеснения устаревших данных. Это нужно, чтобы оптимизировать использование памяти и гарантировать актуальность данных.
Синхронизация данных. Необходимо следить за тем, чтобы данные в Tarantool были синхронизированы с данными в Kafka.
Мониторинг и управление ошибками. Например, надо отслеживать возможные сбои в соединении с Kafka или Tarantool, а также реагировать на ситуации, когда данные не могут быть корректно обработаны или сохранены.
От теории к практике: пример кэширования потока данных из Kafka в Tarantool
Теперь перейдем от теории к реальному кейсу реализации кэширования потоков данных из Kafka в Tarantool. Далее я подробно остановлюсь на всех ключевых моментах и этапах реализации.
Примечание: Описанный мной метод универсален и может пригодиться, если и в вашей системе потребуется значительно увеличить скорость и эффективность работы. Для удобства все конкретные примеры кода я оставил в репозитории — пользуйтесь.
Подготовка инфраструктуры
Для начала пишем небольшой docker-compose-файл для поднятия необходимых систем (ZooKeeper-server, Kafka-broker, Prometheus, Grafana, etcd).
Примечание: Файлы настройки мониторинга есть в репозитории.
Код
services:
zookeeper-server:
container_name: zookeeper-server
hostname: zookeeper-server
image: bitnami/zookeeper:3.9.3-debian-12-r0
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ports:
- "2181:2181"
- "2888:2888"
- "8080:8080"
networks:
- cache
kafka-broker:
container_name: kafka-broker
hostname: kafka-broker
image: bitnami/kafka:3.8.1-debian-12-r0
ports:
- "29092:29092"
- "9092:9092"
depends_on:
- zookeeper-server
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ZOOKEEPER_PROTOCOL=PLAINTEXT
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://:9092,EXTERNAL://127.0.0.1:29092
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
networks:
- cache
prometheus:
container_name: prometheus
hostname: prometheus
image: prom/prometheus:v2.55.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- ./monitoring/prometheus/configs:/etc/prometheus/configs:ro
networks:
- cache
grafana:
container_name: grafana
hostname: grafana
image: grafana/grafana:11.3.0-ubuntu
ports:
- "3000:3000"
volumes:
- ./monitoring/grafana/provisioning:/etc/grafana/provisioning
- ./monitoring/grafana/dashboards:/var/lib/grafana/dashboards:rw
networks:
- cache
networks:
cache:
external: true
Создаем сеть и поднимаем необходимые образы:
docker network create cache
docker compose up
Создаем топик с 10 партициями:
docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-topics.sh --create
--topic invalidation --bootstrap-server localhost:9092 --partitions 10
Для наглядности кейса вносим в Kafka 10 млн событий:
docker exec -i kafka-broker bash -c ' \
for i in {1..1000000}; do \
echo $i:{\"id\":$i\,\"time\":$EPOCHSECONDS} >> /tmp/explekafka ;\
done; \
for j in {1..10}; do \
sort -R /tmp/explekafka | /opt/bitnami/kafka/bin/kafka-console-
producer.sh --bootstrap-server localhost:9092 --topic invalidation --
property "parse.key=true" --property "key.separator=:" ; \
done; \
rm -rf /tmp/explekafka'
Подключаемся через set explorer и просматриваем вставленные сообщения:
Примечание: Базовый дашборд для мониторинга Tarantool доступен по ссылке.
Готовим контейнеры, в которых будут запущены наши приложения:
cd ./dockers/
docker build -t tarantool.local -f ./Dockerfile.tarantool .
docker build -t go.local -f ./Dockerfile.go .
На этом будем считать, что базовая настройка закончена, и приступим к написанию кода.
Решение «в лоб»
Собираем приложение на Golang для чтения потока из Kafka. Посмотрим, какие показатели будут в нем.
Примечание: Файл в репозитории — example0/main.go.
Код
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"regexp"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
sendBatchesTimeout = 10 * time.Millisecond
readTimeout = 5 * time.Millisecond
)
type message struct {
Id int `json:"id"`
Time int64 `json:"Time"`
}
type mainApp struct {
requestDurations *prometheus.SummaryVec
conKafka *kafka.Consumer
startTimeRPS int64
countMessage int64
allCountMessage int64
reteryCount int
tickerSendBatchTimeout *time.Ticker
startAllRpsTime int64
}
func Init() (*mainApp, error) {
reg := prometheus.NewRegistry()
requestDurations := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "request_durations_seconds",
Help: "latency distributions.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"service"},
)
reg.MustRegister(requestDurations)
reg.MustRegister(
collectors.NewGoCollector(
collectors.WithGoCollectorRuntimeMetrics(
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/idle:cpu-seconds")},
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/total:cpu-seconds")},
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/user:cpu-seconds")},
),
),
)
http.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
},
))
conKafka, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "kafka-broker:9092",
"group.id": "example0",
"auto.offset.reset": "earliest",
})
if err != nil {
return nil, err
}
err = conKafka.SubscribeTopics([]string{"invalidation"}, nil)
if err != nil {
return nil, err
}
return &mainApp{
requestDurations: requestDurations,
conKafka: conKafka,
startTimeRPS: time.Now().UnixNano(),
countMessage: 0,
allCountMessage: 0,
startAllRpsTime: 0,
tickerSendBatchTimeout: time.NewTicker(sendBatchesTimeout),
}, nil
}
func (self *mainApp) readMessage() (*kafka.Message, error) {
startTimeGetMessage := time.Now().UnixNano()
msg, err := self.conKafka.ReadMessage(readTimeout)
self.requestDurations.WithLabelValues("ReadMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
return msg, err
}
func (self *mainApp) parseMesage(msg *kafka.Message) ([]interface{}, error) {
var data message
err := json.Unmarshal(msg.Value, &data)
if err != nil {
panic(fmt.Sprintf("Json parser error: %v (%v)\n", err, msg))
}
id, err := strconv.Atoi(string(msg.Key))
if err != nil {
panic(fmt.Sprintf("Kafka key parser error: %v (%v)\n", err, msg))
}
if id != data.Id {
panic(fmt.Sprintf("Kafka key != message id: %v (%v)\n", err, msg))
}
value := string(msg.Value)
tuple := []interface{}{id, nil, value}
self.countMessage++
self.allCountMessage++
return tuple, nil
}
func (self *mainApp) printRPS() {
stopTimeRPS := time.Now().UnixNano()
if self.countMessage > 0 {
time := stopTimeRPS - self.startTimeRPS
log.Printf("time: %d count: %d rps: %f\n",
time, self.countMessage, float64(self.countMessage*1e9)/float64(time))
self.countMessage = 0
} else {
log.Printf("No message")
}
self.startTimeRPS = stopTimeRPS
}
func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
app, err := Init()
if err != nil {
log.Fatalf("Error init app: %s\n", err)
os.Exit(1)
}
defer app.conKafka.Close()
go func() {
log.Println("Start http server for metrics!")
err = http.ListenAndServe(":7081", nil)
if err != nil {
log.Fatalf("Error start http server: %s", err)
os.Exit(1)
}
}()
log.Println("Run read message!")
run := true
tickerRPS := time.NewTicker(time.Second)
for run {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
run = false
case <-tickerRPS.C:
app.printRPS()
default:
msg, err := app.readMessage()
if err != nil {
if !err.(kafka.Error).IsTimeout() {
log.Fatalf("Kafka error read message: %s\n", err)
run = false
}
continue
}
if app.allCountMessage == 0 { // Игнорим время первого сообшения
app.startAllRpsTime = time.Now().UnixNano()
}
_, err = app.parseMesage(msg)
if err != nil {
log.Fatalf("Error parseMesage message: %s\n", err)
continue
}
if app.allCountMessage == 1e7 {
stopRpsTime := time.Now().UnixNano()
fmt.Printf("Processed 1e7 messages time: %d count: %d rps: %f\n",
(stopRpsTime - app.startAllRpsTime), app.allCountMessage, float64((app.allCountMessage-1)*1e9)/float64(stopRpsTime-app.startAllRpsTime))
run = false
}
}
}
os.Exit(0)
}
Собираем предложенный файл:
cd example0
go mod init example0
go get -u github.com/tarantool/go-tarantool/v2
go get github.com/confluentinc/confluent-kafka-go/v2/kafka
go get github.com/prometheus/client_golang
go mod tidy
go build .
Запускаем чтение репликационного потока из Kafka нашим GO-приложением:
docker run -it --rm --name go001 --net cache \
-v $PWD:/opt/go \
go.local ./example0
Получаем теоретический максимум для заданных условий на уровне 108533.100017 RPS.
В процессе работы можем посмотреть, что оффсет по партициям смещается:
watch docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-consumer-
groups.sh --bootstrap-server localhost:9092 --group example0 --describe
Для сброса оффсета используем команду:
docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-consumer-groups.sh
\
--bootstrap-server localhost:9092 \
--group example0 \
--topic invalidation \
--reset-offsets --to-earliest --execute
Синхронное чтение сообщений
Теперь попробуем вариант с синхронным чтением сообщений и коммитом Kafka после загрузки данных в Tarantool. Для этого примера используем Tarantool Community Edition 3.2.1 — она актуальна на момент написания статьи.
Примечание: Код приложения для Tarantool — в файле example1/init.lua.
Код
local log = require('log')
local fiber = require('fiber')
local metrics = require('metrics')
local http_server = require('http.server')
local prometheus = require('metrics.plugins.prometheus')
local server,err = http_server.new('0.0.0.0', 6081)
if err then
log.error("Failed to create HTTP server: %s", err)
os.exit(1)
end
server:start()
metrics.enable_default_metrics()
server:route({path = '/metrics'}, prometheus.collect_http)
box.cfg {listen = 3301}
box.once("bootstrap", function()
-- Create a new space named "test"
box.schema.create_space('test', { if_not_exists = true })
-- Define the space format
box.space.test:format({
{name = 'id', type = 'unsigned'},
{name = 'value', type = 'string'}
})
-- Create a primary key for the space
box.space.test:create_index('primary', {
parts = {'id'}
})
log.info("Space 'test' created")
end)
fiber.create(function()
while true do
log.info("spase test count: %s", box.space.test:count())
fiber.sleep(5)
end
end)
Готовим конфигурационный файл.
Примечание: В репозитории — example1/con g.yml.
Код
app:
file: 'init.lua'
credentials:
users:
replicator:
password: 'topsecret'
roles: [replication]
example_user:
password: 'example_password'
roles: [super]
iproto:
advertise:
peer:
login: replicator
replication:
failover: manual
groups:
group001:
replicasets:
replicaset001:
leader: instance001
instances:
instance001:
iproto:
listen:
- uri: 'instance001:3301'
Запускаем одиночный инстанс:
docker run -it --rm --name instance001 --net cache \
-p 3301:3301 -p 6081:6081 \
-e TT_INSTANCE_NAME=instance001 \
-v $PWD/config.yml:/opt/tarantool/config.yml \
-v $PWD/files:/var/lib/tarantool/sys_env/default:rw \
-v $PWD/init.lua:/opt/tarantool/init.lua \
tarantool.local
Убиваем производительность синхронно и однопоточно
Примечание: Код приложения для чтения потока из Kafka — в файле example1/main.go.
Код
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"regexp"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tarantool/go-tarantool/v2"
)
var (
tarantoolUser = "example_user"
tarantoolPassword = "example_password"
tarantoolUri = "instance001:3301"
batchSize = 1000
sendBatchesTimeout = 10 * time.Millisecond
readTimeout = 5 * time.Millisecond
)
type message struct {
Id int `json:"id"`
Time int64 `json:"Time"`
}
type mainApp struct {
requestDurations *prometheus.SummaryVec
conKafka *kafka.Consumer
tarantoolConnect *tarantool.Connection
startTimeRPS int64
countMessage int64
allCountMessage int64
reteryCount int
tickerSendBatchTimeout *time.Ticker
startAllRpsTime int64
}
func Init(ctx context.Context) (*mainApp, error) {
reg := prometheus.NewRegistry()
requestDurations := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "request_durations_seconds",
Help: "latency distributions.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"service"},
)
reg.MustRegister(requestDurations)
reg.MustRegister(
collectors.NewGoCollector(
collectors.WithGoCollectorRuntimeMetrics(
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/idle:cpu-seconds")},
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/total:cpu-seconds")},
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/user:cpu-seconds")},
),
),
)
http.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
},
))
conKafka, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "kafka-broker:9092",
"group.id": "example1",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
})
if err != nil {
return nil, err
}
err = conKafka.SubscribeTopics([]string{"invalidation"}, nil)
if err != nil {
return nil, err
}
dialer := tarantool.NetDialer{
Address: tarantoolUri,
User: tarantoolUser,
Password: tarantoolPassword,
}
opts := tarantool.Opts{}
conn, err := tarantool.Connect(ctx, dialer, opts)
if err != nil || conn == nil {
return nil, err
}
return &mainApp{
requestDurations: requestDurations,
tarantoolConnect: conn,
conKafka: conKafka,
startTimeRPS: time.Now().UnixNano(),
countMessage: 0,
allCountMessage: 0,
startAllRpsTime: 0,
tickerSendBatchTimeout: time.NewTicker(sendBatchesTimeout),
}, nil
}
func (self *mainApp) readMessage() (*kafka.Message, error) {
startTimeGetMessage := time.Now().UnixNano()
msg, err := self.conKafka.ReadMessage(readTimeout)
self.requestDurations.WithLabelValues("ReadMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
return msg, err
}
func (self *mainApp) parseMesage(msg *kafka.Message) ([]interface{}, error) {
var data message
err := json.Unmarshal(msg.Value, &data)
if err != nil {
log.Fatalf("Json parser error: %v (%v)\n", err, msg)
return nil, err
}
id, err := strconv.Atoi(string(msg.Key))
if err != nil {
log.Fatalf("Kafka key parser error: %v (%v)\n", err, msg)
return nil, err
}
if id != data.Id {
log.Fatalf("Kafka key != message id: %v (%v)\n", err, msg)
return nil, err
}
value := string(msg.Value)
tuple := []interface{}{id, value}
return tuple, nil
}
func (self *mainApp) processed(tuple []interface{}) error {
self.countMessage++
self.allCountMessage++
return self.send(tuple)
}
func (self *mainApp) send(tuple []interface{}) error {
startTimeGetMessage := time.Now().UnixNano()
startTimeGetMessage = time.Now().UnixNano()
req := tarantool.NewReplaceRequest("test").Tuple(tuple)
_, err := self.tarantoolConnect.Do(req).Get()
self.requestDurations.WithLabelValues("sendMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
if err != nil {
return err
}
self.conKafka.Commit()
self.tickerSendBatchTimeout.Reset(sendBatchesTimeout)
return nil
}
func (self *mainApp) printRPS() {
stopTimeRPS := time.Now().UnixNano()
if self.countMessage > 0 {
time := stopTimeRPS - self.startTimeRPS
log.Printf("time: %d count: %d allCount: %d rps: %f\n",
time, self.countMessage, self.allCountMessage, float64(self.countMessage*1e9)/float64(time))
self.countMessage = 0
} else {
log.Printf("No message")
}
self.startTimeRPS = stopTimeRPS
}
func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
app, err := Init(ctx)
if err != nil {
log.Fatalf("Error init app: %s\n", err)
os.Exit(1)
}
defer app.conKafka.Close()
defer app.tarantoolConnect.Close()
go func() {
log.Println("Start http server for metrics!")
err = http.ListenAndServe(":7081", nil)
if err != nil {
log.Fatalf("Error start http server: %s", err)
os.Exit(1)
}
}()
log.Println("Run read message!")
run := true
tickerRPS := time.NewTicker(time.Second)
for run {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
run = false
case <-tickerRPS.C:
app.printRPS()
default:
msg, err := app.readMessage()
if err != nil {
if !err.(kafka.Error).IsTimeout() {
log.Fatalf("Kafka error read message: %s\n", err)
run = false
}
continue
}
if app.allCountMessage == 0 { // Игнорим время первого сообшения
app.startAllRpsTime = time.Now().UnixNano()
}
tuple, err := app.parseMesage(msg)
if err != nil {
log.Fatalf("Error parseMesage message: %s\n", err)
continue
}
err = app.processed(tuple)
if err != nil {
log.Fatalf("Error send to tarantool: %s\n", err)
run = false
}
if app.allCountMessage == 1e5 {
stopRpsTime := time.Now().UnixNano()
fmt.Printf("Processed 1e7 messages time: %d count: %d rps: %f\n",
(stopRpsTime - app.startAllRpsTime), app.allCountMessage, float64((app.allCountMessage-1)*1e9)/float64(stopRpsTime-app.startAllRpsTime))
run = false
}
}
}
os.Exit(0)
}
Запустим чтение репликационного потока из Kafka нашим GO-приложением.
Примечание: Далее я не буду давать инструкции по сборке исполняемого GO-файла — они аналогичны.
docker run -it --rm --name go001 --net cache \
-v $PWD:/opt/go \
go.local ./example1
Мы получили средний RPS для синхронного одногопоточного режима, равный 1127.402242.
Здесь довольно закономерно, что с последовательным чтением и записью в пределах одной транзакции производительность снижается практически в 100 раз. Чуть позже мы попробуем улучшить этот показатель.
Загрузка Tarantool операциями
Если внимательно посмотреть на график CPU Tarantool, то можно увидеть, что он не нагружен. Поэтому загрузим его операциями чтения и посмотрим, что будет с синхронными RPS.
Для этого запустим нагрузочное тестирование с помощью k6, оптимизированного для работы с Tarantool. Загрузить его можно из GitHub.
Примечание: Инструкция по сборке k6 — в репозитории.
Рассмотрим файл example1/testmaster.js:
Код
import tarantool from "k6/x/tarantool";
import { check } from "k6";
const conn = tarantool.connect(["localhost:3301"],{
user: "example_user",
pass: "example_password",
});
export default () => {
const id = 1 + Math.floor(Math.random() * (1e6 - 1));
const result = tarantool.call(conn, "box.space.test:get", [id]);
check(result, {
"code is 0": (r) => r.code === 0,
"id is correct": (r) => {
if (r.data && r.data[0] && r.data[0][0] === id)
return true;
else {
console.log(JSON.stringify({id: id, result: r}));
return false;
}
},
"id in value is correct": (r) => r.data && r.data[0] && r.data[0][1] && JSON.parse(r.data[0][1]).id === id,
});
};
Сбросим оффсет в Kafka и перезапустим наливку данных в Tarantool. Параллельно запустим нагрузку:
docker exec -i kafka-broker /opt/bitnami/kafka/bin/kafka-consumer-groups.sh\
--bootstrap-server localhost:9092 \
--group example1 \
--topic invalidation \
--reset-offsets --to-earliest --execute
docker run -it --rm --name go001 --net cache \
-v $PWD:/opt/go \
go.local ./example1
k6-linux-amd64 run --vus 100 --duration 100s testmaster.js
Получаем 51139.576561 RPS с одного инстанса. Результат приемлемый. Вместе с тем примерно 430 операций на запись — уже почти в 3 раза хуже.
Для решения проблемы с записью во время чтения добавим реплику Tarantool и снова запустим цикл испытаний.
Примечание: Описание конфигурационного файла example1/config_replica.yml.
Код
app:
file: 'init.lua'
credentials:
users:
replicator:
password: 'topsecret'
roles: [replication]
example_user:
password: 'example_password'
roles: [super]
iproto:
advertise:
peer:
login: replicator
replication:
failover: manual
groups:
group001:
replicasets:
replicaset001:
leader: instance001
instances:
instance001:
iproto:
listen:
- uri: 'instance001:3301'
instance002:
iproto:
listen:
- uri: 'instance002:3301'
Производительность снизилась до 1154.198970 RPS, но это вполне допустимо.
app:
file: 'init.lua'
credentials:
users:
replicator:
password: 'topsecret'
roles: [replication]
example_user:
password: 'example_password'
roles: [super]
iproto:
advertise:
peer:
login: replicator
replication:
failover: manual
groups:
group001:
replicasets:
replicaset001:
leader: instance001
instances:
instance001:
iproto:
listen:
- uri: 'instance001:3301'
instance002:
iproto:
listen:
- uri: 'instance002:3301'
Примечание: Важно не забыть поменять порт подключения в тестовом скрипте.
Таким образом, под нагрузкой получаем следующие результаты:
чтение — 51020.952949 RPS;
запись — 1014.127257023 RPS.
Шардированный кластер и батчи
Для ускорения работы системы можно попробовать реализацию с шардированным кластером и батчами.
Для разворачивания шардированного кластера Tarantool можно использовать утилиту tt.
Я подготовил конфигурационный файл на 4 роутера и 4 реплики (example2/config.yml в репозитории):
Код
credentials:
users:
replicator:
password: 'topsecret'
roles: [replication, sharding, super]
example_user:
password: 'example_password'
roles: [super]
iproto:
advertise:
peer:
login: replicator
replication:
failover: election
sharding:
bucket_count: 10000
groups:
routers:
sharding:
roles:
- router
roles:
- roles.crud-router
- router
replicasets:
route-1:
instances:
route-1-1:
iproto:
listen:
- uri: 'route-1-1:3301'
route-1-2:
iproto:
listen:
- uri: 'route-1-2:3301'
route-2:
instances:
route-2-1:
iproto:
listen:
- uri: 'route-2-1:3301'
route-2-2:
iproto:
listen:
- uri: 'route-2-2:3301'
route-3:
instances:
route-3-1:
iproto:
listen:
- uri: 'route-3-1:3301'
route-3-2:
iproto:
listen:
- uri: 'route-3-2:3301'
route-4:
instances:
route-4-1:
iproto:
listen:
- uri: 'route-4-1:3301'
route-4-2:
iproto:
listen:
- uri: 'route-4-2:3301'
storages:
sharding:
roles:
- storage
roles:
- roles.crud-storage
- storage
replicasets:
storage-1:
instances:
storage-1-1:
iproto:
listen:
- uri: 'storage-1-1:3301'
storage-1-2:
iproto:
listen:
- uri: 'storage-1-2:3301'
storage-2:
instances:
storage-2-1:
iproto:
listen:
- uri: 'storage-2-1:3301'
storage-2-2:
iproto:
listen:
- uri: 'storage-2-2:3301'
Также я прописал инициализацию для роутера в router.lua (файл example2/router.lua в репозитории):
Код
local log = require('log')
local fiber = require('fiber')
local metrics = require('metrics')
local http_server = require('http.server')
local prometheus = require('metrics.plugins.prometheus')
local os = require('os')
local crud = require('crud')
local vshard = require('vshard')
local server,err = http_server.new('0.0.0.0', 6081)
if err then
log.error("Failed to create HTTP server: %s", err)
os.exit(1)
end
server:start()
metrics.enable_default_metrics()
server:route({path = '/metrics'}, prometheus.collect_http)
local function apply()
while true do
local ok, err = vshard.router.bootstrap({
if_not_bootstrapped = true,
})
if ok then
break
end
log.info(('Router bootstrap error: %s'):format(err))
fiber.sleep(1)
end
crud.cfg({stats = true, stats_driver = 'metrics'})
rawset(_G, "crud_get", function(space, index)
return crud.get(space, index).rows
end)
end
return {
validate = function()end,
apply = apply,
stop = function()end,
depends = {'roles.crud-router'},
}
Также подготовил инициализацию для стораджа в storage.lua (файл в репозитории example2/storage.lua):
Код
local log = require('log')
local fiber = require('fiber')
local metrics = require('metrics')
local http_server = require('http.server')
local prometheus = require('metrics.plugins.prometheus')
local server,err = http_server.new('0.0.0.0', 6081)
if err then
log.error("Failed to create HTTP server: %s", err)
os.exit(1)
end
server:start()
metrics.enable_default_metrics()
server:route({path = '/metrics'}, prometheus.collect_http)
local function apply()
box.once("bootstrap", function()
-- Create a sharding_space
local sharding_space = box.schema.space.create('_ddl_sharding_key', {
format = {
{name = 'space_name', type = 'string', is_nullable = false},
{name = 'sharding_key', type = 'array', is_nullable = false},
},
if_not_exists = true,
})
-- Create a primary key for the sharding_space
sharding_space:create_index('space_name', {
type = 'TREE',
unique = true,
parts = {{'space_name', 'string', is_nullable = false}},
if_not_exists = true,
})
-- Create a new space named "test"
box.schema.create_space('test', { if_not_exists = true })
-- Define the space format
box.space.test:format({
{name = 'id', type = 'unsigned'},
{name = 'bucket_id', type = 'unsigned'},
{name = 'value', type = 'string'}
})
-- Create a primary key for the space
box.space.test:create_index('primary', { parts = {'id'}})
-- Create a bucket_id key for the space
box.space.test:create_index('bucket_id', { parts = {'bucket_id'}, unique = false, if_not_exists = true})
-- Add sharding key
box.space._ddl_sharding_key:replace({'test', {'id'}})
log.info("Space 'test' created")
end)
end
return {
validate = function()end,
apply = apply,
stop = function()end,
depends = {'roles.crud-storage'},
}
Также нужен docker-compose-файл для запуска кластера:
Код
services:
route-1-1:
container_name: route-1-1
hostname: route-1-1
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-1-1
ports:
- 3201:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
route-1-2:
container_name: route-1-2
hostname: route-1-2
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-1-2
ports:
- 3202:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
route-2-1:
container_name: route-2-1
hostname: route-2-1
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-2-1
ports:
- 3203:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
route-2-2:
container_name: route-2-2
hostname: route-2-2
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-2-2
ports:
- 3204:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
route-3-1:
container_name: route-3-1
hostname: route-3-1
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-3-1
ports:
- 3205:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
route-3-2:
container_name: route-3-2
hostname: route-3-2
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-3-2
ports:
- 3206:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
route-4-1:
container_name: route-4-1
hostname: route-4-1
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-4-1
ports:
- 3207:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
route-4-2:
container_name: route-4-2
hostname: route-4-2
image: tarantool.local
environment:
- TT_INSTANCE_NAME=route-4-2
ports:
- 3208:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./router.lua:/opt/tarantool/router.lua
networks:
- cache
storage-1-1:
container_name: storage-1-1
hostname: storage-1-1
image: tarantool.local
environment:
- TT_INSTANCE_NAME=storage-1-1
ports:
- 3401:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./storage.lua:/opt/tarantool/storage.lua
networks:
- cache
storage-1-2:
container_name: storage-1-2
hostname: storage-1-2
image: tarantool.local
environment:
- TT_INSTANCE_NAME=storage-1-2
ports:
- 3402:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./storage.lua:/opt/tarantool/storage.lua
networks:
- cache
storage-2-1:
container_name: storage-2-1
hostname: storage-2-1
image: tarantool.local
environment:
- TT_INSTANCE_NAME=storage-2-1
ports:
- 3403:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./storage.lua:/opt/tarantool/storage.lua
networks:
- cache
storage-2-2:
container_name: storage-2-2
hostname: storage-2-2
image: tarantool.local
environment:
- TT_INSTANCE_NAME=storage-2-2
ports:
- 3404:3301
volumes:
- ./config.yml:/opt/tarantool/config.yml
- ./files:/var/lib/tarantool/sys_env/default:rw
- ./storage.lua:/opt/tarantool/storage.lua
networks:
- cache
networks:
cache:
external: true
Запускаем его командой:
docker compose up
Для работы с батчами и вставкой через crud.replace_many используем GО-приложение следующего вида (файл example2/main.go в репозитории):
Код
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"regexp"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/crud"
"github.com/tarantool/go-tarantool/v2/pool"
)
var (
tarantoolUser = "example_user"
tarantoolPassword = "example_password"
routerUriList = []string{"route-1-1:3301", "route-1-2:3301"}
batchSize = 1000
sendBatchesTimeout = 10 * time.Millisecond
readTimeout = 5 * time.Millisecond
)
type message struct {
Id int `json:"id"`
Time int64 `json:"Time"`
}
type mainApp struct {
requestDurations *prometheus.SummaryVec
conKafka *kafka.Consumer
routerPool *pool.ConnectionPool
tuples []crud.Tuple
startTimeRPS int64
countMessage int64
allCountMessage int64
reteryCount int
tickerSendBatchTimeout *time.Ticker
startAllRpsTime int64
}
func Init(ctx context.Context) (*mainApp, error) {
reg := prometheus.NewRegistry()
requestDurations := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "request_durations_seconds",
Help: "latency distributions.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"service"},
)
reg.MustRegister(requestDurations)
reg.MustRegister(
collectors.NewGoCollector(
collectors.WithGoCollectorRuntimeMetrics(
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/idle:cpu-seconds")},
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/total:cpu-seconds")},
collectors.GoRuntimeMetricsRule{Matcher: regexp.MustCompile("/cpu/classes/user:cpu-seconds")},
),
),
)
http.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
// Opt into OpenMetrics to support exemplars.
EnableOpenMetrics: true,
},
))
pools := []pool.Instance{}
for _, uri := range routerUriList {
iface := pool.Instance{
Dialer: tarantool.NetDialer{
Address: uri,
User: tarantoolUser,
Password: tarantoolPassword,
},
Name: uri,
}
pools = append(pools, iface)
}
conKafka, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "kafka-broker:9092",
"group.id": "example2",
"auto.offset.reset": "earliest",
"enable.auto.commit": false,
})
if err != nil {
return nil, err
}
err = conKafka.SubscribeTopics([]string{"invalidation"}, nil)
if err != nil {
return nil, err
}
routerPool, err := pool.Connect(ctx, pools)
if err != nil || routerPool == nil {
return nil, err
}
return &mainApp{
requestDurations: requestDurations,
routerPool: routerPool,
conKafka: conKafka,
startTimeRPS: time.Now().UnixNano(),
countMessage: 0,
allCountMessage: 0,
startAllRpsTime: 0,
tickerSendBatchTimeout: time.NewTicker(sendBatchesTimeout),
}, nil
}
func (self *mainApp) readMessage() (*kafka.Message, error) {
startTimeGetMessage := time.Now().UnixNano()
msg, err := self.conKafka.ReadMessage(readTimeout)
self.requestDurations.WithLabelValues("ReadMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
return msg, err
}
func (self *mainApp) parseMesage(msg *kafka.Message) ([]interface{}, error) {
var data message
err := json.Unmarshal(msg.Value, &data)
if err != nil {
log.Fatalf("Json parser error: %v (%v)\n", err, msg)
return nil, err
}
id, err := strconv.Atoi(string(msg.Key))
if err != nil {
log.Fatalf("Kafka key parser error: %v (%v)\n", err, msg)
return nil, err
}
if id != data.Id {
log.Fatalf("Kafka key != message id: %v (%v)\n", err, msg)
return nil, err
}
value := string(msg.Value)
tuple := []interface{}{id, nil, value}
return tuple, nil
}
func (self *mainApp) processed(tuple []interface{}) error {
self.tuples = append(self.tuples, tuple)
self.countMessage++
self.allCountMessage++
if len(self.tuples) >= batchSize {
return self.sendBatch()
}
return nil
}
func (self *mainApp) sendBatch() error {
if len(self.tuples) == 0 {
return nil
}
startTimeGetMessage := time.Now().UnixNano()
req := crud.MakeReplaceManyRequest("test").Tuples(self.tuples)
ret := crud.Result{}
err := self.routerPool.Do(req, pool.ANY).GetTyped(&ret)
self.requestDurations.WithLabelValues("sendMessage").Observe(float64(time.Now().UnixNano() - startTimeGetMessage))
if err != nil {
return err
}
self.tuples = []crud.Tuple{}
self.conKafka.Commit()
self.tickerSendBatchTimeout.Reset(sendBatchesTimeout)
return nil
}
func (self *mainApp) printRPS() {
stopTimeRPS := time.Now().UnixNano()
if self.countMessage > 0 {
time := stopTimeRPS - self.startTimeRPS
log.Printf("time: %d count: %d rps: %f\n",
time, self.countMessage, float64(self.countMessage*1e9)/float64(time))
self.countMessage = 0
} else {
log.Printf("No message")
}
self.startTimeRPS = stopTimeRPS
}
func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
app, err := Init(ctx)
if err != nil {
log.Fatalf("Error init app: %s\n", err)
os.Exit(1)
}
defer app.conKafka.Close()
defer app.routerPool.Close()
go func() {
log.Println("Start http server for metrics!")
err = http.ListenAndServe(":7081", nil)
if err != nil {
log.Fatalf("Error start http server: %s", err)
os.Exit(1)
}
}()
log.Println("Run read message!")
run := true
tickerRPS := time.NewTicker(time.Second)
for run {
select {
case sig := <-sigchan:
log.Printf("Caught signal %v: terminating\n", sig)
run = false
case <-tickerRPS.C:
app.printRPS()
case <-app.tickerSendBatchTimeout.C:
err = app.sendBatch()
if err != nil {
log.Fatalf("Error send to tarantool: %s\n", err)
run = false
}
default:
msg, err := app.readMessage()
if err != nil {
if !err.(kafka.Error).IsTimeout() {
log.Fatalf("Kafka error read message: %s\n", err)
run = false
}
continue
}
if app.allCountMessage == 0 { // Игнорим время первого сообшения
app.startAllRpsTime = time.Now().UnixNano()
}
tuple, err := app.parseMesage(msg)
if err != nil {
log.Fatalf("Error parseMesage message: %s\n", err)
continue
}
err = app.processed(tuple)
if err != nil {
log.Fatalf("Error send to tarantool: %s\n", err)
run = false
}
if app.allCountMessage == 1e7 {
app.sendBatch()
stopRpsTime := time.Now().UnixNano()
fmt.Printf("Processed 1e7 messages time: %d count: %d rps: %f\n",
(stopRpsTime - app.startAllRpsTime), app.allCountMessage, float64((app.allCountMessage-1)*1e9)/float64(stopRpsTime-app.startAllRpsTime))
run = false
}
}
}
os.Exit(0)
}
Соберем и запустим его:
docker run -it --rm --name go001 --net cache \
-v $PWD:/opt/go \
go.local ./example2
Для батчевого режима без нагрузки мы получаем средний RPS на уровне 24030.055023.
Поменяем в скрипте тестирования эндпоинты на 2 свободных роутера и вызов на CRUD. После этого посмотрим на результат под нагрузкой (файл single/example2/testmaster.js в репозитории):
Под нагрузкой получаем:
~32 400 RPS на чтение;
~22 000 RPS на запись.
В моем случае это предел, ограниченный возможностями машины, — для наглядности кейса достаточно и этого. Вместе с тем эти показатели легко улучшить, просто добавив больше серверов.
Вместо выводов
Кэширование потоков из Apache Kafka в Tarantool — эффективный способ оптимизации работы с большими объемами данных в реальном времени. Подход помогает существенно снизить нагрузку на систему за счет уменьшения количества обращений к источнику данных и ускорения обработки запросов. Внедрение подобного решения может повысить производительность приложений, работающих с потоковыми данными, а также улучшить их надежность и масштабируемость.
Безусловно, реализация кэширования предполагает понимание особенностей инструментов, способов их синхронизации, учет требований к инфраструктуре и стратегии развертывания кэша. Вместе с тем приведенный мной пример наглядно показывает, что потенциальный профит от кэширования данных в Tarantool полностью оправдывает как вложенные ресурсы, так и затраченное время.
Комментарии (5)
alexanderfedyukov
05.12.2024 06:05То есть вы предлагаете все это перенести на клиента? А если клиентских сервисов несколько? А при динамическом маштабировании? А когда количество данных в каждом топике по несколько гигабайт? А когда часть данных уже ушла из кафки?
Таких проблем нет. Клиент - метка условная, "клиентский сервис" - такой же участник обработки, как и другие компоненты. И перенести в него данные можно и отмасштабироваться и отшардировать потоки. И как показала практика это сделать можно сильно проще, чем при использовании Тарантула. Самое важное - можно сделать так, чтобы клиент обрабатывал только свою часть данных, не обращаясь ко всему спейсу тарантула через балансировщики, роутеры, вычисляя нужные шарды и т.д.. И еще важнее - надежнее.
alexanderfedyukov
По-моему кейс чересчур надуманный: при выполнении потоковой обработки сообщений нужно "оставаться в потоке" и все необходимые данные хранить под ногами у сервиса-обработчика ровно столько времени, сколько это ему необходимо. Кафка, в частности ее клиент, позволяет решить эти задачи из коробки, причем позволяет хранить только данные необходимые конкретному обработчику в данный момент времени для обработки конкретных партиций топика, а не все подряд, масштабироваться примерно по щелчку пальцев по сравнению с тарантулом. В случае включения в эту историю Тарантула появляются проблемы и накладные расходы, которые перевешивают весь профит от подключения: дополнительные инфраструктурные компоненты тарантула, которые требуют большее количества железа и отдельной команды сопровождения, проблемы синхронизации топиков кафки и структур тарантула, сложность или невозможность обеспечения транзакционности обработки сообщений, обеспечение fault tolerance и т.д.. А плюсы от включения далеко не очевидны. Тарантул был бы интересен как конкурент Redis/Ignite, в том числе при взаимодействии с БД. Но, например, реализации JSR 107 в нем нет, и только его подключение в проект будет нести дополнительные сложности.
nikosias Автор
А что лучше монолит или микросервис?
В статье как раз и показано как можно маштабировиться с одного инстанса до шардированного отказоустойчивого кластера.
То есть вы предлагаете все это перенести на клиента? А если клиентских сервисов несколько? А при динамическом маштабировании? А когда количество данных в каждом топике по несколько гигабайт? А когда часть данных уже ушла из кафки?
Мне кажется что тут стоит остановиться, так как данная тема сама по себе достойна отдельной статьи, и если вы ее напишите, добавьте плиз в комментариях ссылку мне очень интересна данная тема и пути их решения в кешах и витринах данных.
В целом редис или тарантул это, в узких кругах довольно холиварная тема. Ну и существует tarantooldb где частично имплементирован протокол редиса.