О чем статья

Хочу поделиться тем, как мы используем kafka для организации оперативного хранилища справочной информации.

Архитектура системы и требования

Сейчас я работаю на достаточно высоконагруженном проекте по обработке финансовых транзакций. Проект состоит из множества сервисов. Часть из которых условно можно выделить как транзакционное online flow - которое отвечает за то, чтобы надежно и быстро обработать поступающие на вход транзакции (списать оплату и выпустить чек). 

Центральное место в данной части системы занимает Kafka - входные данные обрабатываются, обогащаются и перекладываются из topic в topic и в конце концов обработанные транзакции должны быть сохранены в хранилище master data (postgres), по которым впоследствии уже выпускается какая-то отчетность, производятся сверки и другие взаимодействия с пользователями системы. 

Есть требование к online flow, что на обработку транзакций не должна влиять временная недоступность Postgres ( т.е. допустимы какие-то инфраструктурные работы по обновлению на 2-3 часа, в ходе которых БД будет недоступно). При этом есть допущение, что Kafka высокодоступная система. Т.е. при недоступности БД допустимо, что пользователи системы могут какое-то время не получать актуальные отчеты, но финансовые транзакции по списанию денег и выпуску чеков - должны продолжать работать при доступности внешних сервисов банков и Kafka.

Справочники

В сервисах требуется вести справочники, которые могут периодически обновляться службой поддержки. Справочники представляют из себя данные или настройки, которые требуются для обработки транзакций.

В данной статье как пример возьмем маленький справочник terminal_acquirer: каждому terminal_id соответствует acquirer_id. На основе этого маппинга один из сервисов принимает решение в каком банке проводить авторизацию.

В реальности справочники - это таблицы 5-10 полей размером несколько тысяч записей, которые могут иногда обновляться и дополняться.

Kafka как хранилище для справочников

Для службы сопровождения системы вести данные справочники удобно в реляционной БД( в нашем случае в Postgres). Удобно делать выборки, обновления, есть стандартные клиенты для взаимодействия, надежность хранения, ACID.

Чтобы покрыть описанное выше требование для online flow части, что сервисы должны “не заметить” временную недоступность Postgres применим подход описанный в conluent guides по построению streaming applications с той лишь разницей, что не будем использовать Kafka Streams.

Вкратце это будет выглядеть так:

  1. Редактируем данные справочника в Postgres

  2. Kafka Jdbc Source Connector выгружает обновления в Kafka topic

  3. Внутри сервиса  помещаем компонент, который вычитывает при старте все сообщения из Kafka topic и слушает его дальше,формируя in-memory структуру, к которой обращаются другие компоненты сервиса.

Проиллюстрируем это:

иллюстрация решения
иллюстрация решения

Kafka Connector

Про Kafka Connect Framework и JDBC Sink Connector можно почитать здесь.

Kafka JDBC Source Сonnector используется для того, чтобы периодически выкачивать обновления из БД в Kafka topic.

Необходимо правильно выбрать что будет использоваться в качестве record key - в большинстве случаев он должен совпадать с unique index в БД и быть неизменяемым.

В наших сервисах используется стандартный Kafka JDBC Source Connector c mode = timestamp + incrementing. Про стандартный Jdbc Source Connector, различные режимы и настройки можно почитать здесь. При таком режиме работы нужно завести в таблице служебное timestamp поле и служебное поле id с autoincrement. 

Проиллюстрируем как действия по вставке и удалению данных в таблицу будут отображаться в Kafka Topic:

Postgres

Kafka

offset

key

value

insert row1

0

key(row1)

value(row1)

insert row2

1

key(row2)

value(row2)

update row1

2

key(row1)

value(row1)

При этом важно чтобы key(row1) offset 0 == key(row1) offset 2. При этом value(row1) offset 0 != value(row1) offset 2

При таком подходе если удалить запись из БД, то это никак не отобразиться в Kafka Topic, т.к. стандартный Jdbc Source Connector по сути делает периодический select. Для того, чтобы можно было бы как-то выключать записи, а потом и удалять, можно ввести служебное поле isActive, сделать constraint который запрещает удалять данные с isActive = true, либо вообще их не удалять. Kafka connector может слать tombstone в случае isActive = false, либо можно слать значение как есть и отдать эту логику на откуп Consumer компоненту.

Можно также использовать debezium jdbc source connector, в котором как я понимаю не нужны такие ухищрения со служебными полями, т.к. он подключается к транзакционному логу masterа Postgres. Но в таком случае нужно обеспечить подключение коннектора к актуальной master node Postgres, что тоже может быть нетривиально.

Таким образом любой сервис, который слушает Kafka topic, может собрать у себя актуальную реплику справочника. Но нужно не забывать, чтобы key(row i) не изменялся. В таком случае мы можем применить к kafka topic retention policy compacted. Т.к. нас интересует только последний record по ключу пусть kafka удаляет старые данные для одинаковых ключей в соответствии с retention policy.

Вернемся к примеру с terminal_acquirer. Скрипты для создания справочника в БД со всеми необходимыми предусловиями, описанными выше:

CREATE TABLE termainal_acquirer (
	id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
	terminal_id text NOT NULL,
	acquirer_id text NOT NULL,
	is_active bool NOT NULL DEFAULT true,
	updated_date timestamp NOT NULL DEFAULT timezone('utc'::text, CURRENT_TIMESTAMP),
	CONSTRAINT terminal_acquirer_pkey PRIMARY KEY (id)
);

CREATE UNIQUE INDEX terminal_acquirer_unique_idx ON termainal_acquirer USING btree (terminal_id) WHERE (is_active IS TRUE);

-- Триггер, который предотвращает удаление из таблицы
create trigger trigger_terminal_acquirer_delete before
delete  on  termainal_acquirer for each row execute function fun_terminal_acquirer_delete();

– Триггер для обновления updated_date
create trigger trigger_terminal_acquirer_update before
update on  termainal_acquirer for each row execute function fun_terminal_acquirer_update();

–---Функция для обновления даты 
CREATE OR REPLACE FUNCTION fun_terminal_acquirer_update()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
            begin
              if new.terminal_id <> old.terminal_id then
                raise exception 'terminal_id cannot be changed';
              end if;
              new.updated_date = current_timestamp at time zone 'utc';
              return new;
            end;
            $function$
;
–---Функция для предупреждения удаления
CREATE OR REPLACE FUNCTION fun_terminal_acquirer_delete()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
            begin
              if old.is_active <> false then
                raise exception 'Active settings (is_active = true) can not be deleted. Make it inactive firstly (is_active = false)';
              end if;
              return old;
            end;
      $function$;

Конфигурация Kafka Connect Task, которая выгружает обновления из таблицы БД и складывает их в Kafka Topic в формате формате key = terminal_id и value = json(row):

{
 "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
 "connection.url":"${tpp:reference-data.db.url}",
 "connection.user": "${tpp:reference-data.db.user}",
 "connection.password": "${tpp:reference-data.db.password}",
 "dialect.name" : "PostgreSqlDatabaseDialect",


 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable": "false",
 "table.whitelist": "terminal_acquirer",
 "topic.prefix": "${tpp:kafka.topic.prefix}",


 "mode": "timestamp+incrementing",
 "incrementing.column.name": "id",
 "timestamp.column.name": "updated_date",


 "transforms": "changeCase,copyId,extractId,updatedDateFormat",


 "transforms.changeCase.type" : "com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value",
 "transforms.changeCase.from" : "LOWER_UNDERSCORE",
 "transforms.changeCase.to" : "LOWER_CAMEL",


 "transforms.copyId.type": "org.apache.kafka.connect.transforms.ValueToKey",
 "transforms.copyId.fields": "id",


 "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
 "transforms.extractId.field": "id",


"transforms.updatedDateFormat.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.updatedDateFormat.format": "yyyy-MM-dd HH:mm:ss.SSS",
"transforms.updatedDateFormat.target.type": "string",
"transforms.updatedDateFormat.field": "updatedDate"
}

Проиллюстрируем соответствии событий в БД и Kafka:

Postgres

Kafka

offset

key

value

INSERT INTO termainal_acquirer (terminal_id, acquirer_id) VALUES('1', '1');

0

1

{

“id” : “1”,

“terminalId”:”1”,

“acquirerId”:”1”,

“isActive”:true,

“updateDate”:”2024-01-01 11:11:11.000”

}

INSERT INTO termainal_acquirer (terminal_id, acquirer_id) VALUES('2', '2');

1

2

{

“id” : “2”,

“terminalId”:”2”,

“acquirerId”:”2”,

“isActive”:true,

“updateDate”:”2024-01-01 11:11:12.000”

}

UPDATE termainal_acquirer SET acquirer_id='3' WHERE terminal_id = '1'

2

1

{

“id” : “1”,

“terminalId”:”1”,

“acquirerId”:”3”,

“isActive”:true,

“updateDate”:”2024-01-01 11:11:13.000”

}

Kafka Store Component

В Kafka Streams framework есть концепция Store, где Kafka topic выступает в качестве персистентного хранилища. В Kafka Streams есть разные реализации для работы с topic как хранилищем. Различают Global Store - хранилище, которое содержит все данные из всех партиций топика и простой State Store, который партиционирован, т.е. экземпляр приложения содержит данные в хранилище только из партиций, назначенных данному экземпляру клиента. В свою очередь есть различные реализации хранилищ по типу хранения данных, например это может быть In-Memory хранилище или хранилище, использующие какие-то embedded DB, например Rocks DB.

В наших сервисах, так исторически сложилось, для обработки сообщений используется реактивный клиент. В целом Kafka Streams накладывает определенные ограничения на параллелизацию обработки сообщений ( количество параллельных тасок зависит от количества партиций выделенных на топике и не может быть больше). Т.к. при обработке сообщений из Kafka topic нам требуется выполнять вызовы внешних сервисов, запросы к Cassandra, то реактивный подход обработки сообщений из топика батчами позволяет более эффективно использовать ресурсы.

Концепция с In-memory Global Store кажется очень подходит для эффективной и удобной работы со справочниками в наших сервисах. У нас есть некий Kafka topic, который хранит в себе выгрузку справочных данных из Postgres.

Реализуем данную концепцию без Kafka Streams. При старте приложения мы должны вычитать все сообщения, которые есть в топике-справочнике на момент старта, сформировав некую In-Memory структуру на стороне сервиса( это может быть ConcurrentHashMap). После этого, этого мы можем начать собственно обработку сообщений из Kafka, при этом должны продолжать слушать топик-справочник и обновлять нашу In-Memory структуру, чтобы поддерживать актуальной реплику справочника на стороне сервиса.

Приведу реализацию базового абстрактного класса с метриками, который вычитывает весь топик при старте приложения и затем слушает новые сообщения с абстрактным методом process, реализация которого должна отвечать за формирования справочника:

abstract class KafkaGlobalStore(
    private val kafkaStoreProperties: KafkaProperties,
    private val topicSource: List<String>,
    private val metricSchedulePeriod: Duration = Duration.ofSeconds(5)
) {

    private companion object : Log()

    private val properties = Properties().apply {
        setProperty("bootstrap.servers", kafkaStoreProperties.bootstrapServers)
        setProperty("auto.offset.reset", "none")
        setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        if (kafkaStoreProperties.sslEnabled) {
            setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
            setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaStoreProperties.truststorePath)
            setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaStoreProperties.truststorePassword)
            setProperty(
                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
                kafkaStoreProperties.sslEndpointIdentificationAlgorithm
            )
        }
    }

    // KafkaConsumer is not safe for multithreaded access
    private val consumer = KafkaConsumer<ByteArray?, ByteArray>(properties)
    private val metricConsumer = KafkaConsumer<ByteArray?, ByteArray>(properties)

    private lateinit var topicPartitions: List<TopicPartition>
    private val positions = ConcurrentHashMap<TopicPartition, Long>()
    private val errors = ConcurrentHashMap<Long, StoreError>()

    protected abstract fun process(record: ConsumerRecord<ByteArray?, ByteArray>)

    @PostConstruct
    fun init() {
        StoreMetrics.startInitTime(topicSource.toString())
        topicPartitions = topicSource.flatMap { topic ->
            consumer.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }
        }
        val highWatermarkBarrier = CountDownLatch(1)

        Thread {
            val consumer = KafkaConsumer<ByteArray?, ByteArray>(properties)
            consumer.assign(topicPartitions)
            consumer.seekToBeginning(topicPartitions)

            val highWatermarks = consumer.endOffsets(topicPartitions)

            while (true) {
                if (highWatermarks.isNotEmpty()) {
                    topicPartitions.forEach { partition ->
                        highWatermarks[partition]
                            ?.also { hwm ->
                                if (hwm <= consumer.position(partition)) {
                                    highWatermarks.remove(partition)
                                }
                            }
                    }
                    if (highWatermarks.isEmpty()) {
                        highWatermarkBarrier.countDown()
                    }
                }

                consumer.poll(Duration.ofMillis(kafkaStoreProperties.consumer.maxPollInterval.toLong()))
                    .also { batch ->
                        batch.partitions()
                            .forEach { topicPartition ->
                                batch.records(topicPartition).forEach { record ->
                                    kotlin.runCatching { process(record) }
                                        .onFailure { ex ->
                                            log.error(
                                                "Error processing record in store: key: ${record.key()} offset: ${record.offset()}",
                                                ex
                                            )
                                            errors[record.offset()] = ex.toError(record)
                                            StoreMetrics.updateMetric(StoreMetrics.Error, Tags.empty(), errors.size.toLong())
                                        }
                                }
                                val offset = consumer.position(topicPartition)
                                positions[topicPartition] = offset
                                StoreMetrics.updateMetric(StoreMetrics.Offset, topicPartition, offset)
                            }
                    }
            }
        }.start()
        Timer("store-metric-thread", true).schedule(0L, metricSchedulePeriod.toMillis()) {
            metricConsumer.endOffsets(topicPartitions).forEach { (topicPartition, value) ->
                StoreMetrics.updateMetric(StoreMetrics.Watermark, topicPartition, value)
            }
        }

        highWatermarkBarrier.await()
        StoreMetrics.finishInitTime(topicSource.toString())
    }

    fun endOffsets(): Map<TopicPartition, Long> = consumer.endOffsets(topicPartitions)
    fun positions(): Map<TopicPartition, Long> = positions.toMap()
    fun errors(): Map<Long, StoreError> = errors.toMap()

    private fun Throwable.toError(record: ConsumerRecord<ByteArray?, ByteArray>) =
        StoreError(record.offset(), record.key(), record.topic(), record.partition(), record.timestamp(), message, stackTraceToString())
}

Продолжая наш примера с terminal_acquirer приведем пример реализации этого справочника:

class ReferenceDataStore(
   kafkaProperties: KafkaProperties,
   topicSource: List<String>,
   val objectMapper: ObjectMapper
) : KafkaGlobalStore(kafkaProperties, topicSource) {


   private val terminalAcquirerMap = ConcurrentHashMap<String,String>()
   override fun process(record: ConsumerRecord<ByteArray?, ByteArray>) {
       val referenceData = objectMapper.readReferenceData(record.value())
       if (referenceData.isActive)
           terminalAcquirerMap[referenceData.terminalId] = referenceData.acquirerId
       else
           terminalAcquirerMap.remove(referenceData.terminalId)
   }
  
   fun getAcquirerId(terminalId: String): String? = terminalAcquirerMap[terminalId]
}

При такой реализации наш сервис содержит в себе полную актуальную реплику нашего справочника из БД terminal_acquirer в виде ConcurrentHashMap. Данная реализация позволяет эффективно работать со справочником, т.к. обращение происходит к обновляемой In-Memory структуре.

Потребляемые ресурсы и время загрузки

При таком подходе нужно аккуратно оценивать размер In-Memory структуры и ресурсы, необходимые для работы приложения с учетом особенностей работы с памятью JVM. Также необходимо оценивать время, которое необходимо на старте приложению, для того, чтобы вычитать, десериализовать и сложить в In-Memory структуру все данные из топика-справочника.

На данный момент все наши справочники небольшие и время их загрузки и потребляемые ресурсы не требуют серьезной калибровки настроек сервиса. Но необходимо учитывать и тестировать приложение на случай, если количество этих данных может кратно возрасти. 

Если говорить о приведенном примере со справочником terminal_acquirer, то я провел тест с наполнением справочника 50 000 значениями. Сервис был успешно запущен с параметрами: 

limits:
   cpu: '2'
   memory: 2000Mi
 requests:
   cpu: '2'
   memory: 1000Mi

Время инициализации хранилища на старте( вычитка всех 50 000 значений в in memory) ~ 2510ms, что не так критично для spring boot service. В среднем в наших сервисах данных в справочниках сейчас на порядок меньше, но и при таком порядке время загрузки и потребляемые ресурсы выглядят для нас удовлетворительно.

Что делать, если все же появятся большие справочники

Сейчас наши справочники достаточно небольшие и в обозримом будущем не предполагается их увеличение на порядок. Поэтому подхода с In-Memory Global Store достаточно. Но что делать, если все же появятся кратно большие справочники?

Есть 2 проблемы в таком случае: 

  1. In-Memory хранилища недостаточно

  2. Вычитка и обработка всего топика занимает слишком много времени

В таком случае можно также подсмотреть подходы, которые используются в Kafka Streams. Например использовать вместо In-Memory структуры embedded DB. В Kafka Streams используется Rocks DB - быстрая key-value embedded DB. Использование Rocks DB позволит снизить потребления оперативной памяти, что решит проблему 1). Проблему 2) можно решить за счет подключения persistent volume, что позволит не вычитывать заново весь топик при рестарте сервиса,а читать только новые данные.

Заключение

Приведенный подход позволяет использовать в своих сервисах Kafka Topic как хранилище данных, не затягивая Kafka Streams Framework в стек, если по каким-то причинам этого делать не хочется. К тому же Kafka Streams имеет ряд ограничений, в том числе если нужна какая-то кастомизация хранилищ, то там на мой взгляд это делать неудобно. В данное реализации есть возможность более гибко организовывать хранилища.

Комментарии (42)


  1. RodionGork
    20.10.2024 19:14

    Ну... центральное место наверное скорее - хранилище :) а то я уж по заголовку подумал что в самой кафке хранить научились, типа как оперативная память была на звуковых волнах.

    А если бы не кафка была, все поломалось бы? :)


    1. typik89 Автор
      20.10.2024 19:14

      я не совсем вас понял. Kafka вполне себе выступает как хранилище, в топике хранится реплика данных из postgres.

      А если бы не кафка была, все поломалось бы? :) - тоже не понял что имеется ввиду, что значит была или не была, у нас уже используется кафка, почему и как - это отдельная наверное дискуссия должна быть, кафка как сказано в начале является центральной высокодоступной системой и если она не работает, то да, все не работает - такое допущение, при этом если не работает postgres - часть системы должна функционировать


      1. Kuch
        20.10.2024 19:14

        Я тоже разделяю смятение, которое выразил пользователь выше. Тоже по заголовку подумал, что вы в кафке данные научились хранить. Но по сути у вас есть сервис, который с помощью Кафки доставляет данные, но никак не хранит. А хранит он в оперативной памяти. И тогда возникает вопрос, что делать если сервис упал- поднялся и в оперативной памяти данных нет, а в кафке топики уже прочитаны?


        1. typik89 Автор
          20.10.2024 19:14

          а я не понимаю вашего смятения:)

          данные именно что хранятся в 1) postgres 2) в compacted топике кафка 3) в оперативной памяти сервиса. Не знаю возможно и можно придумать название удачней, но я вас точно не обманул, данные вполне себе хранятся в топике кафка и кафка вполне себе выступает хранилищем из которого данные вытягиваются в приложение.

          если сервис упал-поднялся-отжался - то все данные при инициализации сервиса будут загружены с самого начала. Посмотрите внимательней код KafkaGlobalStore, также я отдельно расписал про проблемы, что нужно тестировать и оценивать размер этих справочников, чтобы для вас не было дорого как хранение данных в оперативной памяти так и их загрузка каждый раз туда при старте сервиса. Посмотрите разделы "Потребляемые ресурсы и время загрузки" и "Что делать, если появятся большие справочники". При каждом старте приложения топик будет вычитываться сначала, также я ответил на комментарий ниже про горизонтальное масштабирования, смысл тот же - для каждого инстанса при каждом старте топик будет вычитываться заново до старта основной логики в идеале ( тут уж spring в помощь)


          1. Kuch
            20.10.2024 19:14

            Но ведь у Кафка топиков есть ttl, не все данные можно будет загрузить сначала


            1. typik89 Автор
              20.10.2024 19:14

              у кафка топик есть разные retention policy и мы используем compacted для таких топиков. Тоже упомянул про это в статье со ссылкой:

              Таким образом любой сервис, который слушает Kafka topic, может собрать у себя актуальную реплику справочника. Но нужно не забывать, чтобы key(row i) не изменялся. В таком случае мы можем применить к kafka topic retention policy compacted. Т.к. нас интересует только последний record по ключу пусть kafka удаляет старые данные для одинаковых ключей в соответствии с retention policy.


              1. Kuch
                20.10.2024 19:14

                Но зачем другим сервисам собирать реплику, если это делает ваш сервис и представляет об этом данные?

                Но и опять таки, вопрос то был про ttl. То есть часть данных просто будет недоступна и в итоге невозможно будет восстановить реплику полностью. Плюс размер топика.

                В общем, я понимаю, что это все возможно сделать, пару лет назад я сам делал систему частичной репликации постгри через кафку, но одновременно с этим я помню насколько это мучительно , потому что есть инструменты более подходящие


                1. typik89 Автор
                  20.10.2024 19:14

                  к сожалению не понял в такой формулировке вашего вопроса, но попробую додумать и ответить на примере, который приведен

                  Есть справочник terminal_acquirer. Служба поддержки вводит 100 новых терминалов, первые 50 должны идти в Сбер, вторые в Альфа. Далее у нас есть сервис роутер, в которые приходит транзакция в которой есть terminal_id. С помощью реплики справочник в сервисе мы определяем куда дальше отправлять сообщение или делать запрос.

                  По ttl тоже непонятно, я вам вроде ответил, что для cleanup.policy=compact нет ttl - перейдите по ссылке.

                  Про репликацию постгри с вами подискутировать не могу, у меня не было такой задачи и не знаю какая именно задача была у вас и что вы имеете ввиду под более подходящими инструментами. Не сомневаюсь, что есть альтернативные решения для каждой задачи, у каждого есть свои плюсы и минусы, и не всегда есть возможность говорить про то, что какое-то однозначно лучше другого, вопрос восприятия, я лишь привел свое, основанное на подходах Kafka Streams.


                  1. Kuch
                    20.10.2024 19:14

                    Значит мы друг друга не сможем понять. Спасибо!


                  1. ris58h
                    20.10.2024 19:14

                    По ttl тоже непонятно, я вам вроде ответил, что для cleanup.policy=compact нет ttl - перейдите по ссылке.

                    Ну вы то как раз и не ответили (до этого момента), а в статье по ссылке нет ни слова про ttl (буквально).


                    1. typik89 Автор
                      20.10.2024 19:14

                      Как же не ответил, если написал про retention policy compacted и привёл ссылку и в самой статье и в комментарии, мне показалось что ответил, возможно не подробно, но по ссылке все нюансы описаны


                      1. ris58h
                        20.10.2024 19:14

                        Посылать читать статью вместо простого ответа на простой вопрос - это не ответил. В статье нет ни слова про ttl - мне погуглить проще было.


                      1. typik89 Автор
                        20.10.2024 19:14

                        Коллега, я ни в коим случае никуда не посылаю, ваше право считать мои ответы неправильными. Я искренне хотел понять и ответить на вопросы предыдущего комментатора. Мне показалось, что я вполне себе ответил, дав ссылку на официальную доку, где можно почитать про разные clenup policy и в скользь упомянул про режиме compaction в статье, возможно не очень удачно, но я и не являюсь профессионалом в этом деле, я всего лишь любитель.

                        На мой взгляд при желании перейдя по ссылке можно ознакомиться со всеми нюансами, т.к. у человека видно, что нет пока необходимых знаний по этому вопросу, а вот то, что вы называете простым ответом повлекло ,бы за собой еще 10 простых вопросов, исхожу из того, что и в комментарии и в статье это объяснено так, что вы не поняли. Зачем я буду отвечать про ttl, если как вы правильно заметили такая терминология не используется в доках конфигурации - потом кто-то предъявит за ttl, что ввожу в заблуждение. Если есть действительно желание вникнуть в этот момент, то лучше идти к первоисточнику. Моей целью не было осветить все нюансы работы с Кафкой.


  1. AlexCzech01
    20.10.2024 19:14

    Интересное решение, только не проще ли было бы проблему временной недоступности Postgre решать репликацией той части Postgre, которая должна быть всегда доступна. Всё-таки хранилище в Кафке - оно немного для других целей сделано


    1. typik89 Автор
      20.10.2024 19:14

      специфика работы Postgres в компании, что Postgres предоставляется команде как сервис и служба поддержки проекта не имеет доступа, такие требования были объявлены на старте проекта. Не исключаю, что можно сделать элегантней и проще, но из тех вводных, что были, показалось, что данное решение тоже достаточно простое и эффективное, тем более что в confluent guides вполне себе описаны схожие на мой взгляд кейсы.

      В целом данная статья написана еще и из интереса того, как этот "велосипед" выглядит для не замыленного взгляда, так что не исключаю :)

      Из преимуществ решения для меня это то, что я имею обновляемый кэш и в тоже время полную реплику справочника в топике.


      1. AlexCzech01
        20.10.2024 19:14

        Я не знаю, что там описано в Confluent Guides (не читал или читал настолько давно, что забыл про это), но Кафку можно изящным движением руки превратить в key value storage. А вот в хранилище, которое можно сканировать по диапазону ключей или вообще без фильтра, её лёгким движением руки не превратить - оно будет тупить

        Не совсем понятно, что те, кто ставил вам такие требования, предлагал делать в случае, когда данные в момент недоступности надо не прочитать из базы, а записать в неё? Записывать на бумажку, чтобы потом через пару часов в базу перенести? Значение в справочнике может же быть и неправильным

        Но в целом решение вполне рабочее, пока справочники - это что-то редко изменяемое


        1. typik89 Автор
          20.10.2024 19:14

          При желании при таком подходе можете реализовать такую структуру, которая будет индексировать по разному, насколько хватит вашей фантазии, но это да нетривиально. По умолчанию да согласен, Kafka это key value store и в kafka streams как раз таки на это упор, все реализации хранилищ строятся вокруг того, что это key-value store и даже в Rocks DB Java Api я не нашел ничего, кроме как put(key) get(key) remove(key), хотя в c++ api там есть и про диапазон тоже. Не каждое решение универсально, с этим спорить не буду. У нас кстати есть пару справочников, которые предоставляют api поиска по ключу и активным датам и по диапазону ключей и по префиксу, конечно же там не такая простая ConcurrentHashMap.

          По требования возможно дал не так много вводных. Мне они кажутся вполне себе логичными. У нас есть Postgres предоставляемый как сервис. Сказано, что оооооочень редко может быть такое, что ночью нужно сделать какие-то работы, которые приведут к тому, что сервис будет лежать 2-3 часа. По нашему SLA мы не можем обрабатывать поступающую транзакцию более 10 мин ( т.е. нам несмотря на лежащую БД нужно обрабатывать транзакции).

          "Редко изменяемые" довольно неточный термин, не знаю "редко у нас изменяемые справочники или нет" в вашей классификации, но как это выглядит: условно бизнес заключает договор с несколькими компаниями и несколькими банками, далее он передает службе поддержки, что к такому то числу нужно уметь обрабатывать поступающие транзакции, служба поддержки готовит справочные данные к данной дате, обычно проводится какое-то боевое тестирование в проде на столе. Таких запросов от бизнеса может быть несколько в неделю, что выливается в изменение/добавление/удаление сотен записей в нескольких справочниках. Может быть и больше в зависимости от нагрузки от бизнеса

          Данный подход вполне себе покрывает все данные требования, изменения в справочниках быстро доставляются до сервисов с помощью представленного механизма. В тех исключительных \ситуациях, когда постгрес лежит ночью справочники понятное дело никто и не думает заполнять, все эти работы проводятся днем, в ночное время работают только дежурные.


          1. AlexCzech01
            20.10.2024 19:14

            По умолчанию Кафка - это журнал сообщений, и я знаю довольно много людей, которые использовать её даже в роли key value storage считают извращением. И я их в чём-то понимаю - это уже где-то довольно близко к забиванию гвоздей микроскопами, она оптимизирована совсем не под это - хотя да, может. Почему Кафка не предоставляет базу с произвольным доступом - потому что там это key-value store устроено как таблица ключей со ссылками на оффсет в логе (ну либо как честный key value store на консумере из очереди как второй вариант - типа вашего решения, но не в памяти). Любой скан не по ключу - это скан этой таблицы с лукапами по всему журналу вразброс, это просто не может работать быстро, хоть убейся, by design

            Ещё раз - я не критикую ваше решение и искренне готов вас похвалить (абсолютно безо всякой снисходительности и иронии) за креативность. Мне бы такой вариант наверное в голову не пришёл, я бы сделал что-то более унылое и требующее дополнительного софта. То есть то, что ваше решение мне кажется странным - это скорее моя проблема, чем недостаток решения

            Если справочник будет часто изменяемым (например, в справочнике будет волшебное поле "последняя цена", которое меняется после каждой транзакции, видал я такие справочники) - в Кафке постоянно будет что-нибудь новенькое и вся эта схема начнёт выдавать интересные спецэффекты, я это имел в виду. Но пока такого нет - всё должно быть хорошо


            1. typik89 Автор
              20.10.2024 19:14

              спасибо за оценку!

              "По умолчанию Кафка - это журнал сообщений, и я знаю довольно много людей, которые использовать её даже в роли key value storage считают извращением. " - кажется что ваши знакомые не пользуются всеми возможностями кафки, т.к. confluent в целом пропагандирует другое и много докладов в том числе про это, даже есть такая ksqlDB - где DB прям в названии, хотя конечно это не совсем про DB :)

              Я с вами полностью согласен, что представленное мной решение очень специфической для кейса с множеством предусловий и оговорок. В целом такая идея возникла после изучения примеров на сайте confluent по построению kafka stream application и прочтения Kafka Streams In Action. Даже думал начать использовать kafka streams. Но во первых понял, что у нас уже 10ки сервисов написаны с использованием реактивного клиента, которые просто так не перепишешь и кажется, что следуя рекомендациям этих гайдов эти сервисы надо немного по другому организовывать и разбивать на сервисы. Также мне на самом деле не очень понравилось то, как написан код как раз таки с этими Store, из-за иерархии наследования, которая была там, очень сложно кастомизировать так, как мне хотелось. Поэтому пока отложил этот и решил для начала взять оттуда принцип для одного специфичного кейса со справочниками. Т.е. сам kafka streams framework на первый взгляд субъективно не понравился, но с точки зрения описания принципов построения архитектуры приложения на базе Kafka - очень вдохновило, так что сейчас от этого активно использую kafka connect и вот представленное решение.

              Я уже упомянул в другом комментарии, что изначально на этапе mvp эти справочники вообще велись в Redis. Но Redis это:

              1) это дополнительная точка отказа ( сам кластер надо содержать и поддержать, у нас кстати на проде пару раз была потеря кластера)

              2) это дополнительное межсетевое взаимодействие, на которое тратится время при обработке транзакции

              3) неудобство ведения ( можно конечно пофиксить это с помощью такой же репликации из postgres через кафку)

              А что касается справочников, хотя все таки это наверно не совсем справочники, где что-то динамично меняется, в зависимости от обработанных транзакций например - на этот счет можно также найти примеры в вышеупомянутых ресурсах на confluent, конкретно представленное решение в таком виде не подходит, согласен.


              1. AlexCzech01
                20.10.2024 19:14

                Они знают про ksql DB (мы используем ksql DB в production), отчасти потому и не в восторге. Хотя я лично скорее одобряю. Но там конечно есть свои подводные камни, в первую очередь про то, что оно не SQL, как бы ни притворялся, и самое больное место - как всю эту машинерию надёжно и безболезенно деплоить


  1. mbolsh
    20.10.2024 19:14

    А как решался вопрос при горизонтальном масштабировании сервиса, в память которого вычитывался справочник? т е если доп инстанс сервиса поднимется, как он сможет получить справочник в памяти?


    1. typik89 Автор
      20.10.2024 19:14

      точно также, любой инстанс будет работать одинаков - загружать всю реплику из топика полностью. В пункте "Kafka Store Component" я об этом написал, что есть в kafka streams партицированные хранилища, есть Global Store, вот нам пока достаточно концепции Global Store - когда мы полностью всегда при старте приложения вычитываем все данные топика из всех партиций, а затем уже приложение начинает делать свою остальную работу используя справочник, при этом справочник продолжает обновляться.

      Т.е. если мы запустим 10 или 20 инстансов, все 10 или 20 инстансов каждый вычитают весь топик и будут содержать в себе полные реплики справочников. Собственно про это код KafkaGlobalStore.Т.к. это Global Store можете масштабировать сколько угодно без специальных настроек и оглядки на количество партиций в топике-справочнике, все зависит только от остальной логики вашего приложения. Данная реализация никак вас не ограничивает в масштабировании.


  1. mbolsh
    20.10.2024 19:14

    в проекте не используется распределенный кэш(редиса например)? кажется, что подобная задача с распределенным кэшем решается более естественно, при этом пропадает необходимость хранения снэпшотов справочников(их ведь может быть много) в памяти каждого экземпляра сервиса.


    1. typik89 Автор
      20.10.2024 19:14

      о, да был у нас и даже еще где-то остался кластер редис:)

      конечно есть разные подходы и решения, но от redis как раз в какой-то момент отказались

      Давайте порассуждаем, уточните, что вы имеете ввиду под распределенным?

      Вы имеете ввиду, то, что в каждом инстансе мы запускаем внутри ноду редиса и все их объединяем в кластер? или вы про централизованный кластер редиса?

      Данные в редис вести неудобно - это опробовано, служба поддержи писали скрипты на питоне для загрузки и обновления 1000 терминалов, хотят писать sql запросы, т.е. их все равно нужно как-то туда реплицировать из реляционной БД.

      Реализация хранилищ ограничена структурами редиса - с этим тоже сталкивались, не хватало.

      Представленное решение для работы со справочниками не использует никакого межсетевого взаимодействия, кроме загрузки данных на старте и фонового обновления, что тоже не так плохо. Оба варианта развертывания редис все равно предполагают межсетевое взаимодействие и влияют на скорость обработки.

      Опять же не претендую на то, что это самое оптимальное и правильное решение. Возможно я не дал всех вводных, возможно есть решения лучше. Но конкретно с Redis пока не вижу как это можно сделать проще и эффективней, не исключаю что можно, но сейчас мы таки отказались от Redis.


  1. ris58h
    20.10.2024 19:14

    Справочники существовали и до этой конструкции с Кафкой? Если да, то как происходило изначальное наполнение Кафки данными из справочников?


    1. typik89 Автор
      20.10.2024 19:14

      Были справочники. Изначально были в редис кластере. В какой-то момент стало неудобно их там вести. Поняли что в этом случае кроме поддержки высокодоступной кафки, нужно еще и поддерживать высокодоступный кластер редис. После нескольких падений кластера редис всем надоел.


      1. ris58h
        20.10.2024 19:14

        Что по второму вопросу?

        Если да, то как происходило изначальное наполнение Кафки данными из справочников?


        1. typik89 Автор
          20.10.2024 19:14

          буквально вручную на этапе mvp службой поддержки


  1. ris58h
    20.10.2024 19:14

    Ещё вопрос, связанный с предыдущим. Как я понял, данные в Кафке у вас хранятся в памяти. Что будет, если кластер Кафки полностью перезапустится по некоторой причине?


    1. Kuch
      20.10.2024 19:14

      Как я понял из ответов выше при бутстрапе сервиса происходит чтение всех топиков Кафки и последовательное воссоздание хранилища заново в оперативную память.


      1. typik89 Автор
        20.10.2024 19:14

        да все правильно вы поняли, только не всех топиков, а тех топиков-справочников, которые используются в конкретном сервисе


    1. typik89 Автор
      20.10.2024 19:14

      "данные в кафке хранятся в памяти" - не понял утверждение. Возможно вы имели ввиду данные Из Кафки хранятся в оперативной памяти сервиса - тогда ДА.

      Что будет если кластер Кафки перезапуститься?

      Если Кластер Кафка не работает - то вся наша система ничего не может делать - это сказано в предусловиях, так что если понадобиться перезапуск Кластера Кафки, то мы скорей всего просто остановим все сервисы, разберемся с кластером, перезапустим его и перезапустим сервисы. Если перезапуск произойдет внештатно, то мы не сможем обрабатывать транзакции дальше до восстановления. Смогут ли сервисы восстановить работу после перезапуска кластера без перезапуска самих сервисов - надо тестировать каждый конкретный сервис и то, как там реализована работа с консюмерами.

      Но повторюсь, Кафка у нас считается высокодоступной, поэтому при таком редком маловероятном кейсе легче все выключить и включить, чем тестировать и поддерживать работу при таком кейсе. Поэтому вдруг для вас это важно, переподключение консюмера при перезапуске KafkaGlobalStore, то возможно его нужно немного доработать. Для нас такое усложнение логики не актуально.


      1. ris58h
        20.10.2024 19:14

        "данные в кафке хранятся в памяти" - не понял утверждение. Возможно вы имели ввиду данные Из Кафки хранятся в оперативной памяти сервиса - тогда ДА.

        Я про саму Кафку. Перечитал соответствующий параграф - понял что там про клиентов.


        1. typik89 Автор
          20.10.2024 19:14

          вкратце - на диске, но это наверное совсем другая история за пределами данного топика


  1. dabrahabra
    20.10.2024 19:14

    А не сравнивали сколько занимает инициализации кеша напрямую из базы, приведены метрика только для инициализации из Kafka - 2.5s


    1. typik89 Автор
      20.10.2024 19:14

      Нет не сравнивал, т.к. не особо интересно. Убежден, что такую выгрузку можно сделать быстрее, но для меня это не так важно. Мне нужно было убедиться, что на кратно большем объеме инициализация справочника выполняется за удовлетворительное время, т.е. время не сильно отличимое на фоне запуска spring boot сервиса который работает с кафкой ( в самом лучше случае это тоже занимает секунду, но иногда и больше, т.к. требуется время на то, пока консьмерам назначаться партиции, пока произойдет ребалансировка - это может занять и 10ки секунд).

      Если я вас правильно понял, вы хотели предложить подход, при котором загружать данные в кэш из БД напрямую а потом периодический его обновлять. Если так, то такое решение не подходи, т.к. если сервис начнет перезапускаться в момент, когда БД не подключена - он не сможет подняться.


  1. PrinceKorwin
    20.10.2024 19:14

    ConcurrentHashMap - при описанной архитектуре избыточен. Достаточно обычного HashMap.

    У вас же параллельный доступ к этой структуре только на чтение.

    Модификация, а точнее создание происходит изолированно в отдельном потоке.


    1. typik89 Автор
      20.10.2024 19:14

      у меня сомнение на этот счет. hash map не tread safe. любое добавление элемента в hash map может привести к перестройке внутренней структуры, таким образом не уверен, что не будет спецэффектов при чтении из других потоков. возможно вы что-то лучше меня знаете или понимаете и в кейсе когда только один поток пишет, а другие читают должно работать или я пропустил какие-то изменения в новых версиях jdk.
      В любом случае даже если так, я обычно следую принципу: если есть сомнения - используй thread safe классы, такие как Atomic*, ConcurrentHashMap и др. По-моему на каком-то очень крутом докладе на joker, где разбиралось эффекты влечет объявление volatile и какие можно не помечать volatile, в конце был дан совет - при сомнениях лучше использовать оптимизированные под это спец классы, такие как Atomic* ConcurrentHashMap и др, потому что даже зная все детали того, как компилируется и оптимизируется код в jvm, можно не туда свернуть в рассуждения и прийти к неправильному выводу. Так что если мне нужна HashMap в многопоточное приложение, то я даже если честно долго не думаю и беру CHM.
      Кажется, что даже если вы правы и я что-то упускаю и недопонимаю, вряд ли выигрыш от использования HashMap будет вообще различим в такого рода сервисах, а вот риск получить спецэффекты при работе с HashMap, когда по логам значение будет записано, а в итоге не будет вычитано, точно не хочется потом разбираться с этим.


      1. PrinceKorwin
        20.10.2024 19:14

        один поток пишет, а другие читают

        Обычно изменения не единичны и чтобы потребители видели консистентные данные делают сначала заполнение данных в структурах в памяти полностью и только потом эти структуры становятся доступны на чтение.

        Самый простой вариант через подход с двойным буфером.

        Но выбор конечной структуры зависит от типа данных, какие модификации и как часто, какие требования к консистентности и к потребляемой памяти.

        вот риск получить спецэффекты при работе с HashMap

        Само использование ConcurrentHashMap не отменяет всех возможных спец эффектов, но дает ложную надежду, что их не будет.


        1. typik89 Автор
          20.10.2024 19:14

          Я если честно вас не понимаю, особенно про 'ложную надежду' в chm. Данные именно что обновляются в рантайме и перестройка структуры может произойти в любой момент. Моё понимание java memory model и ваши обьяснения на данный момент не позволяют с вами согласиться. Повторюсь chm или hm в рамках моих сервисом будут не различимы, но предпочитаю использовать при малейшем сомнении thread safe структуру. Не важно могут ли быть спецэффекты которые я могу предположить или такие, которые я не могу. Я вот честно не хочу даже об этом думать, есть thread safe структура из стандартной библиотеки и если с ней будут спецэффекты, то я по крайней мере буду уверен, что это не из-за того, что я использовал не thread safe структуру.


          1. PrinceKorwin
            20.10.2024 19:14

            Если вам нужно изменить десятки записей в CHM, то атомарными у вас будут только "точечные" изменения. Другими словами пока вы потокобезопасно ее правите у вас потребители этой структуры читают неконсистентные данные.

            Другими словами - происходит так называемое грязное чтение.

            Одно из решений этого - использовать подход с двойным буфером. А это означает, что CHM никакой пользы не принесёт.


            1. typik89 Автор
              20.10.2024 19:14

              меня устраивает то, что вы описали, у меня нет цели, чтобы мгновенно получить точечные изменения, в целом сама доставка от момента добавления записи в постгрес, добавления в топик и вычитки занимает тоже довольно неопределенное время и мы к этому готовы, выполняем конфигурацию заранее. Зато я не думаю о том, как устроена внутрянка HM и какие эффекты могут быть в многопоточном приложении.

              я повторюсь, я не хочу думать и фантазировать об эффектах и рисковать, мне просто не нужна такая оптимизация, использовать HM или CHM для меня скорей всего при тестировании будет даже статистически неразличимым. Главная оптимизация уже сделана, за данными не ходим в редис, а обращаемся к In-Memory структуре.

              Спасибо, что поделились предложением, но меня не убедили. Ваше право так подходить к оптимизации ваших приложение, мне это кажется излишним, даже если это действительно правда.