Заранее оговорюсь, всё что описано в данной статье, касается runtime (децентрализованного) кеша.

Зачем нам такое может понадобиться? По нескольким причинам:

  • У нас высокие требования к скорости работы приложения, дополнительные запросы к централизованному кешу нежелательны, с целью избежать сетевых взаимодействий.

  • Компания не знает как готовить отказоустойчивый кеш (например redis), или просто не хочет/не может затягивать новую технологию, усложнять инфраструктуру.

  • У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время. (будьте осторожны, если у вас высокие требования к синхронизации данных, возникает множество дополнительных проблем в виде проблем с сетью/производительность реплик/etc...)

Представим типичную ситуацию в распределенной среде: имеется N реплик, на каждой из них свой независимый кеш. Например в Repository (я намеренно опустил работу с примитивами синхронизации при обновлении значений кеша, так как цель продемонстрировать суть происходящего):

type db interface {
  Find(key string) string
  Update(key string, value string)
}

type Repository struct {
  cache map[string]string
  db    db
}

func NewRepository(db db) Repository {
  return Repository{db: db, cache: make(map[string]string)}
}

func (r *Repository) Update(key string, value string) {
  r.db.Update(key, value)
  r.updateCacheValue(key, value)
}

func (r *Repository) updateCacheValue(key string, value string)  {
  r.cache[key] = value
}

func (r *Repository) UpdateCache(key string) {
  r.updateCacheValue(key, r.db.Find(key))
}

func (r *Repository) Find(key string) string {
  if val, ok := r.cache[key]; ok {
   return val
  }
  value := r.db.Find(key)
  r.updateCacheValue(key, value)
  return value
}

Это прекрасно работает, но не в распределенном приложении. Мы сталкиваемся с ситуацией, когда запрос пользователя на обновление, попал на одну случайную реплику и нам нужно уведомить всех об изменениях в данных, так как кеш локальный:

Пользователь операции SELECT может попасть на реплику, где устаревшее значение кеша
Пользователь операции SELECT может попасть на реплику, где устаревшее значение кеша

Можем использовать событийно-ориентированный подход и с помощью kafka мы легко решим эту проблему.
Для синхронизации мы можем использовать sync-topic, как это может выглядеть? А примерно так:

Напишем небольшой компонент для работы с топиком:

type KafkaSync struct {
  reader *kafka.Reader
  writer *kafka.Writer
}

func NewKafkaSync(reader *kafka.Reader, writer *kafka.Writer) KafkaSync {
  return KafkaSync{reader: reader, writer: writer}
}

func (sync KafkaSync) Sync(ctx context.Context, key string) error {
  return sync.writer.WriteMessages(ctx, kafka.Message{Value: []byte(key)})
}

func (sync KafkaSync) OnSync(ctx context.Context, cb func(key string)) error {
  err := sync.reader.SetOffsetAt(ctx, time.Now())
  if err != nil {
   return fmt.Errorf("setting offset: %w", err)
  }
  for ctx.Err() == nil {
    message, err := sync.reader.ReadMessage(ctx)
    if err != nil {
     return fmt.Errorf("read message: %w", err)
    }
    cb(string(message.value))
  }
  return nil
}

Суть компонента в том, чтобы генерировать события в момент обновления данных - метод Sync, а также уметь их отлавливать - метод OnSync.

Псевдокод, который отразит принцип синхронизации:

kafkaSync := NewKafkaSync()
repo := NewRepository()
eg, ctx := errgroup.WithContext(context.Background())

eg.Go(func() error {
  return kafkaSync.OnSync(ctx, func(key string) error {
     // Тут мы отловим новое событие синхронизации на всех репликах
     // и можем обновить кеш на каждой из них
     repo.UpdateCache(key)
  })
})

eg.Go(func() error {
  // Представим что тут сработал HTTP Update эндпойнт, на какой то реплике
  repo.Update("key1", "val1")
  return kafka.sync()
})

eg.Wait()

При любом изменении хранилища, мы генерируем событие обновления в sync-topic, которое в свою очередь получат остальные, и обновят локальный кеш.

В топике мы можем пересылать:

  • Ключ записи которую нужно обновить, но необходимо идти в хранилище за новым значением.

  • Пустое сообщение. Иногда это нужно, чтобы просто уведомить о факте обновления хранилища, например если нужно обновить множество ключей.

  • Обновленное значение и ключ. Например, когда мы хотим избежать дополнительных походов в хранилище, чтобы не создавать нагрузку. Мы можем просто переслать это значение в топике.

Нужно учитывать важный момент при работе с kafka

Если sync-topic имеет несколько партиций, то распределение в рамках consumer group может сделать данную схему неработоспособной. Решение проблемы заключается в чтении топика без consumer group, тогда каждая реплика будет уведомлена о любом изменении, в любой партиции.

Комментарии (17)


  1. jmdorian
    13.05.2025 06:49

    Т.е. добавлять kafka это не "затягивать новую технологию, усложнять инфраструктуру", а redis - усложнять?


    1. apolon13 Автор
      13.05.2025 06:49

      Да, важное уточнение, не усложнять новыми компонентами на фоне уже сложившейся архитектуры, где присутствует kafka. Например в компании где работаю я, так и вышло.


  1. lazy_val
    13.05.2025 06:49

    У нас нет четкого представления о том, как инвалидировать кеш на основе TTL (time to live), поскольку бизнес правила не позволяют жить невалидному кешу хоть какое-то время.

    Применение Kafka и Event-Driven Architecture вопрос/проблему с инвалидацией разве решают как-то?


    1. apolon13 Автор
      13.05.2025 06:49

      Скорее речь о том, что с помощью этого подхода мы совсем отказываемся от состояния невалидного кеша, ну или продолжительность этого состояния ничтожно мала.


      1. lazy_val
        13.05.2025 06:49

        Честно - непонятно. Вот у нас какой-то сервис, который должен в реальном времени возвращать данные о свободных остатках товаров на складах. Допустим бизнес требует, чтобы в каждый момент времени пользователь получал актуальные на сейчас данные. То есть мы даже TTL в одну микросекунду не хотим ставить, потому что за эту микросекунду кто-то товар зарезервирует или наоборот отменит резерв.

        Чем нам в этой ситуации Kafka поможет?


        1. apolon13 Автор
          13.05.2025 06:49

          В целом, паттерн из статьи не позиционируется как серебряная пуля для любых задач, связанных с синхронизацией и доступом к актуальным данным.Конечно есть задачи с более жесткими требованиями, и они требуют совершенно других подходов.


          1. Kahelman
            13.05.2025 06:49

            Поддерживаю предыдущего оратора.

            1. Не понятны требования к Кешу и почему он при таких ограничениях должен быть распределенным.

            2. Кафка это хорошо, но не дает гарантию ‘синхронности данных’: процесс А послал обновление данных, процесс Б прочитал его из Кафка. У процесса С на машине была проблема с производительностью, а потом сетевое соединение отвалилось, а потом она перезапустилась…

            В итоге пользователь на машине С получил обновленную версию Кеша через 30 секунд.

            Вы похоже стараетесь найти решение задачи о византийских генералах, которая не имеет решения. Вы не можете обеспечить согласованность данных в распределенной системе. Единственно решение raft - подобные протоколы, которые лишь гарантируют что данные будут согласованы в какой-то момент времени.


            1. apolon13 Автор
              13.05.2025 06:49

              1. Например передача большого количества текста по сети, между кешом и репликой, это может накладывать серьезный отпечаток на производительности.

              2. Если мы говорим о синхронности в любой момент времени, то конечно тут возникает куча проблем. Но возможны задачи которые не имеют жесткий требований к синхронизации данных, даже задержка в 30 секунд и более для нас не критична (например это может быть какое-то длинное описание товара).

              Но замечание справедливое, отмечу этот момент в тексте.


              1. Kahelman
                13.05.2025 06:49

                Тогда кто вам мешает инвалид позвать кеш по времени?


                1. apolon13 Автор
                  13.05.2025 06:49

                  Тогда непонятно по какому времени. Если мы возьмем условно минуту, то каждую минуту с каждой реплики мы будем ходить в хранилище за миллионами или даже десятками миллионов записей, хотя возможно этого не надо было делать, мы создаем рабочую нагрузку на ровном месте. Вполне допустимо инвалидировать по времени, если например мы давно не получали сообщения о синхронизации, получится совмещенный вариант.


                  1. Kahelman
                    13.05.2025 06:49

                    Вы не будете «ходить в хранилище за миллионами записей». Только когда кеш невалидный будете перечитывать записи. Не думаю что у вас каждое 30 секунд миллионы записей запрашиваются.

                    Давайте ближе к теме: сколько серверов, сколько данных сколько запросов в секунду. Ну и неплохо было бы про задачу написать. А-то осуждаем сферического коня в вакууме


  1. Gary_Ihar
    13.05.2025 06:49

    Мне для самообразования. Вот вы указали, что централизованный кеш не подошел из-за того, что это сетевой запрос.

    Но в тоже время вы заюзали "централизованную" кафку. Разве общение с ней, не есть сетевые запросы или ... ?


    1. apolon13 Автор
      13.05.2025 06:49

      Да, это сетевые запросы, но они никак не влияют на производительность сервиса, так суть данного паттерна - не гонять супер много данных через кафку, а условно "посигналить" всем остальным что нужно обновить кеш. Для этого достаточно просто пустого сообщения, в самом простом случае, ну или передать просто пару ключей в случаях сложнее. Так же go позволяет писать конкурентный код, это так-же можно использовать в случае записи больших сообщений параллельно каких-то сетевых операций сервиса. Так же есть возможность асинхронной записи, но это уже риск потерять сообщения.


      1. Gary_Ihar
        13.05.2025 06:49

        Спасибо


      1. Kahelman
        13.05.2025 06:49

        Вообще-то Кафка и придумана чтобы гонять большие объему данных. В вашем случае это скорее Amal (rabbitMQ) или даже MQTT. Если хочется чего-то распределенного - то Zero Mq вам в помощь


  1. Frank59
    13.05.2025 06:49

    Инвалидация кэша на воркерах по таймауту скорее всего решит все ваши проблемы без кафки и смс. Не знаю сколько у вас инстансов в онлайне, но вряд ли сотни и тысячи. Так что доп нагрузку на бд (или что вы там кэширует) от этих запросов даже не заметите


  1. Ak-47
    13.05.2025 06:49

    не знаете, как запускать приложения в виртуалке - не усложняйте себе жизнь, возьмите кубернетес..