Привет, Хабр! Меня зовут Никита Летов, и я являюсь Java бекенд разработчиком, а также Техлидом. Данный пост продолжает серию постов по разработке бэкенд-микросервисов на Java и Spring.

Для тех, кому важен только код и читать лень, предоставляю ссылку на GitHub по которой вы найдете полностью функциональную реализацию описываемого решения и попробовать ее внедрить или усовершенствовать:

·       https://github.com/TechniXC/sse-service

Если будут сложности или вопросы по проекту — добро пожаловать в комментарии или личные сообщения, постараюсь всем помочь и ответить на вопросы.

Введение

В этом посте я расскажу, почему для доставки push-уведомлений в клиентское приложение была выбрана технология Server-Sent Events, и главное, как реализовать масштабируемый SSE сервис на Java + Spring Boot + Webflux, который подойдет для использования не только в пет-проектах, но и в крупных организациях с большим количеством пользователей. Естественно, без подготовки дополнительной инфраструктуры не обойдется.

Перед тем, как я начну повествование, хочу обратить ваше внимание, что на просторах Хабр-а я не нашел большего количества информации по SSE особенно в Java, что мне кажется странным, ведь SSE не особо новая технология, однако есть такой доклад в котором довольна кратко рассказано про сам подход и основные шаги для реализации, при этом главная проблема описываемого там проекта – отсутствие расширяемости, в случае если потребуется многоподовость или зеркалирование сервиса на другом кластере – то описываемое там решение надо серьезно дорабатывать.

Итак, перейдем к проблеме и решению. (Можно промотать вниз, до первых блоков кода, если интересна только реализация.)

Проблема и выбор решения

В современных клиентских-приложениях, будь то Web, iOS или Android, требуется обновления данных Ui и чем быстрее это произойдет, тем лучше для потребителя и желательно, чтобы это происходило в режиме реального времени. Согласитесь, никто не хочет обновлять страничку или перезагружать приложение, чтобы увидеть новые балансы на банковском счету или новые доступные продукты, на которые потребитель оформил подписку.

Так же, кроме обновления интерфейса, часто возникает требование доставки адресных уведомлений клиенту, порой содержащих чувствительную информацию, в том числе коды доступа или перс-данные.

Для решения данных задач можно использовать разные подходы и вот несколько их них:

  • Polling (Long / Short): самый простой подход – клиент через определённые интервалы времени делает HTTP-запросы на сервер, проверяя, нет ли новых данных. При short polling запросы происходят регулярно (например, каждые N секунд. При Long polling клиент делает запрос и сервер держит соединение открытым, пока не появятся новые данные, после чего отсылает ответ и соединение закрывается, и так в цикле. Оба подхода создают приличную нагрузку на сеть и серверную составляющую, особенно когда клиентов много.

  • WebSockets: полноценный двунаправленный канал поверх TCP. Клиент и сервер выполняют специальный WebSocket-handshake (HTTP Upgrade запрос), после чего устанавливается постоянное соединение, по которому обе стороны могут слать сообщения асинхронно без накладных расходов на установление новых подключений. Данный подход идеально подходит для сценариев с частыми обменами данными с минимальной задержкой – чаты, онлайн-игры и т.п. Однако за удобство приходится платить сложностью - необходимо обрабатывать обрывы соединений, соответственно требуется отправка данных даже когда тоннель не используется.

  • Server-Sent Events: стандартизованный механизм одностороннего push-соединения, является частью спецификации HTML5 и в отличие от WebSocket описывает только одностороннее соединение сервер -> клиент. SSE значительно проще в использовании. Браузер открывает соединение и начинает получать сообщения типа http-stream. Все это работает поверх HTTP/HTTPS, поэтому не требует никаких нестандартных протоколов или ручного управления сокетами. Браузер сам автоматически повторно подключается при разрыве, что упрощает обработку временных сбоев. Формат передаваемых сообщений - текстовые строки вида data: {...} (Да-да! Мы можем слать, наш любимый JSON.

  • Web Push API: отдельная технология, решающая другую задачу – доставка уведомлений даже когда сайт не открыт у пользователя. Web Push основан на Service Worker в браузере и в мобильном устройстве: пользователь дает разрешение, после чего браузер генерирует subscription endpoint. Сервер отправляет сообщения не напрямую, а через push-провайдера, например Google FCM или Apple APNs, и если даже вкладка веб-приложения не активна, браузер покажет нативное уведомление. Важно понимать, что данный подход не реализует прямое соединение, полагается на push-провайдера и его SLA, часто провайдеры web-push не имеют никаких сертификаций и попросту их использование может быть заблокировано ИБ компании или регулятором, если речь идет о Банках, так же для использования в WEB-приложениях обычно это стоит подписки.

Большинство приложений для мобильных устройств, как мы знаем, используют Web-Push подход, однако для определенных компаний, в частности Финансовых организаций, он может не подходить по причинам, связанным с информационной безопасностью, в случае отправки клиентам чувствительных данных.

Получается, для доставки pushуведомлений на клиентские приложения у нас остается только 3 подхода. Давайте взглянем на исследование, в котором производится сравнительный анализ накладных расходов на сетевое взаимодействие при использовании всех трех технологий.

Рис. 1. Сравнение накладных расходов на сетевое взаимодействие.
Рис. 1. Сравнение накладных расходов на сетевое взаимодействие.

На рисунке наглядно представлено, как Polling, WebSocket и SSE нагружают сеть передаваемыми данными просто при простое «туннеля». А поскольку, мы хотим систему, работающую в энтерпрайз окружении, где каждый лишний байт, помноженный на милионное количество клиентов, может выстрелить нам в ногу, выбираем решение, наименее сильно нагружающее сеть – SSE.

Архитекутра

Давайте рассмотрим простейший пример работы сервиса, в одну поду с хранением всех данных в памяти внутри самого сервиса.

Рис. 2. Работа приложения в одну поду.
Рис. 2. Работа приложения в одну поду.

В посте, на который я ранее давал ссылку, архитектура выглядит именно так. Решение построено чисто на Java + Spring Boot без использования дополнительной инфраструктуры в виде брокеров сообщений или кеша. И все прекрасно работает, по крайней мере до тех пор, пока у нас количество потребителей не вырастет до критической отметки и не придет время масштабировать решение.

Давайте теперь разберемся, зачем нам нужна какая-то ещё инфраструктура, почему бы просто не увеличить количество под сервисом и всё.

Рис. 3. Работа приложения в несколько под.
Рис. 3. Работа приложения в несколько под.

Дело в том, что при сохранении пользователей в памяти сервиса, эта память никак не делится между подами одного Deployment Config, и соответственно, если запрос на отправку сообщения придет в под в котором клиент не зарегистрирован – сообщение не сможет быть доставлено, так как для сервиса такого клиента не существует. А если сервис перезагрузится - данные из памяти будут потеряны, и соответственно сообщения также не будут доставлены.

Но ведь можно жить на одной поде, увеличивая ей процессор и память, но имеет ли такой подход право на жизнь в рамках Энтерпрайза? Короткий ответ – нет, так как это как минимум не надежно.

Для решения данных проблем нам нужны две новых сущности:

  1. Общее хранилище подписок на события сервера.

  2. Общая очередь, для хранения самих сообщений перед отправкой.

Рис. 4. Целевая Архитектура приложения.
Рис. 4. Целевая Архитектура приложения.

В примере, я буду использовать в качестве хранилища подписок – Redis, а в качестве очереди RabbitMQ. Выбор данных решений обусловлен несколькими факторами:

  • Redis поддерживает TTL и мы можем создавать подписку на определенный срок, а затем авто-удалять в случае если сервис не сделает этого в следствии сбоя. Затем всегда можно пере регистрировать клиента в случае необходимости

  • Redis гибок и не требует дополнительных манипуляций с таблицами, правами и т.д. Работа с ним будет организована через JpaRepository, так что в случае необходимости всегда можно перейти на другую NoSql или SQL БД.

  • RabbitMQ просто прекрасно подходит для данной задачи, так как он поддерживает авто-создание и авто-удаление очередей в случае если на них никто не подписан, чего не умеет, например та же Kafka.

  • RabbitMQ легковесный, при этом легко-расширяемый простым добавлением нод в кластер.

  • RabbitMQ отлично дружит с реактивным стеком и WebFlux.

Реализация

Наконец, перейдем к самом интересному, к коду.

Подразумевается, что уже развернуты: (Если вы не используете мой проект)

  • Redis доступный по redis:6379

  • RabbitMQ доступныйпоrabbitmq:5672(скредами: user/password)

Ни Redis, ни RabbitMQ настраивать дополнительно не надо, вся конфигурация будет в коде.

Шаги, которые мы будет реализовывать:

  1. Подписка / Регистрация: GET-метод /api/events/subscribe с Content-Type: text/event-stream, который будет отвечать за привязку пользователю уникального queueId, регистрацию пользователя, а также подписку на получение сообщений.

  2. Отправка события в сервис: POST-метод  /api/events/send-message который будет отвечать за получение DTO Event и отправку его в процессор сообщений. В примере мы не будем реализовывать отправку сообщений в сервис SSE асинхронно через другой брокер, а просто сделаем отправку клиенту, через POST метод. При желании его легко можно перевести на получение Event из топика Кафки.

  3. Процессор: Компонент сервиса, принимающий DTO Event, определяющий назначение сообщения по связке в Redis и отправляющий сообщение в сервис обработки эвентов.

  4. Сервис регистрации: Отвечающий за сохранение связки userId-queueId в Redis

  5. Сервис обработки эвентов: Отвечающий за получение и отправку сообщений по SSE.

Начнем с конфигурации application.yml:

spring:
  main:
    web-application-type: reactive
    allow-bean-definition-overriding: true
  application:
    name: sse-service
  data:
    redis:
      host: redis
      port: 6379
      timeout: 5000ms
  rabbitmq:
    host: rabbitmq
    port: 5672
    username: user
    password: password

Основные зависимости, которые потребуются нам для работы сервиса build.gradle:

//Reactive for SSE
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'io.projectreactor.rabbitmq:reactor-rabbitmq:1.5.2'

//Data
implementation 'org.springframework.boot:spring-boot-starter-data-redis'

Начнем с контроллера, в котором мы опишем контракт подписки и отправки сообщений.

В качестве авторизационных данных, мы будем использовать хедер “x-user”, который будет содержать некий идентификатор пользователя.

Тут подразумевается, что ваш сервис расположен за API Gateway, на котором происходит разбор токенов доступа и обогащение запросов к проксируемым сервисам хедером с идентификатором пользователя. (Процесс аутентификации)

В методе “/subscribe” мы генерируем уникальный queueId, затем регистрируем пользователя через сервис “RegistrationService” и запускаем прием стриминга через “SsePushService” Flux<String>, где строка это и будет наш JSONв виде текста:

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/events")
public class SseController {

    private final SsePushService userMessageService;
    private final RegistrationService registrationService;
    private final EventProcessor eventProcessor;

    public final static String HEADER_NAME = "x-user";

    @GetMapping(path = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> receive(@RequestHeader(name = HEADER_NAME) String userId) {
        var queueId = UUID.randomUUID().toString();
        registrationService.registerUser(userId, queueId);
        return userMessageService.receiveStream(queueId, userId);
    }
  
    ...
  
}

В ручке методе “/send-message” мы просто отправляем полученный Event DTO в процессор, который уже определит дальнейшую логику:

    @PostMapping(path = "/send-message")
    public ResponseEntity<Void> sendMessage(@RequestBody Event message) {
        eventProcessor.process(message);
        return ResponseEntity.ok().build();
    }

Объявим репозиторий доступа в Redis:

public interface RegisteredUsersRedisRepository extends CrudRepository<RegisteredUserHash, String> {

    Optional<RegisteredUserHash> findByQueueId(String queueId);
    List<RegisteredUserHash> findByUserId(String userId);
}

а также сущность:

@Data
@Builder
@RedisHash("sse-push-user")
public class RegisteredUserHash {

    @Id
    private String id;
    @Indexed
    private String userId;
    @Indexed
    private String queueId;
}

И теперь сервис регистрации пользователей, единственной задачей которого будет сохранить связку в Redis:

@Service
@RequiredArgsConstructor
public class RedisRegistrationServiceImpl implements RegistrationService {

    private final RegisteredUsersRedisRepository usersRepository;

    @Override
    public void registerUser(String userId, String queueId) {
        usersRepository.save(
                RegisteredUserHash.builder()
                        .id(UUID.randomUUID().toString())
                        .userId(userId)
                        .queueId(queueId)
                        .build());
    }
}

Перед тем, как мы реализуем ядро нашего сервиса – SsePushService, необходимо создать корректную реактивную Java конфигурацию для RabbitMQ:

@Configuration
@RequiredArgsConstructor
public class RabbitMqConfiguration {

    private final RabbitProperties;

    @Bean
    Mono<Connection> rabbitMqConnection() {
        ConnectionFactory = new ConnectionFactory();
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.useNio();
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        return Mono.fromCallable(() -> connectionFactory.newConnection("reactor-rabbit"))
                .cache();
    }

    @Bean
    Sender sender(Mono<Connection> rabbitMqConnection) {
        return RabbitFlux.createSender(new SenderOptions().connectionMono(rabbitMqConnection));
    }

    @Bean
    Receiver receiver(Mono<Connection> rabbitMqConnection) {
        return RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(rabbitMqConnection));
    }

    @PreDestroy
    public void close() throws Exception {
        Objects.requireNonNull(rabbitMqConnection().block()).close();
    }
}

Тут остановимся подробнее. Нам необходимо объявить “rabbitMqConnection” так как без него подтянется дефолтная конфигурация без nio и кеша, properties при этом берем из стандартных мест (RabbitProperties ссылается как раз на spring.rabbitmq) Так же необходим @PreDestroy метод, для корректного отключения соединения при выключении сервиса.

Теперь у нас все готово для реализации SsePushService:

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitSsePushServiceImpl implements SsePushService {

    private static final String SSE_PUSH_EXCHANGE = "sse-push";

    private final SSEConfiguration sseConfiguration;
    private final RegisteredUsersRedisRepository redisRepository;
    private final Sender sender;
    private final Receiver receiver;

    public void sendMessage(String queueId, String content) {
        String routingKey = SSE_PUSH_EXCHANGE + "-" + queueId;

        OutboundMessage message = new OutboundMessage("", routingKey, content.getBytes());

        sender.declareQueue(
                        QueueSpecification.queue(routingKey).exclusive(false).autoDelete(true))
                .doOnSuccess(q -> log.debug("Queue {} declared", routingKey))
                .thenMany(sender.sendWithPublishConfirms(Mono.fromSupplier(() -> message)))
                .doOnError(e -> log.error("Failed to send message for {}", routingKey, e))
                .subscribe(m -> {
                    if (m.isAck()) {
                        log.debug("Message sent for {}", routingKey);
                    }
                });
    }

    public Flux<String> receiveStream(String queueId, String userId) {
        sendMessage(queueId, "queueId:" + queueId);
        var routingKey = SSE_PUSH_EXCHANGE + "-" + queueId;

        return sender.declareQueue(QueueSpecification.queue(routingKey)
                        .exclusive(false)
                        .autoDelete(true))          // Mono<Queue>
                .doOnNext(q -> log.debug("Queue {} declared", routingKey))
                .thenMany(
                        receiver.consumeAutoAck(routingKey)
                                .doOnSubscribe(sub -> {
                                    sendMessage(queueId, "userId:" + userId);
                                })
                                .doOnNext(msg -> log.debug("Received message to queue {}: {}", routingKey, new String(msg.getBody())))
                                .map(msg -> new String(msg.getBody()))
                )
                .timeout(sseConfiguration.getTimeout(),
                        Flux.just("No new messages during 1 hour, closing Streaming"))
                .doOnCancel(() -> {
                    log.debug("Streaming cancelling for queueId {}", queueId);
                    redisRepository.findByQueueId(queueId).ifPresent(x -> redisRepository.deleteById(x.getId()));
                })
                .doOnError(
                        TimeoutException.class,
                        (timeoutException) -> {
                            log.debug("Queue {} is closed by timeout", queueId);
                            redisRepository.findByQueueId(queueId).ifPresent(x -> redisRepository.deleteById(x.getId()));
                        });
    }
}

Пойдем снизу вверх, метод receiveStream(String queueId, String userId), декларирует очередь, в режиме “autoDelete” для того, чтобы в дальнейшем очереди не копились, а удалялись сразу после исчезновения подписчика, и тут возникает ОЧЕНЬ ТОНКИЙ момент, RabbitMq считает, что активный подписчик – это тот подписчик, который подключен и что-то читает, ну или хотя бы читал хоть что-то в рамках соединения. И в случае, если мы просто подпишемся на пустую очередь – она создастся и сразу удалится...

По этой причине, при использовании авто-удаляемых очередей, в нее всегда надо что-то отправить в момент подписки, и мы выполняем следующее:

                                .doOnSubscribe(sub -> {
                                    sendMessage(queueId, "userId:" + userId);
                                })

Вызываем метод отправки сообщения в очередь. В данном случае это просто информация о связке пользователя с очередью.

Далее, все понятно – задаем таймауты, обработку ошибок и удаление связки при закрытии соединения.

В методе sendMessage(String queueId, String content) все чуть проще, он формирует реактивное представление OutboundMessage для RabbitMQ, и отправляет его в очередь, однако, стоит обратить внимание на декларирование очереди. Тут появляется вопрос, зачем еще раз декларировать нашу существующую очередь? Следует помнить, что эта операция идемпотентна и повторное декларирование не создаст очередь второй раз, а просто получит информацию о текущей

Если вы посмотрите на логи приложения, то сможете заметить, что лог “Queue {} declared” появится только 1 раз при регистрации пользователя.

Остался последний, но не маловажный элемент сервиса Процессор отправки сообщений, и DTO Event:

@Slf4j
@Service
@RequiredArgsConstructor
public class EventProcessor {

    private final SsePushService ssePushService;
    private final RegisteredUsersRedisRepository registeredUsersRedisRepository;

    private final ObjectMapper objectMapper;

    public void process(Event event) {
        Optional.ofNullable(event)
                .map(Event::getUserId)
                .map(registeredUsersRedisRepository::findByUserId)
                .ifPresentOrElse(
                        registeredList -> registeredList.forEach(registeredUser -> {
                            try {
                                var content = objectMapper.writeValueAsString(event);
                                log.info("Message for sending: {}", content);
                                ssePushService.sendMessage(registeredUser.getQueueId(), content);
                            } catch (JsonProcessingException e) {
                                log.warn("Bad push message!");
                            }
                        }),
                        () -> log.info("No queue registered for message: {}", event));
    }

}
@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Event implements Serializable {

    private String userId;
    private String eventType;
    private String message;
    private Map<String, Object> data;

}

Основная функция процессора принять Event DTO, выделить из него userId, найти по нему очереди пользователя и отправить сообщение.

На этом по реализации всё, полный код приложения можно взять тут - https://github.com/TechniXC/sse-service.git

Тестирование

Самый простой вариант запуска приложения – в докере командой:

docker-compose up –build

в корне проекта взятого с GitHub по ссылке выше.

После сборки и поднятия всех контейнеров, увидим следующую картинку.

Рис. 5. Запуск приложения
Рис. 5. Запуск приложения

У сервиса доступен Swagger, но можно тестировать и через Postman. Я отправляю сообщение через сваггер, а получаю в постмане:

Рис.6. Пробуем стримминг
Рис.6. Пробуем стримминг

Как видим, сообщения ходят. Ради интереса можем заглянуть в логи сервиса через Grafana по адресу localhost:3000 (admin/admin):

Рис. 7. Observability - наше всё.
Рис. 7. Observability - наше всё.

Про безопасность

На любом крупном проекте, особенно в фин-техе, мы, как разработчики, всегда сталкиваемся с необходимостью доказывать, что написанное нами решение безопасно и защищено от вмешательства хакерами. Архитектура данного решения, явно показывает, что в данном решении злоумышленник, не сможет никак перехватить данные, предназначенные для клиента без доступа к его авторизационным данным. Так как:

  • Очередь для хранения сообщений перед отправкой, создается непосредственно в момент подписки, и мы не передаем ее идентификатор, а получаем его в ответ от системы уже после подключения.

  • Даже в случае перехвата идентификатора очереди, злоумышленник не сможет к ней подключиться, так как такого функционала просто нет.

  • После закрытия приложения или же ручного обрыва соединения, очередь сразу закрывается, а сообщения, предназначенные для пользователя, будут уничтожаться. (Или возможна отправка в запасное хранилище или топик Kafka из которого сообщения будут вычитаны при подключении клиента в приложении)

  • В случае перерегистрации клиента, или перехода на новое устройство – сообщения будут доставляться на все подписанные в данный момент клиентские приложения.

  • Мы не храним сообщения для клиентов в памяти, а значит и перехватить их через уязвимости предоставляющие memory dump сервиса крайне сложно.

Заключение

В статье мы рассмотрели, как технология Server-Sent Events помогает организовать эффективную доставку уведомлений в клиентские приложения.

Реализация на Java +  Spring + WebFluxне составляет большего труда и имеет большие возможности для расширения функционала. Инфраструктура для решения более чем доступна. Добавим к этому простоту масштабируемости:

  • Увеличение количества под сервиса

  • Добавление нод в кластер RabbitMQ

  • Добавление нод в кластер Redis

И мы получаем полноценное in-house энтерпрайз решение для доставки любых видов пушей и уведомлений на фронт-приложения.

Весь код к этому докладу доступен в моем гитхабе. Если появятся вопросы, пишите в комментах, либо на почтуX или телеграм.

Комментарии (7)


  1. Seriouson
    16.10.2025 05:33

    Никита, привет!

    Как всегда, твой доклад был очень интересным и познавательным — спасибо за информацию!

    Хочу добавить небольшое уточнение по теме: Server-Sent Events (SSE) — это действительно один из способов взаимодействия сервера и клиента, но в контексте push-уведомлений для мобильных приложений есть нюансы.

    Де-факто стандартом для push-уведомлений остаются:

    • FCM (Firebase Cloud Messaging) — для устройств с Google-сервисами,

    • HMS (Huawei Mobile Services) — для устройств Huawei (без Google-сервисов).

    Кастомные решения работают нестабильно из-за оптимизации ОС — система может завершать фоновые процессы для экономии ресурсов.

    Для ИБ: можно добавить асимметричное шифрование (например, RSA). Данные шифруются перед отправкой, передаются через FCM/HMS в зашифрованном виде (например, в base64), а расшифровка происходит уже на клиенте с помощью второго ключа.


  1. headliner1985
    16.10.2025 05:33

    От Web flux наоборот стоит отказываться, ребята из нетфликс, которые были его основными идеологами специально ушли от него на виртуальные потоки и перестали контрибьютить в эту технологию из-за сложности разработки, отладки и поддержки кода на нем.


    1. pkokoshnikov
      16.10.2025 05:33

      +1 выпилили у себя webflux в нагруженном апи и заменили на loom. Все работает отлично (java 24)

      Упрощает разработку, поддержку, чтение кода. Синхронный код и аналитики и тестировщики понимают чаще всего.


      1. TechniXC Автор
        16.10.2025 05:33

        Уже есть LTS 25 c отшлейфованными Virtual Threa :)


        1. pkokoshnikov
          16.10.2025 05:33

          Образов ещё не подвезли с 25 java, ждём


    1. TechniXC Автор
      16.10.2025 05:33

      Если тема интересна, могу сделать исследование в контексте того же SSE c Virtual Threads vs WebFlux.


  1. maratkoRuEkb
    16.10.2025 05:33

    На рисунке наглядно представлено, как Polling, WebSocket и SSE нагружают сеть передаваемыми данными просто при простое «туннеля». А поскольку, мы хотим систему, работающую в энтерпрайз окружении, где каждый лишний байт, помноженный на милионное количество клиентов, может выстрелить нам в ногу, выбираем решение, наименее сильно нагружающее сеть – SSE.

    Я конечно извиняюсь но разве не WebSockets решение наименее нагружающее сеть?

    Или все же SSE был выбран потому что он проще в применении?