В инфраструктуре банка мы ежедневно работаем с ~200 000 физических устройств: это IP-камеры, сетевые видеорегистраторы и другая периферия. Каждое из них должно быть проверено на доступность — не реже чем раз в 10 минут.
Требования к системе:
- не создавать бессмысленную нагрузку на инфраструктуру;
- решение должно быть горизонтально масштабируемым;
- система должна выдерживать падение pod, rebalance, сетевые сбои.
Контекст
Вся полная информация об устройствах — включая IP, порты, MAC-адреса, версии прошивок, локации и прочие метаданные — хранится в gateway-service в PostgreSQL-базе.
Этот сервис — единый справочник для всего слоя управления оборудованием
Второй сервис, который и занимается непосредственно проверкой сетевой доступности оборудования - checkup-service не нуждается во всём этом объёме данных. Для проверки доступности ему достаточно:
уникальный идентификатор устройства;
IP и порт;
его первоначальное состояние (
ONLINE/OFFLINE);
checkup-service не имеет собственной базы данных. Все события — добавление, изменение или удаление устройств — он получает через Kafka от gateway-service.
Архитектурный выбор: почему не PostgreSQL, а между Redis и Kafka
В нашей инфраструктуре уже работают три ключевых компонента: PostgreSQL, Redis и Kafka.
PostgreSQL — основной источник истины для бизнес-данных, но его нагрузка уже достигает 80-90%, и добавление 200 000 периодических проверок создало бы непосильную нагрузку
Redis — используется для кэширования (как ни странно)
Kafka — обрабатывает все асинхронные события между микросервисами
Задача: выбрать решение, которое не создаст дополнительной нагрузки на уже нагруженную инфраструктуру.
Как инициализируется начальное состояние
Процесс инициализации состояния в системе выглядит следующим образом:
После первого релиза сервис работы с расписанием публикует специальное событие в топик
tool-kit-scheduler;gateway-service, подписанный на этот топик, получает событие;gateway-serviceформирует полный слепок всех устройств из своей PostgreSQL-базы и публикует каждое устройство как отдельное сообщение в compacted топикdevice-management-checkup-elementary;checkup-serviceполучает все сообщения и восстанавливает состояние;После этого в топик приходят только изменения: добавление, обновление, удаление (tombstone).
Если состояние потеряно (упала Kafka/Redis, кто-то еще) — администратор может инициировать повторную генерацию события в tool-kit-scheduler — и всё восстановится автоматически.
Решение 1: Архитектура на Redis
Распределённый мониторинг сотен тысяч устройств — это не про гигантские кластеры, а про правильную декомпозицию - хотя нет, это про гигантские кластеры. Мы планировали использовать Redis как средство координации задач, а Kafka — только как канал доставки. Система должна выдерживать падения как pod сервисов, так и Redis, масштабироваться до 400K+ устройств и при этом обеспечивать равномерную сетевую нагрузку.
Самое простое решение - считать все устройства из Redis и проверить. Да, это решение работает, но только когда у вас одна pod, не нужно горизонтального масштабирования, вы уверены, что один pod в состоянии обработать 200К устройств за отведенный интервал и OOM не вызывает никаких ассоциаций. Но, если в системе хотя бы два pod, то возникают вопросы:
Предположим, pod 1 считал все 200к устройств. Но второй тоже их считает. Как они будут захватывать блокировку на устройства, чтобы не проверить одно и тоже устройство с двух pod одновременно?
В Redis есть возможность захватить блокировку. Но, если интервал проверки 5 мин, то это ~700 обращений в Redis каждую секунду и это только с одного pod, чтобы попытаться ее получить. Бонусом идет то, что половина этих попыток будет бесполезна.
Три сущности
Чтобы система масштабировалась и не пропускала проверки, мы спроектировали три сущности, каждая со своей ролью.
1. Корзины (сегменты\слоты) — логические группы для балансировки
Все устройства должны были распределяться по 4096 фиксированным корзинам:
Ключ: device:management:checkup:elementary:bucket:devices:{bucketId}
Тип: Set
Значения: Список идентификаторов устройств
Номер корзины для устройства вычисляется, к примеру, так:
int bucketId = Math.abs(deviceId.hashCode()) % 4096;
Зачем корзины и почему именно 4096 ?
Без корзин у нас есть 200 000 независимых задач. Координировать их напрямую — невозможно:
Нельзя захватить блокировку на каждое устройство, это слишком дорого;
Нельзя эффективно выбрать «все устройства, готовые к проверке».
Корзины решают это, превращая 200K задач в 4096 логических единиц работы.
Каждая корзина — это:
Единица блокировки: только один pod может обрабатывать её в момент времени;
Единица шедулинга: у всей корзины — одно время следующей проверки;
Гарантия атомарности: все устройства внутри проверяются единым пакетом. Хотя это скорее ограничение этого подхода, а не его плюс
Почему 4096, а не 100 или 10 000?
Мы рассчитываем рост до 300–400K устройств. При 200K устройств — ~49 на корзину, при 400K — ~98. Это позволяет набирать устройства порциями по 500. Один pod берёт 10 корзин, другая — следующие 10 и так далее. Если окажется, что один pod не справляется с 500 устройствами за интервал, мы можем снизить нагрузку, например, до 300 устройств (6 корзин). Но если корзин было бы, скажем, 100, то минимальная единица работы — уже ~2000 устройств, и уменьшить нагрузку ниже этого порога было бы невозможно, потому что корзину можно обрабатывать только целиком.
Поскольку deviceId — это UUID, его хеш распределяется практически равномерно. Статистическое моделирование на реальных данных (200K UUID) показывает: среднее количество устройств на корзину: ~49. Стандартное отклонение: ±10. 99% корзин содержат от 34 до 64 устройств.
Почему корзины хранят только ID, а не полные устройства?
Это сознательное (или осознанное, а может и бессознательное) архитектурное решение, основанное на двух соображениях.
Во-первых, эффективность при изменении состояния.
Состояние устройства (ONLINE/OFFLINE) может меняться у ~0.5% устройств в типичном цикле, но в пиковые сценарии практически может достигать 3–5% (~1 000 штук). Если бы полные данные устройства хранились внутри корзины, каждое такое изменение требовало бы перезаписи всей структуры корзины, даже если меняется только один байт. Это создало бы дополнительный overhead на сериализацию, память и сетевой трафик, несмотря на небольшой размер корзин. А мы должны сохранять полученное состояние устройства, чтобы не посылать gateway-service события в которых нет изменений.
Во-вторых, чёткое разделение зон ответственности.
Корзина отвечает только за принадлежность устройства к сегменту и координацию выполнения. Устройство отвечает за свои данные. Расписание отвечает за время запуска.
Такой подход делает систему более устойчивой к изменениям: добавление нового поля в устройство не затрагивает логику корзин, а изменение стратегии шедулинга — не требует перестройки хранения данных.
2. Расписание — когда проверять корзину
Когда проверять каждую корзину, должен был решать Sorted Set (ZSET):
Ключ: device:management:checkup:elementary:check:schedule
Score: Unix timestamp (в секундах)
Member: bucketId
Зачем отдельное расписание?
Опять же, если бы мы меняли время проверки в корзине, то каждый раз при его изменении, мы бы целиком сохраняли ее всю. Хотя поменялось всего одно поле, а ее содержимое - нет. Также, здесь важно использование именно Sorted Set.
В результате мы получаем: O(1) обновление после проверки, O(log N) выборку готовых корзин через ZRANGEBYSCORE.Естественное упорядочение: самые «старые» корзины в начале.
Без ZSET пришлось бы хранить nextCheckTime у каждой корзины, сканировать все 4096 записей каждые 3 секунды;
3. Устройства — отдельно, в формате JSON
Каждое устройство должно было храниться в Redis как строка:
Ключ: device:management:checkup:elementary:device:{deviceId}
Значение: {"ip":"10.0.0.5","port":554,"state":"ONLINE"}
Почему так?
Независимость от логики группировки. Если бы устройство хранилось внутри корзины, любое обновление IP/порта требовало бы полной перезаписи корзины — это дорого при высокой частоте изменений.
Эффективная загрузка. При проверке корзины мы сначала получаем список UUID через
SMEMBERS, а затем однимMGETзагружаем все устройства. Это один сетевой запрос, а не сотни.Простота обновления состояния. Когда устройство меняет статус (ONLINE → OFFLINE), мы просто перезаписываем его JSON — без влияния на корзину или расписание.
4. Атомарные операции: зачем Lua
При добавлении или удаления устройства важно сохранить целостность всех трёх слоёв: устройство, корзина, расписание.
Представьте: pod удаляет устройство.
Удаляет из корзины (
SREM);Удаляет само устройство (
DEL).
Если она упадёт между шагами 1 и 2 (а так они обычно и делают), устройство исчезнет из корзины, но останется в Redis. Потеря навсегда и печаль для всех.
Чтобы этого избежать, мы планировали делать всё атомарно используя Lua.
Добавление:
-- ATOMIC_ADD
redis.call('SET', KEYS[1], ARGV[1]) -- устройство
redis.call('SADD', KEYS[2], ARGV[2]) -- в корзину
redis.call('ZADD', KEYS[3], 'NX', ARGV[3], ARGV[4]) -- в расписание, только если ещё нет
Удаление:
-- Удаляем устройство и из корзины
-- Но НЕ трогаем расписание — корзина продолжает проверяться
redis.call('DEL', KEYS[1]) -- Удаляем device:id
redis.call('SREM', KEYS[2], ARGV[1]) -- Удаляем из bucket:devices:id
Скрипты выполняются в Redis через Spring Data Redis:
redisTemplate.execute(ATOMIC_ADD_SCRIPT,
List.of(deviceKey, bucketKey, scheduleKey),
List.of(deviceJson, deviceIdStr, timestampStr, bucketIdStr));
Как должен был работать цикл проверки на Redis
На каждой итерации checkup-service должен был выполнять следующий алгоритм:
Шаг 1: Выбор корзин для проверки
Выбирает до 100 расписаний ( в котором хранится номер корзины), у которых nextCheckTime ≤ now.
Почему именно 100? Предположим, мы поднимаем до 10 pod checkup-service в кластере. Каждый pod обрабатывает до 500 устройств за раз. При среднем размере корзины ~49 устройств, 500 устройств = ~10 корзин может обработать один pod. Чтобы все 10 pod гарантированно получили работу, нужно выбрать не менее 100 корзин. Если выбрать меньше (например, 10–30), первые 2–3 pod захватят всё расписание, а остальные останутся без задач.
Шаг 2: Захват блокировки корзины
Для каждой корзины пытается захватить блокировку:
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent("bucket:lock:" + bucketId, podId, lockDuration);
где lockDuration = checkInterval - 10 сек (например, 590 сек при интервале 600 сек). а podId - уникальный идентификатор pod, например его можно сгенерить при старте UUID.randomUUID()
Шаг 3: Обработка захваченных корзин
Загружает все UUID из bucket:devices:{id};
Получает данные устройств через pipeline;
Проверяет каждое устройство из корзины используя виртуальные потоки (основное их описание ниже);
Отправляет в Kafka только изменения состояния;
Обновляет nextCheckTime = now + checkInterval;
Не освобождает блокировку вручную - она снимается сама по TTL.
Почему lockDuration = checkInterval - 10?
Чтобы другие pod, которые уже прочитали расписание со старым временем, не смогли захватить корзину, даже если их цикл отстал. Это гарантирует исключительность обработки внутри интервала.
Шаг 4: Адаптивный цикл опроса
checkup-service pod должны были работать по адаптивному циклу. Базовый интервал - 3 секунды. Если в расписании нет готовых корзин, или все захвачены, интервал увеличивается по backoff: 3 => 3.6 => 4.3 => ... максимум 10 сек (умножение на ~1.2). Как только pod получает хотя бы одну корзину, интервал сбрасывается на 3 сек. Это позволяет не спамить Redis при низкой нагрузке, быстро реагировать на появление новых задач.
Ограничение параллелизма: почему 100, а не 10 000
Мы используем Java 21 и виртуальные потоки, которые отлично подходят для I/O-операций. Как указано в официальной документации (вот здесь коллеги запускают 10 000 потоков совершенно не стесняясь https://openjdk.org/jeps/444), виртуальные потоки легковесны и почти не потребляют память.
Однако ресурсы операционной системы конечны. Даже если виртуальные потоки «бесплатны» на уровне JVM, сокеты, file descriptors и сеть — нет.
На одном pod (1 ядро, 1 ГБ RAM) уже работают Kafka consumer, HTTP endpoints, фоновые задачи. File descriptors ограничены. Сеть может быть перегружена при >100 одновременных TCP-соединений (100 - мало, это скорее как пример, того, что ресурсы конечны).
Кроме того, виртуальные потоки исполняются поверх ForkJoinPool, уровень параллелизма которого по умолчанию равен количеству ядер на pod (обычно 1–2 в нашем случае). Хотя виртуальные потоки могут блокироваться без блокировки carrier-потоков, сами carrier-потоки (платформенные) всё равно ограничены и при большом числе pinned-потоков могут исчерпаться и мы увидим деградацию производительности, вместо роста.
Именно поэтому мы ограничиваем число одновременно выполняющихся проверок семафором, как рекомендует документация Java:
Semaphore concurrentConnectionLimiter = new Semaphore(100);
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture[] futures = devices.stream().map(device ->
CompletableFuture.runAsync(() -> {
try {
concurrentConnectionLimiter.acquire();
try {
// проверка устройства
} finally {
concurrentConnectionLimiter.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor)
).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
}
Такой подход позволяет контролировать расходование ресурсов ОС
Отказоустойчивость: что происходит при падении?
Сценарий |
Последствия |
Восстановление |
|---|---|---|
Падение checkup-service pod |
Обработка её сегментов останавливается |
Через |
Падение Redis |
Полная остановка |
Админ инициирует повторную генерацию события в |
Что же здесь не так хорошо, как хотелось?
Расписания, корзины, ZSET, Lua-скрипты и так далее. Это похоже на оверинжиниринг.
Проверка всех устройств корзины атомарно, то есть целиком. Предположим, что в одной корзине оказались 49 "быстрых" устройств, время проверки которых 10 мс и 1 "медленное", со временем проверки 2 сек. В этом случае, время проверки всей корзины будет 2 сек.
Мы постоянно обращаемся в Redis, хотя никакой бизнес потребности в этом нет
И мы решили пересмотреть подход ...
Решение 2: Архитектура на Kafka Streams
Как работает Kafka Streams в этом сценарии
Сервис gateway-service все также один раз отправляет все события в Kafka. Ключом события все также используется id устройства. Kafka гарантирует, что все события с одним и тем же ключом всегда попадают в одну и ту же партицию. Это фундаментальное свойство, лежащее в основе корректной работы stateful-логики.
Сервис checkup-service, под "капотом" которого Kafka Streams, читает этот топик и создаёт у себя в памяти State Store — локальное key-value хранилище (аналог Map в java), где ключом выступает id устройства, а значением — объект с данными устройства (ip, port, state)
Одновременно Kafka Streams автоматически создаёт служебный changelog-топик, имя которого формируется как:
<application-id>-<имя-state-store>-changelog
Для задания application-id используется свойство:
spring.cloud.stream.kafka.streams.binder.application-id=имя
Этот топик имеет ровно столько же партиций, сколько и исходный топик (в который пишет gateway-service) и используется для персистентного хранения всех изменений State Store. При получении события из основного топика Kafka Streams:
Обновляет локальный State Store,
Фоном отправляет это изменение в changelog-топик с тем же ключом.
Если сервис checkup-service перезапускается, Kafka Streams автоматически восстанавливает State Store, последовательно читая все сообщения из соответствующих партиций changelog-топика. Благодаря этому состояние в State Store полностью восстанавливается даже после длительного простоя.
Реализация на Spring Cloud Stream и Kafka Streams
Необходимые зависимости
"org.springframework.cloud:spring-cloud-starter-stream-kafka",
"org.springframework.cloud:spring-cloud-stream-binder-kafka-streams"
Конфигурация топологии Kafka streams
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DeviceKafkaStreamsConfiguration {
private final ObjectMapper objectMapper;
private final CoreKafkaStreams core;
private final CoreKafkaStreamsProperties properties;
@Bean
public Consumer<KStream<UUID, DeviceDto>> processDeviceStream() {
// Конфигурация changelog для state store
Map<String, String> changelogConfig = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
TopicConfig.RETENTION_MS_CONFIG, "-1",
TopicConfig.RETENTION_BYTES_CONFIG, "-1",
TopicConfig.DELETE_RETENTION_MS_CONFIG, "43200000", // 12 часов на tombstone
TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "43200000" // 12 часов на компакцию
);
return inputStream -> {
Materialized<UUID, DeviceDto, KeyValueStore<Bytes, byte[]>> store =
Materialized.<UUID, DeviceDto, KeyValueStore<Bytes, byte[]>>as(properties.stateStoreName())
.withKeySerde(new Serdes.UUIDSerde())
.withValueSerde(new JacksonJsonSerde<>(DeviceDto.class))
.withLoggingEnabled(changelogConfig);
// Сначала создаём state store из входящего потока
inputStream.toTable(store);
// Подключаем кастомный процессор. Делается именно в таком порядке.
inputStream.process(() -> new DeviceKafkaStreamsProcessor(
core,
properties.stateStoreName(),
objectMapper,
properties.checkInterval()
), properties.stateStoreName());
};
}
}
Ключевые моменты реализации:
Что здесь происходит простыми словами: мы говорим kafka streams, читай входящий топик, в котором ключом события выступает UUID, а тело представляет из себя объект, описывающий устройство. Далее запиши полученные сообщения в state store.
Map<String, String> changelogConfig = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
TopicConfig.RETENTION_MS_CONFIG, "-1",
TopicConfig.RETENTION_BYTES_CONFIG, "-1",
TopicConfig.DELETE_RETENTION_MS_CONFIG, "43200000", // 12 часов
TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "43200000" // 12 часов
);
Здесь мы настраиваем параметры change-log топика, в котором будут хранится данные для State Store. Его будет использовать Kafka Streams для восстановления после падений, rebalance и так далее (ниже подробнее). Основной параметр - TopicConfig.CLEANUP_POLICY_COMPACT
Materialized<UUID, DeviceDto, KeyValueStore<Bytes, byte[]>> — это описание State Store. Указание KeyValueStore<Bytes, byte[]> является обязательным и связано с внутренним устройством хранилища RocksDB. Другие типы вызовут ошибку во время выполнения.
withKeySerde(new Serdes.UUIDSerde()) — указывает, как десериализовывать ключ. На стороне продюсера (gateway-service) обязательно должен использоваться org.apache.kafka.common.serialization.UUIDSerializer.
withValueSerde(new JacksonJsonSerde<>(DeviceDto.class)) — для payload. Если gateway-service сериализует тело сообщения используя org.apache.kafka.common.serialization.ByteArraySerializer, то на самом деле вызывается ObjectMapper в byte[] , а мы здесь просто десериализуем эти байты обратно в объект.
Порядок вызовов важен: сначала toTable() для создания State Store, затем process() для подключения бизнес-логики.
Changelog-топик: механизм отказоустойчивости
Главная проблема любого in-memory хранилища - pod могут в любой момент умереть, переместиться или быть масштабированы, и вся память будет потеряна.
Kafka Streams автоматически создаёт и управляет changelog-топиком, который является персистентной копией State Store прямо внутри Kafka.
Процесс синхронизации:
Сообщение от
gateway-serviceприходит в исходный топик.Kafka Streams внутри
checkup-serviceобрабатывает его и применяет изменение к своему локальному State Store.Kafka Streams отправляет это изменение в changelog-топик.
При перезапуске pod, Kafka Streams восстанавливает State Store из changelog-топика.
Реализация кастомного процессора
В нашем случае, требуется выполнить дополнительную бизнес-логику: например, запустить проверку устройства сразу после его появления или изменения. Для этого после toTable() мы добавляем наш кастомный Processor.
@Slf4j
public class DeviceKafkaStreamsProcessor implements Processor<UUID, DeviceDto, Void, Void>, Punctuator {
private final String stateStoreName;
private ProcessorContext<Void, Void> context;
private ReadOnlyKeyValueStore<UUID, ValueAndTimestamp<DeviceDto>> stateStore;
private final Duration checkInterval;
//Конструктор пропущен
@Override
public void init(ProcessorContext<Void, Void> context) {
log.info("Старт инициализации kafka streams processor, идентификатор: {}", context.toString());
this.context = context;
this.context.schedule(checkInterval, PunctuationType.WALL_CLOCK_TIME, this);
this.stateStore = this.context.getStateStore(stateStoreName);
log.info("Kafka streams processor с идентификатором {} инициализирован, интервал punctuator раз в: {} сек.",
context, checkInterval.toSeconds());
}
@Override
public void close() {
log.info("Закрываем kafka streams processor c идентификатором: {}", context.toString());
}
@Override
public void process(Record<UUID, DeviceDto> record) {
log.trace("Получено событие. id: {}, payload:{}", record.key(), record.value());
// Метод process() вызывается ПОСЛЕ обновления state store через toTable().
// Здесь можно безопасно читать актуальное состояние из state store, или вызвать бизнес логику для проверки устройства
}
@Override
public void punctuate(long timestamp) {
// Периодическая задача, например, проверка всех устройств в данной партиции.
// Это пример, как можно получить все записи из state store
List<DeviceDto> result = new ArrayList<>();
try (KeyValueIterator<UUID, ValueAndTimestamp<DeviceDto>> iterator = stateStore.all()) {
while (iterator.hasNext()) {
KeyValue<UUID, ValueAndTimestamp<DeviceDto>> entry = iterator.next();
// Остальной код который необходим
}
}
}
}
Важные детали:
Processor создаётся на каждую задачу (task). Обычно 1 задача = 1 партиция входного топика. Это означает, что каждый Processor имеет доступ только к данным из своих назначенных партиций. Если посмотреть на логи приложения, то можно увидеть, что строк
log.info("Kafka streams processor с идентификатором ....будет ровно столько же, сколько партиций обрабатывает сервис.init()вызывается при первоначальном старте, после каждого rebalance и при восстановлении State Store. Это надёжное место для инициализации.в методе
process(Record<UUID, DeviceDto> record)можно обработать событие, полученное из входящего топикаthis.context.schedule(checkInterval, PunctuationType.WALL_CLOCK_TIME, this)в этом месте запускается punctuatorReadOnlyKeyValueStore— именно так следует обращаться к State Store. Запись в него напрямую не рекомендуется. Единственный правильный способ обновить состояние — отправить новое событие в исходный топик, из которого читает Kafka Streams.И самое важное - punctuator вызывается в основном потоке Kafka Streams. Если сделать что-то вроде
TimeUnit.SECONDS.sleep(1000)внутри метода punctuate, то мы его остановим, что грозит rebalance и т.д.
В методе punctuate мы получаем все устройства из State Store и проверяем их в виртуальных потоках, как и в примере с Redis. С той лишь разницей, что можно проверять не целиком пачку устройств, а по одному передавая задачи на проверку в Executors.newVirtualThreadPerTaskExecutor(). Важно не забывать, что реальные ресурсы ОС конечны и нельзя создать сразу 70 000 socket соединений. Следует использовать semaphore.
Чтение целиком всего state store с 15 000 устройств занимает ~50 мс на pod в 1 CPU.
Запуск и rebalance
Запуск
Assignment - Kafka Streams получает список партиций для обработки.
Восстановление - для каждой партиции читается соответствующая партиция changelog-топика, и все сообщения применяются к локальному State Store.
Запуск обработки - только после полного восстановления начинается чтение новых сообщений из исходного топика.
Запуск Punctuator - периодические задачи стартуют в последнюю очередь.
Rebalance
Запускается второй экземпляр сервиса checkup-service.
-
Pod 1 получает уведомление
onPartitionsRevoked([15-29]):Останавливает Punctuator для этих партиций.
Очищает локальный State Store для партиций 15-29.
-
Pod 2 получает уведомление
onPartitionsAssigned([15-29]):Восстанавливает State Store из changelog (партиции 15-29).
Запускает Punctuator для новых партиций.
Почему не Spring Scheduler
Стандартный @Scheduled не совсем подходит следующим причинам:
Не знает о состоянии — может запуститься до завершения восстановления State Store.
Не знает о партициях — может пытаться обрабатывать "чужие" данные.
При rebalance — продолжает работать даже после потери партиций.
В принципе, у State Store есть механизм защиты от такого поведения. Но punctuator кажется наиболее нативным способом получить доступ к State Store.
Итоги и награды
Архитектура на Redis (как хотели):
Да, так можно сделать, мы вывели решение на тестовые стенды и оно работало
Но излишняя сложность делают из этого решения скорее интересный case, который, впрочем, можно применять не только для таких случаев
Архитектура на Kafka Streams (как сделали):
Kafka Streams — полноценный stateful движок, который полностью закрывает наши потребности
State Store + Changelog — надёжное, распределённое хранилище с автоматическим восстановлением при утере локального состояния
Простой подход с Punctuator устраняет необходимость в сложных системах координации
Rebalance и отказоустойчивость — встроены «из коробки»
Система масштабируется до 1 000 000 устройств без изменений архитектуры. Просто при создании входящего топика
device-management-checkup-elementaryсоздайте его со 100 партициями.
Ключевой вывод: Переход от распределённой координации через Redis к встроенным механизмам Kafka Streams позволил полностью устранить нагрузку на инфраструктуру для координации задач. Вся системная нагрузка теперь сводится только к бизнес-логике — проверке устройств. Kafka Streams обеспечивает автоматическое распределение данных, отказоустойчивость и согласованность состояния без каких-либо дополнительных компонентов или сложных механизмов координации.
Система стала проще, надёжнее и требует значительно меньше операционных затрат на поддержку, при этом обеспечивая те же гарантии и лучшую масштабируемость.