
Привет, Хабр! Я Борис Шляга, программист в отделе разработки кредитных продуктов для физических лиц. Мы занимаемся продуктом «Кубышка» — это небольшой заем до зарплаты на непредвиденные расходы.
С появлением новых правил регулирования кредитов и займов возникла необходимость в проверке долговой нагрузки клиентов при выдаче продукта. Теперь Кубышка не может быть одобрена клиентам с высоким ПДН.
Сервис расчета ПДН давно работал внутри компании — нам оставалось только реализовать интеграцию с ним. Расскажу, как мы это сделали, как решили проблемы «по дороге» и что в итоге получилось.
Постановка задачи
Мы живем в неидеальном мире, а системы — тем более, особенно когда ограничены правилами. В нашем проекте нельзя было менять формат взаимодействия, контракт или структуру ответных сообщений. Все приходилось решать на стороне нашего сервиса. Тем интереснее было искать элегантное решение в заданных условиях.
Работало себе приложение на Котлине и Спринге, используя стандартную реляционную БД. В приложении процесс подключения услуги работал по запросу от клиента. Все было устроено просто. Обычный HTTP-вызов, внутри которого проходила проверка документов клиента. Если запретов по клиенту не было, услуга подключалась, подготавливались документы для клиента и отправлялся ответ со ссылкой на документы.

Нюанс был в том, что интеграция с сервисом существовала только через Кафку, то есть в асинхронном режиме.
Логичный вопрос: почему не сменить канал интеграции и не переписать все на рест? К сожалению, в отведенные сроки команда, владеющая сервисом расчета ПДН, все переписать не могла. Поэтому ничего не оставалось, как использовать то, что есть.
Проблемы и их решения
Во время разработки решения мы столкнулись с рядом проблем. Расскажу, как решали каждую:
Проблема. Топик Б с результатом проверки был общим для всех внутренних систем компании. Процесс проверки условия асинхронный, то есть мы отправляли запрос в топик А, а ответы с результатом проверки прилетали в топик Б.
Решение. Так как топик ответов Б с результатами проверки общий, его слушает много внутренних сервисов компании. Декларируем свою консьюмер-группу, чтобы получить сообщение, ожидаемое нашим сервисом. Такой подход обеспечит нам свою очередь всех сообщений.
Проблема. Как отфильтровать сообщения, предназначенные только для нашего сервиса.
Решение. На стороне нашего сервиса перед отправкой в топик А генерируем уникальный идентификатор (request_id
), который перекладывается в топик Б с результатом проверки. Проверяя request_id
, мы понимаем, что сообщение для нас, и запускаем его обработку, а остальные сообщения пропускаем.
Идеально было бы вынести это на уровень хидеров сообщения, в таком случае парсить боди сообщения не пришлось бы. Но тут мы всего лишь клиенты, поэтому используем как есть.
Проблема. Продюсер и консьюмер работают в разных потоках. Ответ по Кафке в топик Б с результатом проверки может и не прийти, поэтому не стоит навечно зависать в ожидании ответа внутри пользовательского запроса. Последующая очистка наших топиков (на стороне Кафки) происходила по настройкам брокера Кафки, была настройка по времени жизни (retention.ms
) и по размеру (retention.bytes
).
Решение. Мнения в команде разделились, и было несколько вариантов. Расскажу о самом простом.
Нам пришел запрос от пользователя на подключение продукта. На основе входящих данных генерируем запрос на проверку для топика А, сохраняем запись в БД с ключом request_id
и отправляем в Кафку сообщение. Теперь мы знаем, какое сообщение хотим получить. Дальше можно делать другую работу, например запускать другие проверки.
Наступает момент, когда нам нужно получить ответ. Тогда мы запускаем «бесконечный» цикл, в котором бегаем в БД с нашим request_id
и проверяем, есть ли там ответ. Если есть, используем его по назначению.
Вернемся к «бесконечному» циклу. Я взял в кавычки слово потому, что мы должны ожидать ответа с определенным таймаутом — согласно проблемам, которые мы решаем. Ну и как же появляется ответ? Наш консьюмер, получая в топике Б ответ с результатом проверки, проверял, нет ли записи с таким request_id
в БД. Если есть, сохранял ответ в БД.

Проблема. Конечно же, у нас все модно и молодежно — микросервисы. Наше приложение имело больше одного инстанса, HTTP-вызов от клиента приходил на один из них, и отвечать нужно было там же.
Решение проблемы микросервисной архитектуры покрывается решением с БД. Любой инстанс нашего приложения, прочитав сообщение из топика Б с результатом проверки, сохранял результат в БД.
Приложение микросервисное, но БД у всех инстансов одна — она и была точкой синхронизации. Подобное решение позволило избегать сложных межпотоковых взаимодействий, прекрасно распределяло нагрузку по отправке и чтению сообщений из Кафки, так как консьюмеры распределялись по всем инстансам. А еще выбранное решение обеспечило долговременное хранение и использование результата, при этом обладая хорошей отказоустойчивостью, так как данные хранятся на диске.
Но есть и минусы — это взаимодействие с БД. Появился постоянный опрос БД о результате, и нужно было решить, как ходить за результатом в БД. Например, N-е количество раз с фиксированной задержкой, чтобы соблюдать общий таймаут на проверку.
Имплементируя подобное решение, требуется правильно рассчитать нагрузку, а если рассчитать тяжело, нужно тестировать.
Мы с командой решили, что не хотим нагружать БД и будем делать все в памяти.
В нашем случае это подходило из-за специфического флоу: данные нужны только во время запроса клиента. Перезагрузка наших инстансов влечет очистку очередей в памяти, но нам они и не нужны, ведь запрос клиента уже закончен и долго висеть не может. Если требуется какое то долговременное хранение, значит, это плюс в копилочку решения с БД.
Реализация итогового решения
Итоговое решение получилось таким: клиент отправляет HTTP-запрос на наш сервис, сервис кладет идентификатор запроса в ResultContainer
и отправляет сообщение в топик А.
Топик А слушает внешний обработчик, отправляя в топик Б ответ (с тем же идентификатором из запроса — request_id
) с результатом проверки. Наш сервис ожидает решения, слушая топик Б с результатом проверки, и складывает ответы в ResultContainer
по идентификатору из ответа. Он должен соответствовать идентификатору из запроса.
Наш сервис, дождавшись ответа в ResultContiner
, заканчивает обработку запроса клиента.

На словах все очень просто. Разберем процесс обработки запроса клиента в нашем сервисе подробнее.
@Service
class ClientService(private val pdnCheckService: PDNCheckService) {
fun processRequest(clientRequest: ClientRequest) {
// some checks before
//run pdn checking by kafka
val requestId = pdnCheckService.checkClient(clientRequest.clientId)
//do some another actions
//check pdn check result
val pdnCheckResponse = pdnCheckService.getResult(requestId)
// verify additional check result
pdnCheckResponse.verify(clientRequest)
}
}
Получаем запрос, делаем какие-то действия и отправляем сообщение для проверки ПДН клиента. Затем можем совершать любые другие действия. И в конце пытаемся проверить результат проверки pdnCheckService.getResult(requestId).
Внутри pdnCheckService
для проверки клиента (метод checkClient
) мы генерируем request
и сразу сохраняем его в ResultContainer
для последующего чтения результата проверки. Отправляем сообщение в топик А.
Метод возвращает requestId,
который требуется для получения результата. Получение результата происходит через запрос ResultContainer
по requestId
, который вернет Future
с результатом. Просто делаем future.get(),
не забывая про таймаут!
Если результата нет, кидаем ошибку, чтобы быстрее ответить клиенту и он смог попробовать еще раз.
const val TIMEOUT_SEC: Long = 10
@Service
class PDNCheckService(
private val kafkaTemplate: KafkaTemplate,
private val resultContainer: ResultContainer
) {
fun checkClient(clientId: String): String {
val checkRequest = clientId.toRequestDto()
resultContainer.add(checkRequest.requestId)
kafkaTemplate.sendDefault(checkRequest)
return checkRequest.requestId
}
fun getResult(requestId: String): PDNCheckResponse {
val responseFuture = resultContainer.get(requestId)
return if (responseFuture != null) {
responseFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS)
} else {
throw IllegalStateException("No result for request $requestId")
}
}
}
Вот так выглядит ResultContainer:
@Component
class ResultContainer {
private val results = ConcurrentHashMap<String, CompletableFuture<PDNCheckResponse>>()
fun add(requestId: String) {
results.put(requestId, CompletableFuture())
}
fun get(requestId: String): CompletableFuture<PDNCheckResponse>? {
return results.get(requestId)
}
fun remove(requestId: String) {
results.remove(requestId)
}
}
results
— это потокобезопасная Map, где ключ — наш reuestId
в качестве значения CompletableFuture.
В конце обработки требуется очистка Map по ключам, которые уже отработали, иначе мы получим бесконечно растущий размер хранилища. Не забывайте про это! Сделать это можно в ClientService,
добавив try-finally-блок. А еще можно найти готовые коллекции с самоочисткой старых значений или фиксированным размером значений — мы позже так и сделали.
Вот так получаем результат из Кафки:
@Component
class PDNCheckResponseListener(
private val resultContainer: ResultContainer
) {
@KafkaListener(topics = ["topic-B"])
fun onMessage(message: PDNCheckResponse) {
resultContainer.get(message.requestId)?.complete(message)
}
}
Получая сообщение из топика Б с результатом проверки, мы просто шли в ResultContainer.
Если находили, комплитили Future
с ответом, в противном случае пропускали сообщение.
У нас несколько инстансов. Какой из них ожидает результата, листенер не знает, он должен читать все сообщения. Для этого используем requestId,
который отсекает ненужные сообщения как от нашего сервиса, так и от чужих. Помним, что топик для ответов Б с результатом проверки был общим для многих сервисов.
Казалось бы, все сделано. Оставался последний вопрос: как сделать кафка-консьюмер, который будет читать все сообщения на каждом инстансе? Вариант с уникальной groupId не подходил из-за инфраструктурных ограничений по количеству групп для одного сервиса.
Мы использовали Спринг, поэтому я пошел читать документацию и испытал одно смешное разочарование. Прочитав доки, я не нашел ответа — только в самом конце, в разделе Tips, Tricks and Examples, было готовое решение для нас. Эх, как же долго я к нему шел!

Предлагаю немного подробнее посмотреть, что происходит. В нашем случае требуется читать все записи из всех партиций топика. В таких случаях использование групп потребителей нежелательно, так как Кафка сама распределяет партиции между экземплярами.
В примере из документации используется механизм ручного назначения партиций (@TopicPartition
) с динамическим определением их списка через SpEL-выражение.
@KafkaListener(topicPartitions = @TopicPartition(
topic = "compacted",
partitions = "#{@finder.partitions('compacted')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
))
Список партиций формируется во время запуска приложения с помощью компонента PartitionFinder,
который создает временный Kafka Consumer и вызывает partitionsFor(topic)
. Это позволяет получить актуальный список партиций, даже если их количество менялось.
public String[] partitions(String topic) {
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(pi -> "" + pi.partition())
.toArray(String[]::new);
}
}
Параметр ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = earliest
в конфигурации позволяет загружать данные в порядке их попадания в очередь.
У нашего подхода оказался небольшой побочный эффект: если настроены мониторинги на Кафку и топики, то с ним будут проблемы. Ключевую метрику для потребителей — консьюмер-лаг — мы не можем мониторить из коробки, ведь каждый из наших консьюмеров читает все партиции и непонятно, что считать лагом в таком случае.
Есть еще один нюанс, с которым мы, надеюсь, не столкнемся. Если в процессе работы владелец топика увеличит количество партиций в топике, мы про это узнаем только после рестарта, так как поиск партиций у нас происходит при развертывании приложения. Нам это грозит потерей сообщений, ушедших в новую партицию до следующего редеплоя.
Вот все и готово.
Выводы
Краткий список проблем, которые мы преодолели:
Топик Б с результатом проверки ПДН был общим для всех внутренних систем компании. В нашем случае это решалось уникальным
requestId,
который отсекал все лишнее.Процесс проверки условия был асинхронны: мы отправляли запрос в топик А, а ответы — в топик Б с результатом проверки. Для синхронизации между потоками взяли ResultContainer, который использовал потокобезопасную реализацию Map (
ConcurrentHashMap
).Если продюсер и консьюмер работают в разных потоках, использование потокобезпасной коллекции решает эту проблему.
Ответ по Кафке в топик Б с результатом проверки может и не прийти, поэтому не стоит навечно зависать в ожидании ответа. Нужный ответ мы получали по соответствующему
requestId
, а использование Future из коробки решило проблему бесконечного ожидания. Мы использовали вызов с таймаутом.Наше приложение имело больше одного инстанса, HTTP-вызов от клиента приходил на один из них — там же надо было и ответить. Эта проблема тоже решается по уникальному requestId для каждого запроса и получением каждого из инстансов нашего приложения всех сообщений во всех партициях топика Б.
Я рассказал об одном из вариантов внедрения асинхронного взаимодействия внутри синхронного процесса. Мы не придумали ничего нового, а использовали классические библиотеки из джава-канкаренси-пакета.
Неочевидный пример чтения всех сообщений со всех партиций — динамическое назначение всех партиций Кафки без использования групп потребителей. Подобные случаи бывают при работе на легаси-системах, когда им требуется новая интеграция. А еще подход будет полезен в системах, где необходимо гарантированно обработать все сообщения, независимо от предыдущих запусков.
Наш опыт — хороший пример того, что нужно продумывать архитектуру заранее, чтобы не писать такие нетривиальные решения. Последующее внедрение решения затронуло интеграционные тесты нашего приложения, и нам пришлось внедрять подобный механизм, чтобы протестировать успешный флоу. А еще есть и неуспешные исходы, которые также требуют тестов. Но это уже совсем другая история :)
Комментарии (5)
anevsky
29.07.2025 07:26Интересная статья, всегда приятно почитать по технические реализации высоко нагруженных систем.
В текущей реализации
ResultContainer
используетсяConcurrentHashMap
, гдеCompletableFuture
-объекты остаются в памяти до явного удаления. Если удаление поrequestId
по каким-либо причинам не произойдёт (например, из-за исключения в обработке или забыли вызватьremove
), контейнер будет неограниченно расти. Это особенно опасно в условиях высокой нагрузки или при нестабильной работе внешнего сервиса, когда возникает много "висячих" запросов.Чтобы это решить, можно рассмотреть использование кеширующих структур с автоудалением, например:
Guava Cache
сexpireAfterWrite
Caffeine
сexpireAfter
и ограничением размераConcurrentLinkedHashMap
Это позволит автоматически удалять устаревшие или неотработанные записи без необходимости вручную управлять очисткой. Такой подход помогает избежать утечек памяти и делает
ResultContainer
более устойчивым к ошибкам или недочётам в логике очистки.
Farongy
Может дешевле было редис/хазель прикрутить, если вам БД жалко?
borino Автор
Редис и Хазел внутри нашего приложения не используются - любая новая интеграция кажется более сложным решением по сравнением со встроенными механизмами джава. Новая интеграция это сетевые задержки, это точка которую надо поддерживать, мониторить... - в общем на первый взгляд не кажется простым решением.
Farongy
А чтение партиции всеми консьюмерами - это прям образец эффективности. Ага.
Как вы решили проблему оффсетов? Или просто забили? Что будет, когда консьюмер уйдёт в, например, STW, а остальные продолжат работать?
Есть, например, embedded hazelcast, который решил бы вашу проблему и заодно не создал бы новых.
borino Автор
Спасибо вам за настойчивость в решениях. Безусловно решение с распределенным кешом выглядит заманчиво и просто.
В статье описан один из вариантов, который нам показался наиболее простым в реализации и достаточно эффективным в отведенные сроки.
Проблему оффсетов для себя рассматривали как минимальную или если сказать по простому как вы и написали - забили.
Учитывая что вся проверка происходит в момент запроса от клиента - если произойдет STW или отвалится консьюмер - у клиента высветится окошко по типу 'попробуйте позже', после чего он спокойно еще раз сможет попробовать, где на вторую попытку подключения вероятность подобной проблемы уже крайне мала.