Статья за авторством Александра Романова, разработчика интеграционных решений.

В процессе интеграции систем мы часто сталкиваемся с необходимостью гарантированной доставки сообщений между системами. В таких случаях на помощь нам приходят очереди. Но не все задачи так просты, как доставка сообщений из системы А в систему Б. Бывают случаи, когда нужно обогатить доставляемые сообщения данными из смежных, участвующих в интеграции систем. Которые не всегда могут интегрироваться через очереди, а имеют лишь синхронные сервисы. И вот уже в нашей интеграции возникают такие явления, как недоступность, отказы и другие «приятные» особенности использования «синхронов». Можно было бы переложить обработку промежуточных отказов на систему-источник, но это некультурно, да и невозможно, если мы публикуем события сразу для нескольких систем (в топик).



Удобным и работающим решением проблемы, с нашей точки зрения, является асинхронная пошаговая обработка сообщения через внутреннюю очередь с вызовом внешнего сервиса на каждом шаге. В случае отказа сервиса по ошибке или из-за его временной неработоспособности сообщение попадает во внутреннюю очередь отказов и отправляется повторно после разборов с возникшими в сервисе проблемами.



Данное решение так же устраняет проблему невозможности отката транзакции при работе с внешними сервисами. Никакой вызов не пройдет дважды — обработка начинается именно с того шага, на котором произошел сбой.

Всё вышеописанное очень легко реализуется на интеграционной шине, в которой асинхронное взаимодействие между компонентами через внутренние очереди идёт «из коробки». Но слишком высокие цены за «коробку» могут сильно затруднить использование интеграционной шины. Мы приведем пример реализации простого приложения на Spring Integration (далее SI) + Rabbit MQ. Оговоримся, что Rabbit MQ у себя в production мы не используем из-за невозможности его работы с XA.

Сердцем всего приложения является spring-integration-context.xml. Там описана компонентная модель, инициализируются ресурсные бины и менеджер транзакций для работы с MQ. Опишем его подробнее.

Подключаем встроенный в SI драйвер и прописываем ресурсы:

  <rabbit:queue name="si.test.queue.to"/>
  <rabbit:queue name="si.test.queue.from"/>

Нам необходим низкоуровневый бин amqpTemplate, через который осуществляется взаимодействие с ресурсами. Данный бин мы используем напрямую в тестах, и он требуется для компонент SI, которые работают с Rabbit MQ. ConnectionFactory, необходимый для подключения к ресурсам, конфигурит Spring Boot по настройкам из application.yml (см. org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration).

  <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" mandatory="true"/>

Для транзакционной работы с Rabbit MQ требуется TransactionManager (нужен для отката сообщения обратно в очередь, если в процессе работы произойдет ошибка). К сожалению, Rabbit MQ не поддерживает XA-транзакции, иначе менеджер транзакций сконфигурил бы Spring Boot. Конфигурим предоставляемый Spring-ом вручную.

 <bean id="rabbitTransactionManager"
          class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <constructor-arg name="connectionFactory" ref="rabbitConnectionFactory"/>
    </bean>

А теперь самое приятное. «Рисуем» flow! В кавычках, потому что пишем в виде xml, что менее приятно.

Flow


Нам потребуются:

  • inbound-adapter для чтения из входной очереди;
  • асинхронные компоненты для вызовов REST-сервисов;
  • outbound-channel-adapter для записи в выходную очередь.

Для организации гарантированной доставки в данном приложении мы используем асинхронные вызовы через внутреннюю очередь. Так как компонент много, требуется несколько очередей, что неудобно ни с точки зрения разработки, ни администратору. Постараемся обойтись одной.

Рассмотрим сценарий взаимодействия между двумя компонентами. SomeComponentOne получает сообщение из канала, вызывает некий синхронный REST-сервис (работает с БД, пишет в файл и т.п.) и отправляет сообщение на дальнейшую обработку, которой должна заниматься SomeComponentTwo. Если SomeComponentOne не смогла выполнить порученный ей кусок работы, она должна откатить транзакцию и вернуть полученное сообщение туда, откуда она его забрала. Если всё хорошо — отправить сообщение во внутреннюю очередь и завершить транзакцию. SomeComponentOne забирает сообщение из внутренней очереди и отправляет сообщение в неё же, при этом не обязательно в том же виде, в котором получила. Сообщение может быть обогащено или изменено, нам не важно. Оно предназначено для работы компоненты SomeComponentTwo. Возникает проблема роутинга. Сообщение попадает во внутреннюю очередь и должно забираться оттуда нужной в данный момент времени компонентой. Другими словами, необходим роутинг.

В данном приложении продемонстрирован роутинг, основанный на заголовках сообщения. У сообщения заполняется кастомный заголовок PartnerComponent, указывающий на компоненту, которая должна работать с сообщением.

Распишем технические детали представленного flow.

Адаптер для чтения из входной очереди. Получает сообщение и в транзакции бросает его сразу во внутреннюю очередь.

<int-amqp:inbound-channel-adapter channel="innerChannel"
                                      queue-names="si.test.queue.to"
                                      connection-factory="rabbitConnectionFactory"
                                      transaction-manager="rabbitTransactionManager"/>
<int-amqp:channel id="innerChannel" queue-name="si.test.queue.inner"                                                            connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/>

Мы использовали специализированный для работы с очередями асинхронный канал, предоставляемый Spring-ом. Получили интерфейс SI-channel, а хранение сообщений непосредственно в очереди, в нашем случае во внутренней mq-очереди приложения. При получении сообщения из данного канала-очереди будет открываться транзакция, т.к. мы подключили наш менеджер транзакций.

К данному каналу-очереди подключаем SI-роутер, работающий на заголовках сообщений.

<int:header-value-router id="wireRouter" input-channel="innerChannel" header-name="PartnerComponent"
                            default-output-channel="component1Channel">
        <int:mapping value="ComponentTwo" channel="component2Channel"/>
        <int:mapping value="ComponentThree" channel="component3Channel"/>
        <int:mapping value="OutboundComponent" channel="outboundRabbitChannel"/>
    </int:header-value-router>

Новое для flow сообщение не имеет технического заголовка PartnerComponent, поэтому по-умолчанию будет обрабатываться компонентой someComponentOne, обязанностью которой является указание в заголовке сообщения PartnerComponent следующей компоненты и отправка сообщения во внутреннюю очередь. Роутер вновь забирает сообщение из внутренней очереди и отправляет его на обработку в указанную компоненту.

Описание компонент, в которые отправляются сообщения из роутера.

    <int:channel id="component1Channel"/>
    <int:service-activator input-channel="component1Channel"
                           ref="someComponentOne" method="process"/>
    <int:channel id="component2Channel"/>
    <int:service-activator input-channel="component2Channel"
                           ref="someComponentTwo" method="process"/>
    <int:channel id="component3Channel"/>
    <int:service-activator input-channel="component3Channel"
                           ref="someComponentThree" method="process"/>
    <int:channel id="outboundRabbitChannel"/>
    <int:service-activator input-channel="outboundRabbitChannel"
                           ref="outboundRabbitComponent" method="process"/

Адаптер для отправки в выходную очередь.
    <int:channel id="toRabbit"/>
    <int-amqp:outbound-channel-adapter channel="toRabbit"                                                                                                amqp-template="amqpTemplate"                                                                                                 routing-key="si.test.queue.from"/>

Сборка (pom.xml)


Старый добрый Maven. Стандартная сборка от Spring Boot. Зависимости от SI и AMQP предоставляют все необходимые библиотеки. Также подключаем spring-boot-starter-test для реализации проверочных кейсов на JUnit.

Работа SomeComponent*.java


Транзакционные бины, подключенные как service-activator к flow SI. Вызов REST через RestTemplate и отправка во внутреннюю очередь через innerChannel. Достаточно, чтобы продемонстрировать работу с сервисом и удобно за-mock-ить в тестах.

Тестируем


В тесте testHappyPath мы проверили работоспособность flow, когда нет сбоев при вызове REST. Mock-аем все вызовы REST-сервисов без сбоев, бросаем сообщение во входную очередь, ждем в выходной, проверяем прохождение всех компонент по контенту тела полученного сообщения.

В тесте testGuaranteedDelivery мы проверили гарантированную доставку при сбое в одном из REST. Эмулируем однократный сбой при вызове сервиса из третьей компоненты, ждём доставку сообщения до выходной очереди, проверяем тело полученного сообщения.

Заключение


Мы сделали приложение с гарантированной доставкой. Остался ряд мелких нерешенных вопросов: повторная отправка сообщений происходит бесконечно и неуправляемо, кто поддерживает данное решение — администратор или эту поддержку можно автоматизировать? Мы решаем данные вопросы при помощи самописных приложений и индивидуальных настроек для каждого типа сообщения. Возможно, в будущем мы опишем эти решения.

Gриложение целиком на git-hub
Поделиться с друзьями
-->

Комментарии (4)


  1. sshikov
    22.11.2016 15:48
    +3

    Ряд мелких нерешенных вопросов? :) Если бы…

    По-моему тут наблюдается минимум одна крупная проблема, которую вы обошли вниманием — как только используется такое решение для «гарантированной» доставки, как правило сразу становится невозможным сохранение порядка сообщений — потому что первое сообщение застряло на втором REST сервисе, а второе например прошло успешно — потому что сервис уже починили.

    Неуправляемая повторная отправка только усугубляет данную проблему.


    1. selenite
      23.11.2016 18:48

      Рано или поздно такая транзакция будет найдена и откачена.
      Блокчейн поможет :D (за счет того, что высокая вычислительная стоимость обеспечивает неплохое количество возможностей транзакции «прийти в себя»).

      Если исходные данные, которые доставляются, являются немутабельными, фиксированными в каком-то промежутке — достаточно слать их с историей версий и как-то синхронизировать между получателями. Дорого, да. Но на практике вполне юзабельно.


  1. tas
    24.11.2016 11:22
    +1

    Так и не понял, в чем инновационность этого решения, или чем оно лучше ActiveMQ, RabbitMQ или IBM WebSphere MQ?


  1. Throwable
    25.11.2016 10:35
    +2

    Оговоримся, что Rabbit MQ у себя в production мы не используем из-за невозможности его работы с XA.

    Вместе с тем вы используете Rest, который вообще как бы не транзакционный. Странная мотивация.


    Данное решение так же устраняет проблему невозможности отката транзакции при работе с внешними сервисами. Никакой вызов не пройдет дважды — обработка начинается именно с того шага, на котором произошел сбой.

    Вопрос на засыпку: что делать, если мой любимый http вернул timeout? Rest обработал изменения или нет?
    В вашей схеме каждый Rest-сервис должен локально записывать транзакции и автоматически игнорировать их при повторном вызове.