Привет, меня зовут Екатерина, я работаю в ОТП Банке на позиции Senior-разработчика в одном из трайбов. В продолжение предыдущей статьи мы вместе с Александром, главным solution-архитектором, расскажем о вызовах, с которыми столкнулись при внедрении нереляционного хранилища в наше ДБО.
В ОТП в первую очередь думают об удобстве клиента. Использование современных отказоустойчивых систем, таких как Elasticsearch, — один из важных трейд-оффов: с одной стороны, мы повышаем скорость и качество клиентского опыта, с другой — усложняем архитектуру и сталкиваемся с проблемами, которых не было бы при более простом подходе. Все это работает на эту цель.
В этой статье мы поделимся частью таких кейсов и расскажем, как наша команда их решала.

Задача: современная лента операций

Перед нами стояла задача создать ленту операций, которая соответствовала бы трем ключевым требованиям:
Высокая производительность: Быстрый доступ к клиентским транзакциям при больших объемах данных.
Гибкий поиск: Поддержка полнотекстового поиска и фильтрации.
Масштабируемость по глубине данных: Возможность работы с разной глубиной истории операций.
Мы рассмотрели несколько архитектурных подходов и проанализировали подходящие для них технологии.
Вариант 1: Реляционная база данных с кешем

Суть подхода: Классическая связка базы данных (например, PostgreSQL) и горячего кеша в памяти для оперативных данных.
Почему отказались: Главная проблема — глубина поиска данных. Глубина поиска для выписок может составлять от нескольких дней до нескольких лет. Хранить многолетний кеш для всех клиентов крайне ресурсоёмко с точки зрения оперативной памяти. При горизонтальном масштабировании пришлось бы либо использовать отдельное распределенное кеш-хранилище (например, Redis), что усложняет архитектуру, либо механизм sticky sessions, от которого мы хотели уйти. Этот подход хорошо справляется с недавними данными, но не подходит для эффективного поиска по полной истории.
Кроме того, по результатам множества бенчмарков, системы вроде Elasticsearch показывают значительно более высокую скорость выполнения сложных поисковых запросов по большим datasets по сравнению с реляционными СУБД.
Вариант 2: ClickHouse

Суть подхода: Использование колоночной аналитической СУБД, оптимизированной для быстрого чтения.
-
Почему отказались: ClickHouse отлично подходит для аналитических отчетов, но оказался неудобен для нашей онлайн-системы по нескольким причинам:
Сложность эксплуатации: Требует глубоких знаний для поддержки и тонкой настройки.
Ограничения на обновления: Отсутствие привычной поддержки точечных UPDATE. Клиентские операции часто обновляются (например, баланс карты может меняться многократно за день), что в ClickHouse приводит к необходимости замены всей записи. Это ведет к высокой фрагментации данных и снижению производительности.
Вариант 3: Elasticsearch

Суть подхода: Использование поискового движка, заточенного под работу с большими объемами неструктурированных данных.
-
Почему выбрали: Этот вариант наилучшим образом соответствовал нашим требованиям:
Скорость поиска
Встроенная морфология и поддержка нечеткого (fuzzy) поиска
Производительность на больших данных: Система стабильно работает с миллиардами документов, в то время как запросы аналогичной сложности в SQL-базах могут выполняться неприемлемо долго.
Гибкость схемы: Данные хранятся в виде JSON-документов, возможность создавать динамический маппинг
Масштабируемость: Поддержка шардирования и репликации реализована на уровне платформы, что избавляет от необходимости разработки кастомных решений.
Итог: После анализа мы остановились на Elasticsearch как на наиболее сбалансированном решении. Стоит отметить, что на момент принятия решения у команды не было практического опыта работы с этой технологией. Поэтому процесс внедрения сопровождался как успешными находками, так и преодолением трудностей, о которых мы подробнее расскажем в следующих разделах.
Проблема 1. Производительность Elasticsearch и количество индексов

В Elasticsearch данные организованы в индексы, которые, в свою очередь, делятся на шарды. Изначально мы выбрали простую и логичную схему: выделить отдельный индекс для каждой организации (клиента). На старте, при небольшом количестве клиентов, система работала идеально, обеспечивая мгновенный отклик.
Однако с ростом числа клиентов мы столкнулись с фундаментальным ограничением Elasticsearch: производительность критически падает при большом количестве индексов и шард. Согласно официальной документации, для одного узла не рекомендуется превышать лимит в 1000 шард [источник]. Хотя этот лимит можно увеличить, сама компания Elastic предупреждает, что это временное решение, и стабильная работа не гарантируется [источник].
На практике деградация проявилась быстро: время операций (например, реиндексации документов) выросло до ~200 мс. Для высоконагруженной системы с SLA в десятки миллисекунд такие показатели стали неприемлемыми.
Расчет допустимого количества индексов
Чтобы найти решение, мы рассчитали верхнюю границу количества индексов для нашей инфраструктуры. Исходили из следующей логики:
Конфигурация одного индекса: 8 шард (для горизонтального масштабирования) × 4 реплики (для отказоустойчивости) = 32 шарда на индекс.
Общий лимит для нашего кластера из 4 узлов: ~4000 шард.
С учетом запаса на миграции и техническое обслуживание мы задействовали только треть от общего лимита: ≈1300 шард.
Поделив 1300 на 32, мы получили максимум ~40 индексов. Стало очевидно, что первоначальная схема «один индекс на организацию» нежизнеспособна на долгосрочную перспективу.
Поиск оптимальной стратегии
Мы рассмотрели несколько путей оптимизации структуры индексов:
-
Объединение мелких организаций в общие индексы.
Почему отказались: Невозможно надежно прогнозировать объем транзакций компании. Маленький клиент сегодня мог стать крупным завтра, создавая дисбаланс нагрузки в общем индексе.
-
Разбивка данных по временным интервалам (например, по месяцам).
Почему отказались: Этот подход усложняет архитектуру, так как поиск по истории операций потребовал бы отправки множественных запросов (multisearch) к нескольким индексам. Это сделало бы время отклика непредсказуемым и увеличило бы нагрузку на кластер.
-
Разбивка по регионам.
Почему отказались: Деление оказалось слишком крупным и неравномерным, что не решало проблему эффективного распределения нагрузки.
В итоге мы выбрали стратегию равномерного распределения организаций по фиксированному пулу индексов. Этот подход позволил нам жестко контролировать общее количество индексов (в пределах рассчитанного лимита в 40 штук), обеспечивая предсказуемую производительность, отказоустойчивость и простоту масштабирования.
Скрытый текст
public void init() {
final var indexes = indexRepository.findAll().stream()
.collect(Collectors.toMap(OrganizationIndex::getOrganizationId, Function.identity()));
INDEX_BY_ORGANIZATION_ID_MAP.putAll(indexes);
}
/**
* При запросе на регистрацию присваиваем организации индекс (по очереди)
*/
@Transactional
public void initializeOrganization(final UUID organizationId) {
init();
// горячий кеш с данными индексов и организаций
if (INDEX_BY_ORGANIZATION_ID_MAP.containsKey(organizationId)) {
return;
}
final int organizationsCount = (int) indexRepository.count();
// номер индекса - остаток от деления на количество организаций
final var indexName = getIndexName(organizationsCount % indexCount + 1);
final var index = indexRepository.save(OrganizationIndex.of(organizationId, indexName, OrganizationIndexStatus.READY));
INDEX_BY_ORGANIZATION_ID_MAP.put(organizationId, index);
log.info("Add organization to index {}", index);
}Проблема 2. Дублирование данных и конкурентные обновления
Клиентские операции в нашей системе могут создаваться через несколько независимых каналов:
Обращение в офис;
Новое ДБО (дистанционное банковское обслуживание);
Старое ДБО;
Внутренние операции.
После создания событие проходит через цепочку промежуточных сервисов и финализируется в АБС. Критически важным моментом стало то, что обновление одной и той же операции могло быть инициировано разными системами параллельно. На практике это вылилось в ситуацию, когда обновления для одного документа в Elasticsearch приходили одновременно из разных источников — через REST-API и несколько Kafka-топиков.
Почему стандартные механизмы Elasticsearch не подходят
Попытка обновлять данные «в лоб» привела к потере изменений. Причина в том, что Elasticsearch не гарантирует строгой консистентности (immediate consistency) на уровне отдельных документов. Даже принудительное обновление индекса после записи с помощью withRefreshPolicy(RefreshPolicy.IMMEDIATE) не защищает от race condition (гонки состояний) между параллельными запросами.
В отличие от реляционных СУБД, в Elasticsearch отсутствуют механизмы строгой блокировки на уровне записи (row-level locking) в рамках транзакции. В результате конкурентные операции update могут выполняться на устаревшей версии документа, затирая изменения друг друга.
Этап 1: Оптимистическая блокировка (Optimistic Concurrency Control)
В качестве первого решения мы реализовали оптимистическую блокировку средствами самого Elasticsearch, используя механизм версий документов (_version). Однако этот метод не сработал в наших условиях. Из-за высокой задержки индексации, вызванной проблемой с большим количеством индексов (см. предыдущий раздел), обновления приходили быстрее, чем успевала обновляться версия документа в индексе. Это приводило к лавине ошибок конфликта, и большая часть обновлений не применялась.
Этап 2: Специализированный сервис-агрегатор и синхронизация через Redis
Мы пришли к выводу, что нужен централизованный механизм синхронизации, вынесенный за пределы Elasticsearch. Для этого был разработан отдельный сервис-агрегатор, который выполняет следующие функции:
Принимает события из всех источников (REST, Kafka-топики).
Приводит их к единому формату данных.
Определяет приоритет обработки для событий из разных каналов.
Вводит небольшие искусственные задержки для выравнивания нагрузки.
Таким образом, разнородные операции превращались в упорядоченный поток событий с единым идентификатором.

Для синхронизации параллельных обращений к одному и тому же идентификатору операции и сохранения stateless-архитектуры самого агрегатора мы использовали Redis в качестве распределенного кэша и легковесного координатора:
Ключ: Идентификатор операции.
Значение: Временная метка последнего принятого обновления.
Скрытый текст
public void handle(final TransactionUpdate transactionUpdate) {
final var startTime = System.currentTimeMillis();
// transactionUpdate - это класс wrapper, который оперирует идентификаторами операций
transactionUpdate.message().getUpdateIds().forEach(updateId -> {
try {
// делаем лок в redis c максимальным временем обработки - 1 минута
redisLockRegistry.executeLocked(
updateId,
Duration.ofMillis(lockTimeoutMs),
() -> {
// в связи с тем, что нужно дать время elasticsearch обновить индексы, то вычисляем время ожидания для операций, которые одновременно обновляются из разных источников
final long timeToWait = getTimeToWaitAndUpdateCache(updateId);
if (timeToWait <= 0) {
process(transactionUpdate, updateId);
return;
}
Thread.sleep(timeToWait);
process(transactionUpdate, updateId);
});
} catch (Exception e) {
log.error("Error while processing update transaction {}, in: {}", updateId, timeDurationFrom(startTime), e);
throw new UnexpectedException("Error while processing update transaction");
}
});
log.info("Transaction message was processed in: {}", timeDurationFrom(startTime));
}
public long getTimeToWaitAndUpdateCache(final String updateId) {
// ищем в редисе последнее время обновления операции
final var savedLastUpdateTime = transactionLastUpdateRepository.findById(updateId)
.map(TransactionLastUpdate::getLastUpdateTime)
.orElse(null);
// получаем время для начала обновления
final var newLastUpdateTime = calculateUpdateTime(savedLastUpdateTime);
// обновляем данные в кеше redis
transactionLastUpdateRepository.save(TransactionLastUpdate.builder()
.updateId(updateId)
.lastUpdateTime(newLastUpdateTime)
.ttl(lastUpdateCacheTtlSec)
.build());
long timeToWait = newLastUpdateTime - System.currentTimeMillis();
log.debug("Calculate time to wait: id {}, new update time {}, time to wait {}",
updateId, newLastUpdateTime, timeToWait);
return timeToWait;
}
private long calculateUpdateTime(@Nullable final Long lastUpdateTime) {
// операцию еще не обновляли или обновляли в прошлом (раньше, чем delayMilliSeconds назад), значит можно обновлять сразу
if (isReadyForProcess(lastUpdateTime)) {
return System.currentTimeMillis();
}
// в ином случае надо ожидать
return lastUpdateTime + delayMilliSeconds;
}
private boolean isReadyForProcess(@Nullable final Long lastUpdateTime) {
return lastUpdateTime == null || lastUpdateTime < System.currentTimeMillis() - delayMilliSeconds;
}
private void process(final TransactionUpdate transactionUpdate, final String updateId) {
// по типу источника информации понимаем, какой пришел формат данных, и применяем необходимые операции к нему
switch (transactionUpdate.type()) {
case TYPE_1 -> service1.process(transactionUpdate);
case TYPE_2 - > service1.process(transactionUpdate, updateId);
.....
default -> throw new UnsupportedOperationException("Unknown update type " + transactionUpdate.type());
}
}
Это решение позволило эффективно справляться с параллельными обновлениями и стало временным решением проблемы.
Этап 3: Гарантия порядка с помощью Kafka и унификация формата
Решение с сервисом-агрегатором и Redis было работоспособным, но добавляло сложность: необходимость поддерживать отдельный сервис и инфраструктуру кэша. Мы решили пересмотреть архитектуру потоков данных, чтобы найти более фундаментальное и простое решение.
Проблемы предыдущего подхода:
Разнородность форматов: Источники продолжали генерировать события в разных структурах данных.
Нарушение последовательности: Не было гарантии, что обновления одной операции придут в агрегатор в правильном хронологическом порядке.
Архитектурные изменения
Мы провели ряд ключевых изменений, переведя взаимодействие на Apache Kafka:
Kafka с ключом сообщения
В качестве ключа каждого сообщения стал использоваться идентификатор операции. Это фундаментальное изменение, так как Kafka гарантирует порядок доставки сообщений с одним и тем же ключом в рамках одной партиции. Это решило проблему конкурентных обновлений на транспортном уровне, устранив саму возможность нарушения последовательности.Единый унифицированный формат
Мы разработали и внедрили единый контракт для событий об операциях. Все системы-источники стали публиковать сообщения только в этом формате. Это позволило упростить логику агрегатора, который теперь мог напрямую десериализовать и обрабатывать входящие события без дополнительных преобразований.Консолидация топиков
Вместо множества специализированных топиков для каждого сервиса мы ввели единый топик для событий, связанных с операциями. Это сместило фокус: теперь не каждый сервис диктовал свой формат, а агрегатор определял единый контракт для всего потока данных.
Результат
Благодаря этим изменениям нам удалось полностью отказаться от механизма синхронизации через Redis. Новый подход обеспечил:
Предсказуемость: Порядок обработки гарантирован средствами Kafka.
Упрощение архитектуры: Исчезла необходимость в поддержке отдельного кэша и логики разрешения конфликтов.
Повышение надежности: Система стала менее зависимой от дополнительных компонентов.
Минимальная компонентная архитектура развертки
В качестве итога приведем минимальную рабочую, по нашему мнению, архитектуру кластера Elasticsearch, разработанную для следующих условий:
Инфраструктура: 2 основных ЦОДа с кластерами Kubernetes и микросервисами.
Ограничение: 1 дополнительный ЦОД для кворума с ограниченными ресурсами.
Схематичное изображение финальной архитектуры представлено ниже.

Для начала кратко обозначим роли нод, которые мы использовали:
Мастер-нода (Master Node): Формирует кворум, управляет состоянием кластера (метаданные, шардирование), обеспечивает консистентность и защиту от split-brain. Не хранит данные и не обрабатывает пользовательские запросы.
Координирующая нода (Coordinating Node): Принимает запросы от микросервисов на чтение и запись, маршрутизирует их к нужным data-нодам и агрегирует результаты. Является входной точкой в кластер.
Data-нода (Data Node): Хранит данные, выполняет поиск, агрегацию и другие операции непосредственно с индексами.
Выбранная конфигурация
Исходя из требований отказоустойчивости и производительности, мы развернули кластер следующей конфигурации:
Координирующие ноды: По 1 ноде в каждом из 2 основных ЦОДов. Это обеспечивает минимальные задержки для микросервисов, которые обращаются к координатору в своем ЦОДе.
Мастер-ноды: 3 ноды, распределенные по трем ЦОДам. Это гарантирует работоспособность кворума (требуется большинство, т.е. 2 из 3) даже при полном отказе одного из основных ЦОДов. В случае сетевого разрыва кластер запретит запись на изолированном участке, но сохранит доступность данных для чтения.
Data-ноды: 4 ноды в основных ЦОДах. Это минимальное количество для эффективного шардирования, которое можно легко увеличить для горизонтального масштабирования.
Итог
Представленная архитектура позволила нам построить отказоустойчивую и надежную систему, которая соответствует строгим требованиям бизнеса к доступности и производительности. Она устойчива к сбоям на уровне ЦОДа и предоставляет четкий путь для дальнейшего масштабирования хранилища данных.