Привет! Меня зовут Павел Агалецкий, я старший инженер в команде архитектуры. Одна из областей ответственности нашей команды — синхронное (RPC) и асинхронное взаимодействие между сервисами. 

В этой статье я расскажу о том, как можно связать сервисы друг с другом и как это делаем мы в платформе Авито. 

Первая половина статьи будет посвящена паттернам — шаблонам проектирования связи для самых распространённых ситуаций при взаимодействии сервисов, когда удобнее всего использовать асинхронный обмен сообщениями. 

Во второй половине текста я расскажу об особенностях обработки сообщений при асинхронном обмене и гарантии доставки сообщений.  

Как можно организовать связь между сервисами

Для передачи данных из одного сервиса в другой есть четыре способа:

  • Опубликовать файл с данными. Для этого нужно общее хранилище данных, в которое один сервис записывает файл, а второй его считывает. Этот способ удобно использовать, если нужно, например, передать несколько сотен ГБ данных.

  • Загрузить данные в единую базу. Вся выходная информация из всех сервисов в системе публикуется в общей базе. Каждый сервис, когда ему нужно, подключается к базе и забирает данные. Этот способ подходит для ситуаций, когда в системе сложная схема взаимодействия сервисов.

  • Использовать механизм удалённого вызова процедур. RPC позволяет отправить из одного сервиса запрос в другой и получить ответ с нужными данными. Это удобно, когда сервисы нужно связать друг с другом напрямую.

  • Отправить асинхронное сообщение через общую шину данных. В этом случае сервис не ждёт ответ на запрос, а продолжает работать в обычном режиме. 

Первые два способа сложно масштабировать, они не подходят для такой крупной системы, как платформа Авито с сотнями сервисов. Поэтому мы используем только RPC и отправку сообщений — более современные и гибкие способы взаимодействия. 

RPC и обмен сообщениями: в чём разница 

Главное отличие удалённого вызова процедур и обмена сообщениями — синхронность процессов и связанность системы.

Сервис А удалённо вызывает процедуру из B и получает в ответ данные
Сервис А удалённо вызывает процедуру из B и получает в ответ данные

Удалённый вызов процедур подразумевает, что один сервис обращается к другому и ждёт ответ. Отправитель запроса приостанавливает работу и ожидает в течение ограниченного времени. Сервис получает ответ и обрабатывает его, затем продолжает работу. Между отправкой запроса и получением ответа не происходит никаких операций. Такой процесс можно считать синхронным: если запрос отправлен, на него должны ответить.

С точки зрения сервиса, вызов удалённой процедуры не отличается от обычного вызова функции внутри сервиса. Но разработчику нужно учитывать, что сервис-получатель может быть недоступен. Например, он отключён или удалён. Если запрос уйдёт к такому сервису, отправитель не сможет продолжить работу, потому что никогда не получит ответ.

Использовать RPC нужно, если точно известны отправитель и получатель запроса. Это делает систему очень связанной. Если из одного сервиса убрали вызываемый метод, то нужно переписать все другие сервисы, которые могут обращаться к этому методу. Иначе запросы будут оставаться без ответа, сервисы не смогут работать, и вся система рискует сломаться.

Производительности системы может не хватить для обработки всех запросов, если нужно делать много вызовов, а обмен данными происходит медленно. Поэтому RPC сложно масштабировать: чем больше запросов, тем дольше отвечает система.

Сервисы посылают запросы на шину данных. 
Каждый может читать все сообщения независимо от других
Сервисы посылают запросы на шину данных. Каждый может читать все сообщения независимо от других

Обмен сообщениями подразумевает, что сервис отправляет запрос через шину данных, не ждёт ответ и продолжает работать. Сервис может даже не знать, кто будет получателем сообщения. Для его работы это не имеет значения. Например, это может быть сервис для сбора данных от клиентов, который просто публикует их на шине для других сервисов. 

Обмен сообщениями — это асинхронный процесс: запрос отправлен, но отвечать на него не обязательно. Это может быть нужно для того, чтобы другие сервисы могли в будущем прочитать сообщение с шины данных и каким-то образом его обработать. Для сервиса-отправителя это неважно. Использовать обмен сообщениями можно почти в любых ситуациях, если нет причин выбрать RPC. 

Асинхронный обмен сообщениями позволяет легко масштабировать систему. Для шины данных нет большой разницы, два получателя у сообщения или сто. Поэтому через неё можно связывать любое число источников и получателей. 

Асинхронный обмен не влияет на работу системы. Даже если ни один из получателей не доступен, сервис, который отправил запрос, продолжит работу. По этой же причине система становится менее связанной. Это значит, что изменения одного сервиса не влияют на работоспособность других.

Асинхронный обмен сообщениями на платформе Авито реализован через шину данных Data Bus — специальную систему передачи данных между сервисами. Шина данных позволяет любому сервису в любой момент читать какое угодно сообщение. По сути это лог всех сообщений внутри системы. 

Иногда сервису нужно отправить сообщение самому себе. В этом случае асинхронный обмен реализован через Queues, или очередь. Такой способ подходит, если, например, сервис хочет оставить отложенные сообщения. К очереди нельзя подключиться из другого сервиса. 

Паттерны асинхронных обменов сообщениями

При работе сервисов возникает несколько типовых ситуаций, для которых мы выработали шаблоны решений — паттерны. Выбрать подходящий можно по числу отправителей и получателей сообщений. 

Отправлять асинхронные сообщения можно:

  • между двумя сервисами; 

  • от многих отправителей;

  • для многих получателей;

  • для нескольких получателей в одном сервисе;

  • для двустороннего обмена через очереди.

Сервис А отправляет запрос на шину данных, сервис В его читает
Сервис А отправляет запрос на шину данных, сервис В его читает

Обмен сообщениями между двумя сервисами. Это самый простой вариант взаимодействия: сервис А хочет отправить запрос сервису В. Для этого A публикует его в шину данных. B получает сообщение и может его обработать. 

Здесь возможны два варианта реализации: 

  • Сообщение предназначено только для сервиса В. Если два сервиса взаимодействуют только друг с другом, то лучше использовать не асинхронный обмен, а RPC. 

  • Сообщение получает любой сервис на платформе. Если сообщение можно переиспользовать в дальнейшем, его нужно отправлять через шину данных. Например, в будущем появится новый сервис, которому нужны те же данные, которые уже записаны на шине. Не нужно будет писать новое сообщение — достаточно взять то, которое уже есть.

Несколько отправителей. Data Bus не ограничивает количество различных отправителей для конкретного сообщения. Любой сервис может отправлять запрос, если делает это в корректном формате. 

Правильный формат сообщения важен, чтобы запрос могли отправлять и получать любые сервисы. В сообщении нужно передавать идентификаторы сущностей и минимально необходимые поля. 

Все детали лучше передавать через RPC уже после обработки асинхронного сообщения. Например, сервис может получить запрос, который опубликован на шине данных несколько месяцев назад. После обработки сообщения сервис отправит RPC и в ответ получит актуальные данные, а не те, что были месяц назад.

Если множество отправителей публикуют одинаковое сообщение на регулярной основе, лучше разделить события названиями. Если в системе есть разные копии платёжных систем, то лучше присвоить их сообщениям OrderPaid разные названия: например, OrderPaymentOnePaid или OrderPaymentTwoPaid.

Сервис А отправляет запрос на шину данных. 
Сервисы В и С получают от шины одинаковые сообщения
Сервис А отправляет запрос на шину данных. Сервисы В и С получают от шины одинаковые сообщения

Несколько получателей. Нормальная ситуация, для которой и придумана шина данных. В Data Bus каждый получатель работает независимо от других сервисов. Допустим, A опубликовал сообщение на шине. Сервисы В и С получат полностью идентичные комплекты сообщений. 

Сервис А отправляет запрос двум получателям на одном сервере через шину данных
Сервис А отправляет запрос двум получателям на одном сервере через шину данных

Несколько получателей в одном сервисе. У каждого сервиса есть уникальный Client ID — идентификатор получателя сообщений. Если внутри сервиса есть несколько получателей, то сообщения от шины данных будут распределены между ними. 

Можно получать несколько копий сообщений от шины, если это необходимо. Для этого нужно вручную присвоить каждому сервису личный Client ID. Тогда для каждого получателя внутри сервиса будет сделана отдельная копия всех сообщений от Data Bus.

Сервис А отправляет информацию на шину данных.
Сервис B может ответить ему запросом RPC или сообщением
Сервис А отправляет информацию на шину данных. Сервис B может ответить ему запросом RPC или сообщением

Двусторонний обмен через очереди. Иногда нужно, чтобы сервис всё-таки получил ответ на сообщение. Для этого паттерна возможны два решения: 

  • Добавить в сервис B вызов RPC. То есть, получив сообщение от шины данных, B отправит к A идентификатор или статус обработанного сообщения. Для этого нужно, чтобы B всегда знал, кто именно отправляет ему сообщения.

  • Сервис B отправит сообщение для A через шину данных. В нём так же, как и в вызове RPC, будет статус или идентификатор, который сервис получит после обработки исходного сообщения. 

Во втором случае нужно дополнительно продумать поле-идентификатор. По нему получатели сообщений будут понимать, что конкретно это сообщение — ответ на запрос, который был отправлен ранее. Но при этом сервису B не нужно знать, кто ему отправил сообщение: можно просто передать свой ответ в шину данных.

Обработка сообщений при асинхронном обмене

Асинхронный обмен подразумевает, что сервис-отправитель сообщения не ждёт ответ на него, а продолжает работу в обычном режиме. Но когда сообщение доходит до сервиса-получателя, он сообщает об этом шине данных. 

Если сервис получает от шины данных Data Bus одно сообщение, то возвращает для него один из двух ответов: 

  • ACK (acknowledgement);

  • NACK (negative acknowledgement).

Результат ACK означает, что сообщение обработано и отправлять его повторно не нужно. То есть сервис-получатель должен отправлять ACK только тогда, когда выполнит все действия с сообщением и зафиксирует их. Например, этот ответ нужно отправлять после того, как закоммитили транзакцию в базу данных.

Результат NACK означает, что сообщение не может быть обработано. В этом случае шина данных отправит его повторно тому же получателю. То есть через заданный шиной промежуток времени то же самое сообщение придёт снова.

Если после отправки NACK получатель отключится. Затем шина данных отправит сообщение следующему получателю.

Сервис может получить несколько сообщений, то есть батч. В этом случае он отправляет шине данных значение счётчика processed. Это количество успешно обработанных сообщений, если считать от начала батча. Все остальные сообщения из этого батча будут считаться необработанными, как если бы сервис отправил для них значение NACK.

Батч сообщений, в котором три успешно обработаны, а последние два нет
Батч сообщений, в котором три успешно обработаны, а последние два нет

Например, вы получили в батче пять сообщений, успешно обработали три из них. Затем произошёл какой-то сбой, и последние два сообщения сервис не обработал. На шину данных нужно отправить processed, который равен трём: сообщения m1, m2, m3 закоммичены как успешно обработанные. Сообщения m4 и m5 не обработаны, поэтому шина данных отправит их повторно. 

Для асинхронных очередей внутри одного сервиса (Queues) есть ещё один вариант ответа на сообщение. 

DEFER означает, что сообщение нужно не просто повторно отправить, а переложить в конец очереди. DEFER не блокирует получение всех последующих сообщений.

Перекладывать сообщения можно в отложенную очередь (DLQ). То есть повторно запрос придёт, например, через 5 или 10 минут. Задержку можно задать с точностью до одной секунды.

DLQ нужно настраивать в конфигурационном файле app.toml, отдельно для каждого сервиса. 

Пример сервиса «item.landed.on.mars», у которого есть три отложенные очереди: на 5, 10 и 30 минут
Пример сервиса «item.landed.on.mars», у которого есть три отложенные очереди: на 5, 10 и 30 минут

Если сервис из примера на иллюстрации получит сообщение и ответит на него DEFER, то оно попадёт сначала в первую отложенную очередь. Через 5 минут сообщение будет отправлено повторно. Если сервис снова ответит DEFER, то сообщение переходит во вторую очередь, и так далее. 

В Queues можно отправлять батчи сообщений и отвечать на них DEFER. Сервис, который получил батч, ответит на него счётчиком processed, так же как и в Data Bus. Если после счётчика сервис отправит DEFER, то одно сообщение попадёт в конец очереди или в отложенную очередь, если она есть. Все остальные сообщения будут считаться необработанными.

Батч сообщений в Queues: три успешно обработаны, одно отправлено в конец очереди 
и одно не обработано
Батч сообщений в Queues: три успешно обработаны, одно отправлено в конец очереди и одно не обработано

В примере на иллюстрации сервис получил пять сообщений из Queues. Первые три из них обработал и вернул processed=3. Потом сервис отправил DEFER на сообщение m4, оно будет переложено в очередь. Ответ на сообщение m5 будет NACK. 

Механика доставки сообщений Queues реализована таким образом, что сообщение m5 будет отправлено сразу после обработки этого батча.

Способы и результаты обработки невалидных сообщений

Иногда в сервисах возникают баги и ошибки, которые влияют на валидность сообщений. То есть некоторые сообщения становятся неправильными с точки зрения структуры или бизнес-логики.

Структурная невалидность появится, например, если сервис ожидает в каком-то поле сообщения число, а там будет строка. Обработать такое сообщение невозможно.

Сообщение может стать невалидным по структуре, если:

  • в программном коде сервиса-отправителя, Data Bus или Queues есть ошибки;

  • разработчик вручную вмешался в работу сервиса, например остановил его;

  • в новой версии сервиса используется другая структура сообщения. 

Для того, чтобы избежать невалидности такого типа, на платформе Авито есть механизм валидации схем Data Bus/Queues. Он действует и для отправителей, и для получателей сообщений. 

Логическая невалидность появляется, если сообщение не соответствует бизнес-логике сервиса. Например, сообщение указывает, что нужно оплатить заказ, а в системе он уже помечен как оплаченный. 

Если сервис получил невалидное по логике сообщение, можно:

  • остановить работу сервиса;

  • пропустить сообщение;

  • отправить сообщение в очередь для повторной обработки;

  • отправить сообщение в очередь для недоставленных сообщений.

Остановить работу сервиса. Это самый простой вариант. Сервис получил сообщение с ошибкой в логике, разработчик завершил работу сервиса и отправил NACK. Если поступить так, сервис не пропустит сообщения. 

Метод подходит для ошибок, которые легко исправить. Например, сервис просто потерял связь с базой данных и из-за этого получил невалидное сообщение. Тогда можно остановить сервис и запустить его снова. Возможно, база данных станет доступна, и обработка продолжится с того же самого места, на котором остановилась. Если нет, то нужно вмешаться вручную, чтобы разобраться, в чём причина и что необходимо делать.

Но иногда остановка может привести к тому, что отправка всех сообщений в очереди будет заблокирована. В результате очередь сообщений вырастет, бизнес-процессы остановятся. Такую проблему нельзя решить, если не проанализировать причины и не исправить ошибки вручную.

Пропустить сообщение. Невалидное сообщение можно просто проигнорировать в сервисе, только сделать запись о нём в лог. Это решение гарантирует, что сервис никогда не заблокируется: невалидные сообщения никак не повлияют на его работу. 

Но если сервис пропускает все невалидные сообщения, то может потерять важные данные. Допустим, сообщение о том, что заказ оплачен, по какой-то причине оказалось невалидным. Если его пропустить, запись об оплате заказа не появится. Это серьёзная проблема с точки зрения бизнеса. 

Пропускать сообщения можно, например, если сервис считывает поток логов и потеря одного сообщения не будет критичной. В остальных случаях лучше выбрать другой вариант.

Отправить сообщение в очередь для повторной обработки. Все невалидные сообщения можно переносить в Retry Queues — отдельную очередь. Для неё в сервисе нужен свой обработчик.

Схема реализации отложенной очереди, которая повторно обрабатывает сообщения для Queues
Схема реализации отложенной очереди, которая повторно обрабатывает сообщения для Queues

Сейчас в Data Bus нет механизма отложенных очередей, но его можно реализовать, если совместить Data Bus и Queues. То есть внутри сервиса-получателя создать отложенную очередь Queues специально для невалидных сообщений. 

Схема реализации очереди для повторной обработки для Data Bus
Схема реализации очереди для повторной обработки для Data Bus

Отправить сообщение в очередь для недоставленных сообщений. Иногда сообщение невалидно само по себе. Например, оно случайно сохранилось от предыдущего релиза, а в текущем уже не используется. В сервисе нет для него подходящего обработчика, поэтому неизвестно, как с ним поступить.

В таком случае очередь повторных обработок забьётся и будет бесконечно крутить одни и те же невалидные сообщения, которых со временем может стать больше, чем валидных. 

Для таких ситуаций нужна ещё одна очередь, реализованная на основе DLQ. В неё стоит переносить сообщения, которые не удалось обработать, например, за 5 попыток. 

Сервис не будет обрабатывать сообщения из DLQ. По сути это накопитель, своеобразный лог для невалидных сообщений. 

Для очереди недоставленных сообщений нужно предусмотреть инструмент просмотра и очистки. Разработчик использует его, чтобы просмотреть очередь. Например, чтобы узнать, какие сообщения в неё попали, и разобраться, что с ними не так. Может оказаться, что некоторые сообщения нужные, а в работе сервиса есть баг, который делает их невалидными. 

Платформа Авито позволяет предусмотреть многие ошибки в архитектуре сервисов, упрощает их разработку. Паттерны асинхронных обменов и способы обработки сообщений помогают предупредить большое число проблем. Например, сбои в работе сервиса из-за невалидных сообщений.

Поэтому самым важным паттерном для разработчика остаётся правильное проектирование. Специалисту нужно понимать, что полную корректность работы системы может обеспечить только он сам. Если какой-то критерий очень важен для работы сервиса, например способ обработки невалидных сообщений, его нужно обязательно продумать в бизнес-логике, а не полагаться на инструменты платформы.

Комментарии (9)


  1. blood_develop
    31.03.2022 06:17

    Может, тупой вопрос, но.. Зачем между сервисами может понадобиться гонять несколько сотен гигабайт данных?


    1. ewolf Автор
      31.03.2022 06:20
      +1

      В общем смысле ответ таков. Каждый сервис занимается своими задачами, поддерживается отдельной командой и отвечает за выделенную предметную область.

      Но различные процессы в одних сервисах могут запускаться как реакция на изменения в других. Именно для отправки сообщений о подобных изменениях и нужна шина данных


      1. blood_develop
        31.03.2022 06:24
        +1

        Я понимаю зачем нужна шина данных. Меня смутил в статье пункт про обмен файлами через общее хранилище данных


        1. ewolf Автор
          31.03.2022 06:28
          +1

          Это один из паттернов возможных обменов. Например, недоступна шина, сервисы живут на одном физическом сервере или имеется особый характер данных, скажем вы выгружает видео файл и передаёте его другому сервису.

          Файл это просто носитель информации


        1. RPG18
          31.03.2022 13:05
          +1

          Выложили доклад: Kafka. Как мы строили корпоративную шину данных, которая обрабатывает до 3 млн сообщ./сек. / И.Гаас. Не все системы любят мегабайтные блобы, поэтому такие вещи передаются через файловый сторадж, например s3.


  1. ToTheHit
    31.03.2022 09:20
    +1

    В статье описано применение RPC, но что насчёт обычных REST запросов? То есть каждый микросервис имеет свои эндпоинты и другие микросервисы шлют на него запрос как на какой-нибудь удаленный сервис, не зная, что они находятся в "одной экосистеме". Большая ли разница в производительности, и может есть еще какие-нибудь подводные камни?


    1. ewolf Автор
      31.03.2022 09:26
      +2

      Разница между RPC и REST лишь в понятийной области. Вполне можно использовать REST, но обычно сложно уложить многообразие действий в парадигму REST, да и не особо нужно.

      В нашем случае, да, у каждого сервиса свой набор эндпоинтов, которые вызываются другими. Другие сервисы, разумеется, знают в какой сервис они шлют запросы.

      Производительность ниже, чем если бы это был монолит, но зато мы можем разрабатывать такие системы отдельно друг от друга. Также никто не отменяет кэши и другие оптимизации


    1. MMgo
      31.03.2022 10:35
      +1

      на правах ИМХО

      REST он для внешних клиентов(наших пользователей, браузеров итд), не для внутренних

      Внутренним лучше подходит именно RPC - меньше ограничений навязанных парадигмой и протоколом.


  1. Stas911
    02.04.2022 05:09

    Ну Enterprise Service Bus бывают и вполне себе синхронные. А на случай изменения интерфейсов - версионирование. Видел такое в банке (там Оракл EA) и оно даже работало лет 10 назад.