Привет, Хабр! Я Борис Шляга, программист в отделе разработки кредитных продуктов для физических лиц. Мы занимаемся продуктом «Кубышка» — это небольшой заем до зарплаты на непредвиденные расходы.

С появлением новых правил регулирования кредитов и займов возникла необходимость в проверке долговой нагрузки клиентов при выдаче продукта. Теперь Кубышка не может быть одобрена клиентам с высоким ПДН.

Сервис расчета ПДН давно работал внутри компании — нам оставалось только реализовать интеграцию с ним. Расскажу, как мы это сделали, как решили проблемы «по дороге» и что в итоге получилось.

Постановка задачи

Мы живем в неидеальном мире, а системы — тем более, особенно когда ограничены правилами. В нашем проекте нельзя было менять формат взаимодействия, контракт или структуру ответных сообщений. Все приходилось решать на стороне нашего сервиса. Тем интереснее было искать элегантное решение в заданных условиях.

Работало себе приложение на Котлине и Спринге, используя стандартную реляционную БД. В приложении процесс подключения услуги работал по запросу от клиента. Все было устроено просто. Обычный HTTP-вызов, внутри которого проходила проверка документов клиента. Если запретов по клиенту не было, услуга подключалась, подготавливались документы для клиента и отправлялся ответ со ссылкой на документы.

Общая схема процесса подключения кубышки
Общая схема процесса подключения кубышки

Нюанс был в том, что интеграция с сервисом существовала только через Кафку, то есть в асинхронном режиме.

Логичный вопрос: почему не сменить канал интеграции и не переписать все на рест? К сожалению, в отведенные сроки команда, владеющая сервисом расчета ПДН, все переписать не могла. Поэтому ничего не оставалось, как использовать то, что есть.

Проблемы и их решения

Во время разработки решения мы столкнулись с рядом проблем. Расскажу, как решали каждую: 

Проблема. Топик Б с результатом проверки был общим для всех внутренних систем компании. Процесс проверки условия асинхронный, то есть мы отправляли запрос в топик А, а ответы с результатом проверки прилетали в топик Б.

Решение. Так как топик ответов Б с результатами проверки общий, его слушает много внутренних сервисов компании. Декларируем свою консьюмер-группу, чтобы получить сообщение, ожидаемое нашим сервисом. Такой подход обеспечит нам свою очередь всех сообщений. 


Проблема. Как отфильтровать сообщения, предназначенные только для нашего сервиса.

Решение. На стороне нашего сервиса перед отправкой в топик А генерируем уникальный идентификатор (request_id), который перекладывается в топик Б с результатом проверки. Проверяя request_id, мы понимаем, что сообщение для нас, и запускаем его обработку, а остальные сообщения пропускаем. 

Идеально было бы вынести это на уровень хидеров сообщения, в таком случае парсить боди сообщения не пришлось бы. Но тут мы всего лишь клиенты, поэтому используем как есть.


Проблема. Продюсер и консьюмер работают в разных потоках. Ответ по Кафке в топик Б с результатом проверки может и не прийти, поэтому не стоит навечно зависать в ожидании ответа внутри пользовательского запроса. Последующая очистка наших топиков (на стороне Кафки) происходила по настройкам брокера Кафки, была настройка по времени жизни (retention.ms) и по размеру (retention.bytes).

Решение. Мнения в команде разделились, и было несколько вариантов. Расскажу о самом простом.

Нам пришел запрос от пользователя на подключение продукта. На основе входящих данных генерируем запрос на проверку для топика А, сохраняем запись в БД с ключом request_id и отправляем в Кафку сообщение. Теперь мы знаем, какое сообщение хотим получить. Дальше можно делать другую работу, например запускать другие проверки. 

Наступает момент, когда нам нужно получить ответ. Тогда мы запускаем «бесконечный» цикл, в котором бегаем в БД с нашим request_id и проверяем, есть ли там ответ. Если есть, используем его по назначению. 

Вернемся к «бесконечному» циклу. Я взял в кавычки слово потому, что мы должны ожидать ответа с определенным таймаутом — согласно проблемам, которые мы решаем. Ну и как же появляется ответ? Наш консьюмер, получая в топике Б ответ с результатом проверки, проверял, нет ли записи с таким request_id в БД. Если есть, сохранял ответ в БД.

Общая схема взаимодействия в системе с БД
Общая схема взаимодействия в системе с БД

Проблема. Конечно же, у нас все модно и молодежно — микросервисы. Наше приложение имело больше одного инстанса, HTTP-вызов от клиента приходил на один из них, и отвечать нужно было там же. 

Решение проблемы микросервисной архитектуры покрывается решением с БД. Любой инстанс нашего приложения, прочитав сообщение из топика Б с результатом проверки, сохранял результат в БД. 

Приложение микросервисное, но БД у всех инстансов одна — она и была точкой синхронизации. Подобное решение позволило избегать сложных межпотоковых взаимодействий, прекрасно распределяло нагрузку по отправке и чтению сообщений из Кафки, так как консьюмеры распределялись по всем инстансам. А еще выбранное решение обеспечило долговременное хранение и использование результата, при этом обладая хорошей отказоустойчивостью, так как данные хранятся на диске. 

Но есть и минусы — это взаимодействие с БД. Появился постоянный опрос БД о результате, и нужно было решить, как ходить за результатом в БД. Например, N-е количество раз с фиксированной задержкой, чтобы соблюдать общий таймаут на проверку. 

Имплементируя подобное решение, требуется правильно рассчитать нагрузку, а если рассчитать тяжело, нужно тестировать.

Мы с командой решили, что не хотим нагружать БД и будем делать все в памяти.

В нашем случае это подходило из-за специфического флоу: данные нужны только во время запроса клиента. Перезагрузка наших инстансов влечет очистку очередей в памяти, но нам они и не нужны, ведь запрос клиента уже закончен и долго висеть не может. Если требуется какое то долговременное хранение, значит, это плюс в копилочку решения с БД.

Реализация итогового решения

Итоговое решение получилось таким: клиент отправляет HTTP-запрос на наш сервис, сервис кладет идентификатор запроса в ResultContainer и отправляет сообщение в топик А. 

Топик А слушает внешний обработчик, отправляя в топик Б ответ (с тем же идентификатором из запроса — request_id) с результатом проверки. Наш сервис ожидает решения, слушая топик Б с результатом проверки, и складывает ответы в ResultContainer по идентификатору из ответа. Он должен соответствовать идентификатору из запроса. 

Наш сервис, дождавшись ответа в ResultContiner, заканчивает обработку запроса клиента. 

Общая схема взаимодействия в системе без БД
Общая схема взаимодействия в системе без БД

На словах все очень просто. Разберем процесс обработки запроса клиента в нашем сервисе подробнее.

@Service
class ClientService(private val pdnCheckService: PDNCheckService) {
    fun processRequest(clientRequest: ClientRequest) {
 
        // some checks before
 
        //run pdn checking by kafka
        val requestId = pdnCheckService.checkClient(clientRequest.clientId)
 
        //do some another actions
 
        //check pdn check result
        val pdnCheckResponse = pdnCheckService.getResult(requestId)
        // verify additional check result
        pdnCheckResponse.verify(clientRequest)
    }
}

Получаем запрос, делаем какие-то действия и отправляем сообщение для проверки ПДН клиента. Затем можем совершать любые другие действия. И в конце пытаемся проверить результат проверки pdnCheckService.getResult(requestId). 

Внутри pdnCheckService для проверки клиента (метод checkClient) мы генерируем requestи сразу сохраняем его в ResultContainer для последующего чтения результата проверки. Отправляем сообщение в топик А.

Метод возвращает requestId, который требуется для получения результата. Получение результата происходит через запрос ResultContainer по requestId, который вернет Future с результатом. Просто делаем future.get(), не забывая про таймаут! 

Если результата нет, кидаем ошибку, чтобы быстрее ответить клиенту и он смог попробовать еще раз.

const val TIMEOUT_SEC: Long = 10
 
@Service
class PDNCheckService(
    private val kafkaTemplate: KafkaTemplate,
    private val resultContainer: ResultContainer
) {
 
    fun checkClient(clientId: String): String {
        val checkRequest = clientId.toRequestDto()
        resultContainer.add(checkRequest.requestId)
        kafkaTemplate.sendDefault(checkRequest)
        return checkRequest.requestId
    }
 
    fun getResult(requestId: String): PDNCheckResponse {
        val responseFuture = resultContainer.get(requestId)
        return if (responseFuture != null) {
            responseFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS)
        } else {
            throw IllegalStateException("No result for request $requestId")
        }
    }
}

Вот так выглядит ResultContainer:

@Component
class ResultContainer {
    private val results = ConcurrentHashMap<String, CompletableFuture<PDNCheckResponse>>()
 
    fun add(requestId: String) {
        results.put(requestId, CompletableFuture())
    }
 
    fun get(requestId: String): CompletableFuture<PDNCheckResponse>? {
        return results.get(requestId)
    }
 
     fun remove(requestId: String) {
        results.remove(requestId)
    }
}

results — это потокобезопасная Map, где ключ — наш reuestId в качестве значения CompletableFuture.

В конце обработки требуется очистка Map по ключам, которые уже отработали, иначе мы получим бесконечно растущий размер хранилища. Не забывайте про это! Сделать это можно в ClientService, добавив try-finally-блок. А еще можно найти готовые коллекции с самоочисткой старых значений или фиксированным размером значений — мы позже так и сделали.

Вот так получаем результат из Кафки:

@Component
class PDNCheckResponseListener(
    private val resultContainer: ResultContainer
) {
     
    @KafkaListener(topics = ["topic-B"])
    fun onMessage(message: PDNCheckResponse) {
        resultContainer.get(message.requestId)?.complete(message)
    }
}

Получая сообщение из топика Б с результатом проверки, мы просто шли в ResultContainer. Если находили, комплитили Future с ответом, в противном случае пропускали сообщение. 

У нас несколько инстансов. Какой из них ожидает результата, листенер не знает, он должен читать все сообщения. Для этого используем requestId, который отсекает ненужные сообщения как от нашего сервиса, так и от чужих. Помним, что топик для ответов Б с результатом проверки был общим для многих сервисов. 

Казалось бы, все сделано. Оставался последний вопрос: как сделать кафка-консьюмер, который будет читать все сообщения на каждом инстансе? Вариант с уникальной groupId не подходил из-за инфраструктурных ограничений по количеству групп для одного сервиса. 

Мы использовали Спринг, поэтому я пошел читать документацию и испытал одно смешное разочарование. Прочитав доки, я не нашел ответа —  только в самом конце, в разделе Tips, Tricks and Examples, было готовое решение для нас. Эх, как же долго я к нему шел!

Скриншот из документации SpringBoot Kafka
Скриншот из документации SpringBoot Kafka

Предлагаю немного подробнее посмотреть, что происходит. В нашем случае требуется читать все записи из всех партиций топика. В таких случаях использование групп потребителей нежелательно, так как Кафка сама распределяет партиции между экземплярами. 

В примере из документации используется механизм ручного назначения партиций (@TopicPartition) с динамическим определением их списка через SpEL-выражение.

@KafkaListener(topicPartitions = @TopicPartition(
    topic = "compacted",
    partitions = "#{@finder.partitions('compacted')}",
    partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
))

Список партиций формируется во время запуска приложения с помощью компонента PartitionFinder, который создает временный Kafka Consumer и вызывает partitionsFor(topic). Это позволяет получить актуальный список партиций, даже если их количество менялось.

public String[] partitions(String topic) {
    try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
	    return consumer.partitionsFor(topic).stream()
	        .map(pi -> "" + pi.partition())
	        .toArray(String[]::new);
    }
}

Параметр ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = earliest в конфигурации позволяет загружать данные в порядке их попадания в очередь.

У нашего подхода оказался небольшой побочный эффект: если настроены мониторинги на Кафку и топики, то с ним будут проблемы. Ключевую метрику для потребителей — консьюмер-лаг — мы не можем мониторить из коробки, ведь каждый из наших консьюмеров читает все партиции и непонятно, что считать лагом в таком случае. 

Есть еще один нюанс, с которым мы, надеюсь, не столкнемся. Если в процессе работы владелец топика увеличит количество партиций в топике, мы про это узнаем только после рестарта, так как поиск партиций у нас происходит при развертывании приложения. Нам это грозит потерей сообщений, ушедших в новую партицию до следующего редеплоя.

Вот все и готово.

Выводы

Краткий список проблем, которые мы преодолели:

  1. Топик Б с результатом проверки ПДН был общим для всех внутренних систем компании. В нашем случае это решалось уникальным requestId, который отсекал все лишнее.

  2. Процесс проверки условия был асинхронны: мы отправляли запрос в топик А, а ответы — в топик Б с результатом проверки. Для синхронизации между потоками взяли ResultContainer, который использовал потокобезопасную реализацию Map (ConcurrentHashMap).

  3. Если продюсер и консьюмер работают в разных потоках, использование потокобезпасной коллекции решает эту проблему.

  4. Ответ по Кафке в топик Б с результатом проверки может и не прийти, поэтому не стоит навечно зависать в ожидании ответа. Нужный ответ мы получали по соответствующему requestId, а использование Future из коробки решило проблему бесконечного ожидания. Мы использовали вызов с таймаутом.

  5. Наше приложение имело больше одного инстанса, HTTP-вызов от клиента приходил на один из них — там же надо было и ответить. Эта проблема тоже решается по уникальному requestId для каждого запроса и получением каждого из инстансов нашего приложения всех сообщений во всех партициях топика Б.

Я рассказал об одном из вариантов внедрения асинхронного взаимодействия внутри синхронного процесса. Мы не придумали ничего нового, а использовали классические библиотеки из джава-канкаренси-пакета.

Неочевидный пример чтения всех сообщений со всех партиций — динамическое назначение всех партиций Кафки без использования групп потребителей. Подобные случаи бывают при работе на легаси-системах, когда им требуется новая интеграция. А еще подход будет полезен в системах, где необходимо гарантированно обработать все сообщения, независимо от предыдущих запусков.

Наш опыт — хороший пример того, что нужно продумывать архитектуру заранее, чтобы не писать такие нетривиальные решения. Последующее внедрение решения затронуло интеграционные тесты нашего приложения, и нам пришлось внедрять подобный механизм, чтобы протестировать успешный флоу. А еще есть и неуспешные исходы, которые также требуют тестов. Но это уже совсем другая история :) 

Комментарии (5)


  1. Farongy
    29.07.2025 07:26

    Может дешевле было редис/хазель прикрутить, если вам БД жалко?


    1. borino Автор
      29.07.2025 07:26

      Редис и Хазел внутри нашего приложения не используются - любая новая интеграция кажется более сложным решением по сравнением со встроенными механизмами джава. Новая интеграция это сетевые задержки, это точка которую надо поддерживать, мониторить... - в общем на первый взгляд не кажется простым решением.


      1. Farongy
        29.07.2025 07:26

        Новая интеграция это сетевые задержки

        А чтение партиции всеми консьюмерами - это прям образец эффективности. Ага.

        Как вы решили проблему оффсетов? Или просто забили? Что будет, когда консьюмер уйдёт в, например, STW, а остальные продолжат работать?

        Есть, например, embedded hazelcast, который решил бы вашу проблему и заодно не создал бы новых.


        1. borino Автор
          29.07.2025 07:26

          Спасибо вам за настойчивость в решениях. Безусловно решение с распределенным кешом выглядит заманчиво и просто.

          В статье описан один из вариантов, который нам показался наиболее простым в реализации и достаточно эффективным в отведенные сроки.

          Проблему оффсетов для себя рассматривали как минимальную или если сказать по простому как вы и написали - забили.

          Учитывая что вся проверка происходит в момент запроса от клиента - если произойдет STW или отвалится консьюмер - у клиента высветится окошко по типу 'попробуйте позже', после чего он спокойно еще раз сможет попробовать, где на вторую попытку подключения вероятность подобной проблемы уже крайне мала.


  1. anevsky
    29.07.2025 07:26

    Интересная статья, всегда приятно почитать по технические реализации высоко нагруженных систем.

    В текущей реализации ResultContainer используется ConcurrentHashMap, где CompletableFuture-объекты остаются в памяти до явного удаления. Если удаление по requestId по каким-либо причинам не произойдёт (например, из-за исключения в обработке или забыли вызвать remove), контейнер будет неограниченно расти. Это особенно опасно в условиях высокой нагрузки или при нестабильной работе внешнего сервиса, когда возникает много "висячих" запросов.

    Чтобы это решить, можно рассмотреть использование кеширующих структур с автоудалением, например:

    • Guava Cache с expireAfterWrite

    • Caffeine с expireAfter и ограничением размера

    • ConcurrentLinkedHashMap

    Это позволит автоматически удалять устаревшие или неотработанные записи без необходимости вручную управлять очисткой. Такой подход помогает избежать утечек памяти и делает ResultContainer более устойчивым к ошибкам или недочётам в логике очистки.