Заранее оговорюсь, всё что описано в данной статье, касается runtime (децентрализованного) кеша.
Зачем нам такое может понадобиться? По нескольким причинам:
У нас высокие требования к скорости работы приложения, дополнительные запросы к централизованному кешу нежелательны, с целью избежать сетевых взаимодействий.
Компания не знает как готовить отказоустойчивый кеш (например redis), или просто не хочет/не может затягивать новую технологию, усложнять инфраструктуру.
У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время. (будьте осторожны, если у вас высокие требования к синхронизации данных, возникает множество дополнительных проблем в виде проблем с сетью/производительность реплик/etc...)
Представим типичную ситуацию в распределенной среде: имеется N реплик, на каждой из них свой независимый кеш. Например в Repository (я намеренно опустил работу с примитивами синхронизации при обновлении значений кеша, так как цель продемонстрировать суть происходящего):
type db interface {
Find(key string) string
Update(key string, value string)
}
type Repository struct {
cache map[string]string
db db
}
func NewRepository(db db) Repository {
return Repository{db: db, cache: make(map[string]string)}
}
func (r *Repository) Update(key string, value string) {
r.db.Update(key, value)
r.updateCacheValue(key, value)
}
func (r *Repository) updateCacheValue(key string, value string) {
r.cache[key] = value
}
func (r *Repository) UpdateCache(key string) {
r.updateCacheValue(key, r.db.Find(key))
}
func (r *Repository) Find(key string) string {
if val, ok := r.cache[key]; ok {
return val
}
value := r.db.Find(key)
r.updateCacheValue(key, value)
return value
}
Это прекрасно работает, но не в распределенном приложении. Мы сталкиваемся с ситуацией, когда запрос пользователя на обновление, попал на одну случайную реплику и нам нужно уведомить всех об изменениях в данных, так как кеш локальный:

Можем использовать событийно-ориентированный подход и с помощью kafka мы легко решим эту проблему.
Для синхронизации мы можем использовать sync-topic, как это может выглядеть? А примерно так:

Напишем небольшой компонент для работы с топиком:
type KafkaSync struct {
reader *kafka.Reader
writer *kafka.Writer
}
func NewKafkaSync(reader *kafka.Reader, writer *kafka.Writer) KafkaSync {
return KafkaSync{reader: reader, writer: writer}
}
func (sync KafkaSync) Sync(ctx context.Context, key string) error {
return sync.writer.WriteMessages(ctx, kafka.Message{Value: []byte(key)})
}
func (sync KafkaSync) OnSync(ctx context.Context, cb func(key string)) error {
err := sync.reader.SetOffsetAt(ctx, time.Now())
if err != nil {
return fmt.Errorf("setting offset: %w", err)
}
for ctx.Err() == nil {
message, err := sync.reader.ReadMessage(ctx)
if err != nil {
return fmt.Errorf("read message: %w", err)
}
cb(string(message.value))
}
return nil
}
Суть компонента в том, чтобы генерировать события в момент обновления данных - метод Sync, а также уметь их отлавливать - метод OnSync.
Псевдокод, который отразит принцип синхронизации:
kafkaSync := NewKafkaSync()
repo := NewRepository()
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
return kafkaSync.OnSync(ctx, func(key string) error {
// Тут мы отловим новое событие синхронизации на всех репликах
// и можем обновить кеш на каждой из них
repo.UpdateCache(key)
})
})
eg.Go(func() error {
// Представим что тут сработал HTTP Update эндпойнт, на какой то реплике
repo.Update("key1", "val1")
return kafka.sync()
})
eg.Wait()
При любом изменении хранилища, мы генерируем событие обновления в sync-topic, которое в свою очередь получат остальные, и обновят локальный кеш.
В топике мы можем пересылать:
Ключ записи которую нужно обновить, но необходимо идти в хранилище за новым значением.
Пустое сообщение. Иногда это нужно, чтобы просто уведомить о факте обновления хранилища, например если нужно обновить множество ключей.
Обновленное значение и ключ. Например, когда мы хотим избежать дополнительных походов в хранилище, чтобы не создавать нагрузку. Мы можем просто переслать это значение в топике.
Нужно учитывать важный момент при работе с kafka
Если sync-topic имеет несколько партиций, то распределение в рамках consumer group может сделать данную схему неработоспособной. Решение проблемы заключается в чтении топика без consumer group, тогда каждая реплика будет уведомлена о любом изменении, в любой партиции.
Комментарии (17)
lazy_val
13.05.2025 06:49У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время.
Применение Kafka и Event-Driven Architecture вопрос/проблему с инвалидацией разве решают как-то?
apolon13 Автор
13.05.2025 06:49Скорее речь о том, что с помощью этого подхода мы совсем отказываемся от состояния невалидного кеша, ну или продолжительность этого состояния ничтожно мала.
lazy_val
13.05.2025 06:49Честно - непонятно. Вот у нас какой-то сервис, который должен в реальном времени возвращать данные о свободных остатках товаров на складах. Допустим бизнес требует, чтобы в каждый момент времени пользователь получал актуальные на сейчас данные. То есть мы даже TTL в одну микросекунду не хотим ставить, потому что за эту микросекунду кто-то товар зарезервирует или наоборот отменит резерв.
Чем нам в этой ситуации Kafka поможет?
apolon13 Автор
13.05.2025 06:49В целом, паттерн из статьи не позиционируется как серебряная пуля для любых задач, связанных с синхронизацией и доступом к актуальным данным.Конечно есть задачи с более жесткими требованиями, и они требуют совершенно других подходов.
Kahelman
13.05.2025 06:49Поддерживаю предыдущего оратора.
Не понятны требования к Кешу и почему он при таких ограничениях должен быть распределенным.
Кафка это хорошо, но не дает гарантию ‘синхронности данных’: процесс А послал обновление данных, процесс Б прочитал его из Кафка. У процесса С на машине была проблема с производительностью, а потом сетевое соединение отвалилось, а потом она перезапустилась…
В итоге пользователь на машине С получил обновленную версию Кеша через 30 секунд.
Вы похоже стараетесь найти решение задачи о византийских генералах, которая не имеет решения. Вы не можете обеспечить согласованность данных в распределенной системе. Единственно решение raft - подобные протоколы, которые лишь гарантируют что данные будут согласованы в какой-то момент времени.
apolon13 Автор
13.05.2025 06:49Например передача большого количества текста по сети, между кешом и репликой, это может накладывать серьезный отпечаток на производительности.
Если мы говорим о синхронности в любой момент времени, то конечно тут возникает куча проблем. Но возможны задачи которые не имеют жесткий требований к синхронизации данных, даже задержка в 30 секунд и более для нас не критична (например это может быть какое-то длинное описание товара).
Но замечание справедливое, отмечу этот момент в тексте.
Kahelman
13.05.2025 06:49Тогда кто вам мешает инвалид позвать кеш по времени?
apolon13 Автор
13.05.2025 06:49Тогда непонятно по какому времени. Если мы возьмем условно минуту, то каждую минуту с каждой реплики мы будем ходить в хранилище за миллионами или даже десятками миллионов записей, хотя возможно этого не надо было делать, мы создаем рабочую нагрузку на ровном месте. Вполне допустимо инвалидировать по времени, если например мы давно не получали сообщения о синхронизации, получится совмещенный вариант.
Kahelman
13.05.2025 06:49Вы не будете «ходить в хранилище за миллионами записей». Только когда кеш невалидный будете перечитывать записи. Не думаю что у вас каждое 30 секунд миллионы записей запрашиваются.
Давайте ближе к теме: сколько серверов, сколько данных сколько запросов в секунду. Ну и неплохо было бы про задачу написать. А-то осуждаем сферического коня в вакууме
Gary_Ihar
13.05.2025 06:49Мне для самообразования. Вот вы указали, что централизованный кеш не подошел из-за того, что это сетевой запрос.
Но в тоже время вы заюзали "централизованную" кафку. Разве общение с ней, не есть сетевые запросы или ... ?apolon13 Автор
13.05.2025 06:49Да, это сетевые запросы, но они никак не влияют на производительность сервиса, так суть данного паттерна - не гонять супер много данных через кафку, а условно "посигналить" всем остальным что нужно обновить кеш. Для этого достаточно просто пустого сообщения, в самом простом случае, ну или передать просто пару ключей в случаях сложнее. Так же go позволяет писать конкурентный код, это так-же можно использовать в случае записи больших сообщений параллельно каких-то сетевых операций сервиса. Так же есть возможность асинхронной записи, но это уже риск потерять сообщения.
Kahelman
13.05.2025 06:49Вообще-то Кафка и придумана чтобы гонять большие объему данных. В вашем случае это скорее Amal (rabbitMQ) или даже MQTT. Если хочется чего-то распределенного - то Zero Mq вам в помощь
Frank59
13.05.2025 06:49Инвалидация кэша на воркерах по таймауту скорее всего решит все ваши проблемы без кафки и смс. Не знаю сколько у вас инстансов в онлайне, но вряд ли сотни и тысячи. Так что доп нагрузку на бд (или что вы там кэширует) от этих запросов даже не заметите
Ak-47
13.05.2025 06:49не знаете, как запускать приложения в виртуалке - не усложняйте себе жизнь, возьмите кубернетес..
jmdorian
Т.е. добавлять kafka это не "затягивать новую технологию, усложнять инфраструктуру", а redis - усложнять?
apolon13 Автор
Да, важное уточнение, не усложнять новыми компонентами на фоне уже сложившейся архитектуры, где присутствует kafka. Например в компании где работаю я, так и вышло.