Привет, коллеги! Так получилось, что наше приложение написано на java stack, но хостится в Azure. И мы пытаемся по максимуму использовать менеджмент сервисы клауд провайдера.

Один из них это Azure Service Bus и сегодня я хочу рассказать про особенности его использовать в обычном Spring Boot приложении.

Если вы хотите прочитать про граблиособенности — листайте в конец статьи

Что такое Azure Service Bus


Пару слов про Azure Service Bus — это облачный брокер сообщений (облачная замена RabbitMQ, ActiveMQ). Поддерживает очереди (сообщение доставляется одному получателю) и топики (механизм publish/subscribe) — более детально тут

Заявлена поддержка:

  1. Ordered messages — в документации написано что это FIFO, НО реализовано это на концепции message sessions — группы сообщений, а не всей очереди. Если вам надо гарантировать очередность сообщений, то вы объединяете сообщения в группу, и вот уже сообщения в группе будут доставляться как FIFO. Так что Azure Service Bus Queue не есть FIFO — оно доставляет ваши сообщения рандомно, как ему удобно
  2. Dead-letter queue — тут все просто, не смогли успешно доставить сообщение через N попыток или период времени — переместили в DLQ
  3. Scheduled delivery — можно установить задержку перед доставкой
  4. Message deferral — скрывает сообщения в очереди, сообщение не будет доставлено автоматически, но его можно получить по ID. Надо где-то хранить этот ID

Как интегрироваться с Azure Service Bus


Azure Service Bus поддерживает протокол AMQP 1.0 это означает, что он не совмести с RabbitMQ клиентами т.к. кролик использует AMQP 0.9.1

Единственный «стандартный» клиент который может работать с Service Bus — Apache Qpid.

Существует 3 способа как подружить ваше Spring Boot приложение с Service Bus:

  1. JMS + QPID — про него пишут в официальной документации, но оно не работает — QPID не переподключается к серверу если сервер закрыл соединение — баг.
    Решение проблемы с timeout для producer — отключить кеш — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — майки напили кучку своих интеграций, ссылка на репозиторий тут. Ну и собсвенно интеграция для Service Bus
    Мы используем это решение (версия 1.2.6) — работает без нареканий, под капотом оно использует azure service bus java sdk.

    Интеграция через Spring Integration позволяет использовать только базовый набор функциональности — отправить и получить сообщение, «Scheduled delivery» и «Message deferral» недоступны.

    Рекомендую ознакомится с кодом sdk, практически весь код сосредоточен в классе MessageAndSessionPump
  3. Голый azure service bus java sdk — писать свой велосипед, но есть доступ ко всем фичам сервиса

Интеграция через Spring Cloud — Azure Service Bus


Остановлюсь на этом способе более детально и расскажу про особенности использования
Пример приложения есть в официальном репозитории поэтому дублировать код нет смысла — репозиторий с примером тут.

Т.к. это Spring Integration Messaging то все сводится к Channel, MessageHandler, MessagingGateway, ServiceActivator.

И еще есть ServiceBusQueueTemplate.

Отправка сообщений


У нас должен быть Channel в который мы пишем сообщение которое хотим отправить, на другом конце сидит MessageHandler который его отправляет в Service Bus.

В качестве MessagHandler выступает com.microsoft.azure.spring.integration.core.DefaultMessageHandler — это и есть конектор к внешнему сервису.

Как его привязать к каналу? — добавляем аннотацию — @ServiceActivator(inputChannel = OUTPUT_CHANNEL) и теперь наш MessagHandler слушает канал OUTPUT_CHANNEL.

Далее в канал надо как-то написать наше сообщение — тут снова магия спринга — объявляем MessagingGateway и биндим его к каналу по имени.

Кусочек из примера:

@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}

Bот и все: Gateway -> Channel -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter.

В коде остается заинжектить наш gateway и вызвать метод send.

Я не просто так упомянул ServiceBusMessageConverter в цепочке вызовов — если вы захотите добавить кастомные заголовки (например CORRELATION_ID) в сообщение это то самое место где их надо переложить из org.springframework.messaging.MessageHeaders в azure message.
Специальный метод setCustomHeaders.

В этом случае ваш gateway будет выглядеть как-то так:

@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}

Получение сообщений


Хорошо, мы умеем отправлять сообщения, как их теперь получить?

Тут все так же — MessageProducer -> Channel -> Handler

В качестве MessageProducer выcтупает com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter — это наш конектор во внешний сервис. Внутри все тот же ServiceBusQueueTemplate с ServiceBusMessageConverter где можно прочитать кастомные хидеры и переложить их в spring integration message.

Канал в него уже устанавливает руками:

@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}

А вот сам Handler атачится к каналу посредством @ServiceActivator.

@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......

Можно и сразу получить строку:

@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......

Вы могли заметить странный параметр Checkpointer checkpointer он используется для ручного подтверждения обработки сообщения
Если при создании ServiceBusQueueInboundChannelAdapter вы установили CheckpointMode.MANUAL вы должны сами отправить acknowledge для сообщения.

Если используете CheckpointMode.RECORD то подтверждение будет отправлено автоматически — детали в коде ServiceBusQueueTemplate.

Особенности использования


Итак, список «граблей» и «фишек» по которым мы уже прошли.

ReceiveMode.PEEKLOCK


Azure Service Bus поддерживает режим PEEKLOCK — консьюмер берет сообщение, оно лочится в service bus, никому недоступно в течении определенного времени (lock duration), но не удаляется из него. Если за отведенное время консьюмер не прислал подтверждение обработки — success/abandon или не продлил лок — сообщение считается вновь доступным и будет осуществлена новая попытка доставки.

Интерестно что abandon — просто сбрасывает лок и сообщение становится достпуным мгновенно для re-delivery.

ServiceBusQueueTemplate по умолчанию создает QueueClient в режиме ReceiveMode.PEEKLOCK.

Если в нашем обработчике вылетает unhandled exception — никакой acknowledge не будет отправлен на сервер и сообщение останется залочено и будет re-delivery по timeout.
При этом счетчик доставки увеличится, что логично.

Не знаю баг это или фича — но очень удобно делать delay между retry для ситуаций когда это необходимо.

В случае если сообщение не может быть обработано даже при retry — надо ловить исключения и помечать сообщение как обработанное и дописывать дополнительную логику в приложение, иначе оно будет доставляться вновь и вновь пока не достигнет предела re-delivery number (конфигурируется при создании queue в service bus)

Concurrency and Prefetch message count


Как вы уже догадались настройка concurrency отвечает за количество параллельных обработчиков сообщений, а prefetch message count — сколько сообщений в буфер мы заберем с сервера

По умолчанию ServiceBusQueueTemplate автоконфигурируется (AzureServiceBusQueueAutoConfiguration) со значением 1 для обоих параметров т.е. по умолчанию каждая queue будет иметь один поток обработки, хотя концепция service bus с acknowledge для каждого отдельного сообщения предполагает много параллельных обработчиков. Тем более это важно если у вас долгая обработка запроса.

К сожалению эти настройки нельзя устанавливать через конфиг приложения (application.yml/application.properties) и возможно установить только в коде. Но даже через код у вас не получится установить разные настройки для разных очередей.

Поэтому если вам необходимо делать разные настройки — придется создавать несколько бинов ServiceBusQueueTemplate для каждого ServiceBusQueueInboundChannelAdapter

CompletableFuture внутри azure service bus java sdk


Сам azure service bus java sdk реализован вокруг CompletableFuture и CachedThreadPool executorMessagingFactory.INTERNAL_THREAD_POOL поэтому будьте осторожны со всякими thread local beans

Ordered messages


Мы используем service bus как job queue — некоторые задания зависят друг от друга и поэтому должны выполнятся в той очередности как были созданы.

Как я упоминал выше майки используют концепцию message sessions — когда сообщения группируются в сессию по ключу (передается в header), сессия существует пока существует хотя бы одно сообщение с ключом сессии — детально в документации
Service bus гарантирует доставку сообщений внутри такой группы в порядке добавления на сервере (т.е. в порядке в котором сервер service bus записал их в хранилище).

Еще стоит упомянуть если вы создали sessions enabled queue — это означает что все сообщения должны иметь header с ключом сессии.

Сразу мы очень обрадовались возможности service bus выстроить сообщения в FIFO очередь — хоть и для группы сообщений.

Но спустя некоторое время мы начали замечать проблемы:

  • некоторые сообщения начали доставляться бесконечное количество раз
  • замедлилась обработка очереди
  • в статистике service bus половина запросов помечена как failed, причем failed запросы появляются даже на пустой очереди в момент простоя

Заглянув в код sdk — выяснилось особенность работы с сессиями:

  1. консьюмер захватывает сессию и начинает вычитывать все имеющиеся сообщения в ней
  2. одновременно обрабатывается количество сессий равное параметру concurrency
  3. если вылетает unhandled exception — то консьюмер останавливается на 1 минуту (никак не конфигурируется и не зависит от настроек времени лока сообщения на сервере) и пробуем пулить все туже сессию заново — при этом счетчик re-delivery не увеличивается? он по прежнему равен 0 и если у вас баг в приложении и exception перманентный — то все встанет пока не протухнет ttl сообщения.
  4. посмотрев код и перечитав документацию — выяснили что обязательно нужно помечать сообщение как success или abandon. Вот только проблема — вы теряете delay на re-delivery
    т.к. после вызова abandon — сообщение доставляется моментально снова, но delivery counter увеличивается.
    Про влияние на delivery count можно прочесть в доке

Как итог — отказались от этой возможности service bus — и написали велосипед, а service bus выполняет роль триггера.

Как только отказались от sessions enabled queue — пропали ошибки в статистике запрос к service bus.

В связке JMS + Qpid — этот функционал недоступен.

Потенциальные проблемы с размером очереди больше 1G


Пока не встречали, но слышал что начинает работать нестабильно если размер очереди больше 1G.

Если вы сталкивались с этим или наоборот все работает — пишите в комментариях.

Проблемы с трейсингом запросов


Стандартный azure application insights agent не умеет трекать отправку сообщений как dependency и входящие сообщения как requests.

Пришлось дописывать код.

Итоги


Если вам нужен job queue с долгой обработкой сообщения и не требуется очередность — можете использовать.

Если обработка сообщений быстрая — используйте Azure Event Hub — обычная Kafka, стандартный клиент работает отлично.