Всем привет! В этой статье я бы хотел затронуть тему горизонтального масштабирования SpringBoot-приложений, использующих вебсокеты. Основная особенность таких приложений - наличие состояния (state). Вебсокеты для работы используют постоянное TCP-соединение, собственно оно и является этим состоянием. А наличие состояния обычно вызывает проблемы при масштабировании. 

Stateless-сервисам все равно на какой из множества инстансов попал очередной запрос, или кто именно вычитал сообщение из очереди. Но когда на серверной стороне возникает понятие клиентской сессии, все запросы, связанные с этим клиентом, должен обрабатывать один и тот же инстанс. Либо должна быть возможность передать эту сессию на другой инстанс. Сложность реализации этого требования варьируется от "добавить одну строчку в конфиг балансировщика" до "написать подсистему мониторинга топологии сервиса и роутинга запросов, сложность которой может превышать сложность основного приложения". К счастью, для решения стандартных задач, как правило, существуют стандартные инструменты. Давайте посмотрим, что за инструменты используются при масштабировании вебсокетных приложений, написанных на SpringBoot.

Постановка задачи

Есть SpringBoot-приложение, в нем есть контроллер, который обслуживает WebSocket-соединения и использует протокол STOMP. Это приложение деплоится в Kubernetes-кластер. Клиент по WebSocket подключается к кластеру и подписывается на пользовательскую очередь. Наша задача - сделать так, чтобы в кластере было развернуто несколько инстансов SpringBoot-приложения, балансировка между которыми осуществляется стандартными средствами. Из документации по SpringBoot WebSocket следует, что для решения этой задачи необходимо использовать внешний STOMP-брокер сообщений, в качестве такого брокера будем использовать RabbitMQ.

Пока, наверное, не очень понятно, а что же мы вообще делаем, что за проблему решаем и откуда берется брокер сообщений. Поэтому давайте разберем все это подробнее.

WebSocket

WebSocket - протокол связи поверх TCP-соединения. Изначально он предполагался для использования между браузером и веб-сервером. WebSocket решал проблему того, что HTTP-сервер не мог по собственной инициативе послать данные браузеру - сначала клиент посылает запрос, потом сервер возвращает ответ, и только так. Это неудобно, когда надо быстро реагировать на события, которые приходят на сервер - чаты, нотификации, онлайн-игры. У WebSocket'а другая парадигма работы - между клиентом и сервером устанавливается постоянное TCP-соединение, по которому обе стороны могут обмениваться сообщениями произвольного формата в любое время.

Механизм установки WebSocket-соединения выглядит так: клиент посылает HTTP-запрос на endpoint, который обслуживает вебсокеты, с заголовками, которые сообщают, что соединение надо "прокачать" до вебсокетного. В ответ приходит статус HTTP 101 и заголовки, подтверждающие, что соединение "прокачано". После этого браузер и сервер используют уже открытое TCP-соединение для обмена сообщениями.

Рис. 2 Установка WebSocket-соединения
Рис. 1 Установка WebSocket-соединения

STOMP

Протокол WebSocket не накладывает никаких ограничений на то, что именно будут передавать друг другу клиент и сервер. Это, с одной стороны, дает много свободы, но с другой - заставляет каждое приложение изобретать собственные протоколы взаимодействия. Поэтому поверх вебсокетов придуманы различные суб-протоколы, более явно задающие формат общения. Один из таких протоколов - STOMP (строго говоря, ему совсем не обязательно работать поверх WebSocket'ов). Протокол реализует абстракции, характерные для асинхронного обмена сообщений, прежде всего, отправка сообщений в очереди и подписка на сообщения из очереди. Кроме того, в протоколе присутствуют механизмы поддержки транзакций, подтверждения получения сообщений, обмена heartbeat'ами и т.п.

Примеры сообщений

Клиент подписывается на очередь /queue/foo

SUBSCRIBE
id:0
destination:/queue/foo

^@

Сервер отправляет сообщение в очередь /queue/foo

MESSAGE
subscription:0
message-id:001
destination:/queue/foo
content-type:text/plain

Hello World!
^@

В приведенных примерах ^@ - NULL-символ (ASCII NULL).

Spring WebSocket и пользовательские очереди

В Spring'е есть поддержка WebSocket и STOMP поверх него. Spring реализует подмножество протокола STOMP в части подписок на очереди и отправки сообщений. Кроме этого, клиенты SpringBoot-приложений могут подписаться на так называемые пользовательские очереди (user destination). Внешне они выглядят как обычные очереди с префиксом /user. Но подписавшись на такую очередь пользователь будет получать только свои сообщения. То есть, если и Алиса, и Боб подписываются на очередь /user/topic/foo, то Алиса увидит только сообщения, адресованные ей, а Боб - адресованные ему. Для этого в Spring используется специальный метод отправки сообщений, в котором указывается имя пользователя-получателя. Это имя должно соответствовать имени пользователя, аутентифицированного стандартными средствами Spring Security (Principal.getName()).

private final SimpMessagingTemplate template;

...

template.convertAndSend("/topic/broadcast", "hi all");
// vs
template.convertAndSendToUser("bob", "/topic/foo", "hi bob");

По своей сути, эта возможность - синтаксический сахар, позволяющий скрыть от клиента уникальные очереди, ассоциированные с пользователями. В противном случае приходилось бы передавать на клиента информацию, достаточную для идентификации пользователя, чтобы потом ее использовать для создания подписки на пользовательскую очередь, а так этот шаг можно пропустить. Но в реальности на бэкенде происходит постоянный маппинг "красивых" именований очередей во внутреннее представление, которое знает об аутентифицированных пользователях и их подписках.

Как видно из описания, в работе с пользовательскими очередями важен момент аутентификации пользователей. Аутентификация происходит при обработке первого HTTP-запроса на установку WebSocket-соединения. Все остальные сообщения в рамках этой сессии будут считаться полученными от этого аутентифицированного пользователя. В частности, это означает, что не используется функциональность STOMP-сообщения CONNECT, в котором можно передавать логин и пароль.

Проблема с несколькими инстансами

Теперь давайте разберемся, что плохого происходит, когда мы пытаемся масштабировать WebSocket-приложения.

Рис. 2 Проблема, возникающая с несколькими WS-компонентами
Рис. 2 Проблема, возникающая с несколькими WS-компонентами

Клиент отправляет начальный запрос, при необходимости аутентифицируется, и создаёт WebSocket-сессию на backend 1 (красные стрелочки). В результате клиент устанавливает постоянное TCP-соединение с backend 1, по которому они обмениваются сообщениями. Конечно, на самом деле это TCP-соединение будет проксировать service, а в реальном мире перед ним ещё будет стоять кубернетовский ингресс, гейтвей и прочее, но в данном контексте это не важно. Затем по установленному соединению клиент подписывается на STOMP'овскую очередь.

Через какое-то время от внешнего источника поступает сообщение для этого клиента (синие стрелочки). Но так как сообщения из очереди вычитываются случайным образом, оно попадает на backend 2. И у нас возникает проблема - как нам доставить это сообщение клиенту? Единственный способ - воспользоваться открытым TCP-соединением клиента, но оно ведёт на backend 1. Значит нам нужен способ передать наше сообщение с backend 2 на backend 1. Внешний брокер как раз и нужен для решения этой проблемы.

Роль брокера сообщений в масштабировании

Рис. 3 Получение сообщений через RabbitMQ
Рис. 3 Получение сообщений через RabbitMQ

Когда мы используем внешний брокер сообщений, то SpringBoot фактически помимо своей внутренней логики просто дополнительно ретранслирует некоторые команды этому брокеру. Получил SUBSCRIBE - переслал в брокер. Брокер создал у себя очередь, на которую подписал этот бэкенд. Теперь бэкенд будет получать сообщения из той очереди и пересылать их клиенту, которого он обслуживает.

Дисклеймер по RabbitMQ

Здесь и далее используется предельное упрощение в описании работы с RabbitMQ: положил в очередь, прочитал из очереди. Как будто нет exchange'ей, routing'а и всего такого.

Если же сервис SpringBoot хочет послать сообщение в очередь, то он, опять же, пересылает его в брокер, брокер видит подписанные бэкенды и доставляет сообщения им. А бэкенды уже пересылают данные конечным клиентам. В общем, все достаточно просто, когда мы говорим про обычные очереди. А вот с пользовательскими очередями есть нюанс.

Пользовательские очереди и внешний брокер

Когда я упоминал про пользовательские очереди, я отделался абстрактной фразой, что внутри Spring'а хранится маппинг аутентифицированных пользователей и их уникальных очередей. Но при работе с внешим брокером нам нужны эти самые уникальные названия очередей. А где их нам взять, если бэкенды не знают друг про друга? Я до сих пор не понимаю, зачем понадобилось что-то более сложное, чем конкатенация имени пользователя с именем очереди, но Spring пошел другим путем...

Путь боли в Spring WebSocket User Destination

  1. Когда от клиента приходит сообщение CONNECT, сервис создает STOMP-сессию. Пользователь добавляется в реестр пользователей, туда же добавляется информация о созданной сессии;

  2. От клиента приходит сообщение SUBSCRIBE на пользовательскую очередь /user/queue/foo. Spring отбрасывает префикс /user и дописывает суффикс -user{sessionId}. То есть, реально пользователь подписывается на очередь /queue/foo-user12345. Информация о подписке также добавляется в реестр пользователей и связывается с сессией;

  3. Когда Spring отправляет сообщение в очередь /queue/foo для пользователя alice, то происходит следующее:

    1. По имени пользователя получается список его открытых сессий;

    2. Для каждой открытой сессии генерится название /queue/foo-user12345, /queue/foo-user54321 и тп. В каждую такую очередь будет отправлено сообщение. Выглядит как баг, потому что пользователь мог подписаться на очередь /user/queue/foo в сессии 12345 и не подписываться на нее в сессии 54321. И эта информация есть в момент подготовки списка очередей для рассылки, поэтому мы можем не посылать лишнее сообщение (оно все равно никуда не уйдёт), но Spring делает именно так.

Этот алгоритм очевидно не работает в случае нескольких экземляров приложения, потому что соседние инстансы не знают идентификаторов сессий друг друга. И Spring предлагает 2 механизма, которые должны позволить посылать сообщения пользователям, отсутствующим на данном бэкенде.

  1. Обмен пользовательскими реестрами (userRegistryBroadcast). Spring позволяет определить общую системную broadcast-очередь, через которую бэкенды могут обмениваться реестрами. Обмен реестрами происходит с определенной периодичностью (по умолчанию, раз в 10 секунд), поэтому пользователь, который только подключился, может пропустить часть сообщений. Это может быть довольно неприятно, поскольку как раз на старте сессии обычно происходят какие-то важные события жизненного цикла подключенного пользователя;

  2. Очередь для неизвестных адресатов (userDestinationBroadcast). Кроме этого, Spring позволяет задать общую системную broadcast-очередь, куда будут отправляться сообщения пользователям, про которых не знает текущий бэкенд (если на шаге 3.1 мы получили список сессий пользователя, и он оказался пустым). На эту очередь подписываются все бэкенды, так что в итоге сообщение дойдет до того, кто знает про подписки пользователя.

Диалог с ИИ:

— Алиса, есть два стула...

— Оба твои!

Вообще, когда для решения одной задачи предлагаются два механизма - это тревожный знак. Скорее всего, оба работают плохо. И перед нами прекрасная иллюстрация для данного утверждения.

  1. Если мы используем обмен реестрами пользователей, то у нас возникает задержка при обмене. В это время пользователь будет терять сообщения;

  2. Если мы используем очередь неизвестных адресатов, то мы увеличиваем трафик между бэкендами и брокером, а также итоговую задержку при отправке сообщений. К тому же, если у нас допускается наличие нескольких сессий от одного пользователя, то может произойти следующий сценарий: у пользователя есть две сессии, которые попали на разные бэкенды. В этих двух сессиях есть подписки на одну и ту же пользовательскую очередь. Один из бэкендов собирается отправить сообщение в эту очередь. Он проверяет свой список сессий, видит, что там одна есть, и отправляет сообщение только в нее. В очередь неизвестных адресатов ничего не попадет (туда попадет сообщение, только если сессий не найдено вообще), соответственно, во вторую сессию ничего отправлено не будет;

  3. Если мы используем оба механизма сразу, то у нас может воспроизвестись проблема номер 2 до того, как реестры пользователей синхронизировались. А еще может произойти дублирование сообщений: пользователь подписался на очередь на бэкенде 1, реестры еще не успели синхронизироваться, бэкенд 2 решает отправить сообщение клиенту. Бэкенд 2 не находит у себя клиентских сессий и отправляет сообщение в очередь неизвестных адресатов. В этот момент реестры, наконец, синхронизируются. Оба бэкенда получают сообщение из очереди неизвестных адресатов и проверяют свои реестры пользователей. Но теперь у каждого из них есть информация о нужной клиентской сессии, каждый из них выполнит для нее отправку сообщения (полный путь сообщения такой: создаем сообщение > не знаем, куда отправить, поэтому кидаем в очередь неизвестных адресатов > вычитываем сообщение из очереди неизвестных адресатов (x2) > пересылаем сообщение в очередь, связанную с сессией (x2) > получаем сообщение из очереди, связанной с сессией (x2) > отправляем сообщение клиенту (x2)). Это тоже выглядит как баг, поскольку в случае получения сообщения из очереди неизвестных адресатов логично проверять только подписки самого бэкенда (игнорировать те, что были получены при обмене реестрами).

Ну, и чтобы не вставать 2 раза, еще одна фича (это действительно скорее фича, чем баг) Spring'овой реализации STOMP'а. Так как STOMP - он вообще про messaging, то там операции несихронные. Почему это важно - часто хочется после получения новой подписки от клиента стартовать какой-то workflow. Ну, например, подключился пользователь к чату, давайте ему пошлем последние 50 сообщений. Spring реализует публикацию событий STOMP-сессии через ApplicationEventPublisher. В частности, мы можем подписаться на событие SessionSubscribedEvent. Но когда мы его получаем, это значит только то, что сервис обработал событие SUBSCRIBE и переслал его в брокер. Но брокер-то его еще мог не обработать. В частности, он еще мог не успеть создать очередь для нового подписчика. При этом, с точки зрения приложения, сессия и подписка уже есть, и в нее можно смело кидать эти самые 50 сообщений. Такой подход приводит к тому, что сообщения, посланные сразу после старта подписки, иногда будут теряться.

Инфа 100%. Пишем тестовое приложение

Чтобы вам не приходилось верить мне на слово, что это все работает именно так, напишем тестовое приложение. Оно будет состоять из 3 компонентов: клиент, бэкенд и генератор. Клиент стартует несколько клиентских сессий. Каждая такая сессия устанавливает устанавливает соединение с бэкендом и подписывается на пользовательскую очередь. Бэкенд обрабатывает событие подписки и сообщает генератору, что можно посылать данные для клиента. Генератор создаёт сообщения и складывает их в собственную очередь. Бэкенды вычитывают сообщения из очереди генератора и посылают их клиенту. Клиент сохраняет их для последующего анализа пропусков и дублей.

Кроме трёх Java-компонентов нам понадобится RabbitMQ в качестве брокера для STOMP и в качестве транспорта между бэкендом и генератором. Бэкенды мы будем тестировать в режиме, когда используется и обмен реестрами, и очередь неизвестных адресатов. Обернем бэкенды в сервис Kubernetes и установим желаемое число реплик в 3.

Рис. 4 Схема тестового приложения
Рис. 4 Схема тестового приложения

Синими стрелочками на диаграмме показаны потоки обмена данными с generator'ом - в него летят события по регистрации новых клиентов, от него летят сообщения для клиентов. Для этого используются AMQP-очереди RabbitMQ - этот транспорт выбран просто потому что нужен был хоть какой-то способ общения между бэкендами и генератором, он никак не влияет на ту функциональность, которую собираемся проверять. AMQP здесь можно заменить чем угодно, например, REST-интерфейсами в обе стороны.

Так как мы работает с пользовательскими очередями, нам надо как-то аутентифицировать клиента. Для простоты имя пользователя просто будет передаваться в HTTP-заголовке при установке WebSocket-соединения. В коде бэкенда мы реализуем Security-фильтр, который будет обрабатывать этот заголовок.

Еще один важный момент - в отличие от встроенного брокера, который не обращает внимание на имена очередей, RabbitMQ задает четкие правила по тому, как он их будет интерпретировать. Мы будем использовать /topic'и, которые создают по умолчанию неперсистентные очереди.

@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  @Value("${application.stomp.broker.hostname:localhost}")
  private String stompBrokerHost;

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableStompBrokerRelay("/topic")
            .setRelayHost(stompBrokerHost)
            .setRelayPort(61613)
            .setClientLogin("guest")
            .setClientPasscode("guest")
            .setSystemLogin("guest")
            .setSystemPasscode("guest")
            .setUserDestinationBroadcast("/topic/unresolved-user-destination")
            .setUserRegistryBroadcast("/topic/simp-user-registry")
            ;
    config.setApplicationDestinationPrefixes("/app");
    config.setUserRegistryOrder(Ordered.LOWEST_PRECEDENCE - 1);
  }

  @Bean
  SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
    AbstractPreAuthenticatedProcessingFilter filter = new AbstractPreAuthenticatedProcessingFilter() {
      @Override
      protected Object getPreAuthenticatedPrincipal(HttpServletRequest request) {
          String username = request.getHeader("Username");
          if(username != null) {
              return new UserPrincipal(username);
          } else {
              throw new PreAuthenticatedCredentialsNotFoundException("Could not find 'Username' header");
          }
      }

      @Override
      protected Object getPreAuthenticatedCredentials(HttpServletRequest request) {
          return null;
      }
    };
    filter.setAuthenticationManager(authentication -> authentication);
    http.addFilterBefore(filter, UsernamePasswordAuthenticationFilter.class);

    return http
            .sessionManagement(sessionManagement -> sessionManagement.sessionCreationPolicy(SessionCreationPolicy.NEVER))
            .build();
  }
}

Вот так клиенты подписываются на пользовательские очереди:

public CompletableFuture<StompSession.Subscription> subscribe() {
    WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
    headers.add("Username", username);

    return stompClient.connectAsync("ws://localhost:8080/ws", headers, new StompSessionHandlerAdapter() {}, new Object[0])
            .thenApply(session -> session.subscribe("/user/topic/messages", new MessageHandler());
}

private static class MessageHandler extends StompSessionHandlerAdapter {
    @Override
    @SneakyThrows
    public void handleFrame(StompHeaders headers, Object payload) {
        log.debug("{} handles '{}'", username, payload);
        // Parse and save payload
    }
}

А вот так бэкенды отправляют сообщения клиентам (читают из AMQP-очереди и отправляют в STOMP):

private final SimpMessagingTemplate template;
private final ObjectMapper mapper;

@RabbitListener(queues = "messages")
public void listen(MessageDto message) throws JsonProcessingException {
    log.debug("Handle {}", message);
    template.convertAndSendToUser(message.receiver(), "/topic/messages", mapper.writeValueAsString(message));
}

Аномалии доставки сообщений

Для воспроизведения проблем попробуем следующий сценарий: создадим 100 клиентов и настроим генератор, чтобы каждый клиент получил 100 сообщений. В результате нескольких прогонов, проверка сообщений, полученных клиентами, должна хотя бы раз показать такую картину:

ID клиента

Число полученных сообщений

client-4

89

client-6

93

client-77

100

client-81

100

client-74

100

...

...

client-61

100

client-85

104

client-12

106

client-19

124

Как видно, большая часть клиентов действительно получила ровно 100 сообщений. Но ещё у нас есть клиенты, которые получили меньше сообщений, чем должны, и клиенты, которые получили больше сообщений чем должны.

Если включить трассировку на RabbitMQ, то станет понятно, что они пропущены как раз из-за того, что сервер ещё не успел создать очередь для хранения сообщений в момент их публикации.

Вот так в логах выглядит пропущенное сообщение (см. Routed queues):

2023-07-13T19:59:17.191+00:00: Message published
  
Node: rabbit@ff11d633901d
Connection: 172.17.0.1:59146 -> 172.17.0.3:61613
Virtual host: /
User: guest
Channel: 1
Exchange: amq.topic
Routing keys: [<<"messages-userc9fb1ed9b9504f12876622b9235de9a5">>]
**Routed queues: []**
Properties: [{<<"headers">>,table,
[{<<"content-length">>,longstr,<<"189">>},
{<<"simpOrigDestination">>,longstr,
<<"/user/topic/messages">>},
{<<"redelivered">>,longstr,<<"false">>}]},
{<<"contenttype">>,longstr,<<"text/plain;charset=UTF-8">>}]

Payload:
{"receiver":"client-6","createdAt":"2023-07-13T19:59:17.151756Z","sentAt":"2023-07-13T19:59:17.159358518Z","sentFrom":"stomp-cluster-backend-deployment-6c68478d58-6zz4g","text":"message 2"}

А вот так - доставленное:

2023-07-13T19:59:17.195+00:00: Message published

Node: rabbit@ff11d633901d
Connection: 172.17.0.1:59146 -> 172.17.0.3:61613
Virtual host: /
User: guest
Channel: 1
Exchange: amq.topic
Routing keys:  [<<"messages-userc9fb1ed9b9504f12876622b9235de9a5">>]
**Routed queues: [<<"stomp-subscription-78TySI1tzPQC8fmZo7siA">>]**
Properties: [{<<"headers">>,table,
[{<<"content-length">>,longstr,<<"189">>},
{<<"simpOrigDestination">>,longstr,
<<"/user/topic/messages">>},
{<<"redelivered">>,longstr,<<"false">>}]},
{<<"contenttype">>,longstr,<<"text/plain;charset=UTF-8">>}]

Payload:
{"receiver":"client-6","createdAt":"2023-07-13T19:59:17.153455Z","sentAt":"2023-07-13T19:59:17.161305082Z","sentFrom":"stomp-cluster-backend-deployment-6c68478d58-6zz4g","text":"message 7"}

Аналогично, с лишними сообщениями - они получены из очереди неизвестных адресатов, а после этого отправлены в пользовательскую очередь сразу несколькими инстансами.

Оверхед при использовании брокера сообщений

Теперь давайте оценим оверхед, который возникает от очереди неизвестных адресатов (и от внешнего брокера в принципе). Переключим бэкенды в режим, когда используется эта очередь и не используется обмен реестрами, а потом повторим тест, мониторя состояние очередей RabbitMQ.

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
  config.enableStompBrokerRelay("/topic")
          .setRelayHost(stompBrokerHost)
          .setRelayPort(61613)
          .setClientLogin("guest")
          .setClientPasscode("guest")
          .setSystemLogin("guest")
          .setSystemPasscode("guest")
          .setUserDestinationBroadcast("/topic/unresolved-user-destination")
//          .setUserRegistryBroadcast("/topic/simp-user-registry")
          ;

Нас будет интересовать 2 метрики:

  • сколько сообщений попали в STOMP-очереди;

  • сколько сообщений брокер отправил бэкендам.

Сумма этих двух метрик покажет нам общее число сообщений, прошедших через брокер.

Что мы ожидаем? В 1/3 случаев сообщение сразу попадет на бэкенд, обслуживающий клиента. Такие сообщения не обязательно посылать в брокер в силу особенностей реализации пользовательских очередей. В 2/3 случаев бэкендам придётся воспользоваться очередью неизвестных адресатов, которая доставит сообщения на все бэкенды.

Но опять есть нюанс. Судя по коду, при использовании внешнего брокера, сообщение всегда проходит через него, независимо от того, работаем мы с пользовательскими очередями или обычными. То есть, даже если бэкенд сам обслуживает адресата, он отправит сообщение в очередь брокера, сам же из нее прочитает то же сообщение, и только потом перешлет клиенту.

Итого, если у нас есть n реплик, и мы посылаем M сообщений, то через брокер пройдут:

  • M * (1/n входящих + 1/n исходящих) сообщений, которые сразу попали на нужный бэкенд;

  • M * ((n-1) / n входящих + (n-1) / n * n исходящих) сообщений, прошедших через очередь неизвестных адресатов;

  • M * ((n - 1) / n входящих + (n - 1) / n исходящих) сообщений, который прошли через брокер после того, как из очереди неизвестных адресатов попали на нужный бэкенд.

M\cdot(\frac{2}{n}+\frac{n-1}{n}+(n-1)+2\cdot\frac{n-1}{n})=M\cdot(2+n-\frac{1}{n})

Для n=3 имеем 466% сообщений, проходящих через брокер. Давайте посмотрим результаты (посылаем 100 сообщения для 100 клиентов):

Сообщений получено

16982

Сообщений отправлено

30946

Сообщений всего

47928

Как видно, бэкенды действительно в любом случае используют брокер для отправки сообщений. Однако, если мы говорим про оверхед относительно ситуации, когда используем обмен реестрами, то он составляет n - 1 / n, поскольку каждое сообщение в любом случае пройдет через брокер в обоих направлениях.

Заключение

На этом все. Мы исследовали ситуацию, когда нам надо масштабировать бэкенды, обслуживающие WebSocket-соединения с протоколом STOMP. Разобрали, зачем нужен брокер сообщений и какие проблемы возникают при использовании пользовательских очередей. Посмотрели, какие способы решения проблем с пользовательскими очередями предлагает Spring и их слабые стороны. В завершении, несколько практических рекомендаций:

  1. Помните о том, что WebSocket - в принципе не очень надёжный транспорт. Длительные соединения, проходящие через несколько прокси-узлов, склонны разрываться, поэтому вам в любом случае необходимо иметь дополнительные механизмы повышения надёжности. Простейший - схема, когда у вас есть REST-интерфейс для получения полного состояния клиентской сессии и WebSocket для получения изменений. Если вы подозреваете, что часть сообщений были потеряны, обновите состояние целиком. Просто знайте о том, что часть проблем вообще может быть не связана с транспортом. Такие проблемы достаточно редки даже под нагрузкой, но они могут случаться. Особенно стоит обратить внимание, что при определенных настройках вы можете не только терять сообщения, но и получать лишние;

  2. Очередь неизвестных адресатов генерирует на брокер трафик, пропорциональный числу ваших бэкендов. Если у вас используется кластер RabbitMQ, добавьте туда ещё трафик на синхронизацию узлов кластера. Поэтому использовать только этот механизм синхронизации довольно накладно;

  3. Если вам не критична начальная задержка синхронизации реестров пользователей, то используйте обмен реестрами и реализуйте задержку перед использованием WebSocket-соединения (не стартуйте flow клиента, пока не пройдет таймаут для обмена реестрами пользователей);

  4. Если для вас начальная задержка критична, то попробуйте использовать и очередь неизвестных адресатов, и обмен реестрами, но помните про потенциальные дубли сообщений;

  5. Помните, что вы можете и не использовать Spring'овый STOMP даже в Spring-приложениях. Вы можете написать свой протокол поверх чистого WebSocket. Кроме того, и используя STOMP вы можете написать свой более простой аналог пользовательских очередей - достаточно просто использовать в имени очереди какой-то уникальный атрибут пользователя;

  6. Spring'овая реализация STOMP-протокола для вебсокетов... Ну, не блещет. Она безусловно работает, и весьма неплохо, но внутри нее есть простор для оптимизаций, и часть принятых решений вызывает вопросы.

Исходники

Исходный код тестового приложения и инструкции по запуску можно посмотреть тут:

https://github.com/brastak/stomp-cluster

Комментарии (2)


  1. olku
    18.07.2023 17:37
    +1

    Смущает, что проблему балансировки TCP вместо инкапсуляции на четвертом, мы перекладываем на седьмой. Есть ли изящное решение, которое будет удерживать вебсокет и во время деплоя новой версии приложения?


    1. brastak Автор
      18.07.2023 17:37

      Я таких не знаю. Подобное решение можно придумать: делаем "умный" прокси, который "прячет" от клиента смену бэкенда, и сам переподключается к новым. Пока мы работаем с обычными очередями, у нас будут только две проблемы: задержки при переключении бэкендов и проблемы при обновлении самого прокси. Проблемы с потенциальной потерей сообщений можно решить на уровне типа очередей брокера (использовать гарантию at least once). С пользовательскими очередями будет сложнее, но если мы используем внешнюю схему аутентификации типа OAuth2, то тоже все будет работать. Но готовых решений такого типа я не встречал.

      Просто проблема, которую я рассматриваю в статье, не в том, что мы рвем TCP-соединение с конкретным бэком при обновлении. У нас есть 2 независимых API: с клиентом вебсокет-сервера и с генератором событий. А так как API независимы, мы не можем на маршрутизаторах достичь того, чтобы сообщение автоматически попадало на нужный бэкенд. Отсюда и необходимость в брокере.

      Ну, по крайней мере, в общем случае не можем такого достичь. Когда это сделать получается, рассматриваемой проблемы по сути и нет: если сообщение сразу падает на нужный бэкенд, он его может сразу переслать клиенту, брокер просто не нужен (например, если оба API основаны на REST и используют одинаковые балансировшики). Но что делать, когда, как например в рассмотренном примере, за балансировку клиентов отвечает сервис Kubernetes, а за балансировку генераторов - консюмеры AMQP-очередей?