Сообщество .NET-разработчиков Райффайзенбанка продолжает краткий разбор содержимого ViennaNET. О том, как и зачем мы к этому пришли, можно почитать в первой части.

В этой статье пройдемся по еще не рассмотренным библиотекам для работы с распределенными транзакциями, очередями и БД, которые можно найти в нашем репозитории на GitHub (исходники лежат здесь), а Nuget-пакеты здесь.



ViennaNET.Sagas


Когда в проекте происходит переход на DDD и микросервисную архитектуру, то при разнесении бизнес-логики по разным сервисам, возникает проблема, связанная с необходимостью реализации механизма распределенных транзакций, ведь многие сценарии часто затрагивают сразу несколько доменов. С такими механизмами подробнее можно познакомиться, например, в книге «Microservices Patterns», Chris Richardson.

В наших проектах мы реализовали простой, но полезный механизм: сага, а точнее сага на основе оркестрации. Суть ее в следующем: есть некий бизнес-сценарий, в котором необходимо последовательно совершить операции в разных сервисах, при этом, в случае возникновения каких-либо проблем на любом шаге, необходимо вызвать процедуру отката всех предыдущих шагов, где она предусмотрена. Таким образом, в конце выполнения саги, независимо от успешности, мы получаем консистентные данные во всех доменах.

Наша реализация пока сделана в базовом виде и не завязана на использовании каких-либо способов взаимодействия с другими сервисами. Применять её несложно: достаточно сделать наследника от базового абстрактного класса SagaBase<Т>, где T – это ваш класс контекста, в котором можно хранить исходные данные, необходимые для работы саги, а также некоторые промежуточные результаты. Экземпляр контекста будет пробрасываться во все шаги во время выполнения. Сама сага является stateless классом, поэтому экземпляр может быть помещен в DI как Singleton, чтобы получить необходимые зависимости.

Пример объявления:

public class ExampleSaga : SagaBase<ExampleContext>
{
  public ExampleSaga()
  {
    Step("Step 1")
      .WithAction(c => ...)
      .WithCompensation(c => ...);
	
    AsyncStep("Step 2")
      .WithAction(async c => ...);
  }
}

Пример вызова:

var saga = new ExampleSaga();
var context = new ExampleContext();
await saga.Execute(context);

Полноценные примеры разных реализаций можно посмотреть здесь и в сборке с тестами.

ViennaNET.Orm.*


Набор библиотек для работы с различными БД через Nhibernate. У нас используется подход DB-First с применением Liquibase, поэтому здесь присутствует только функционал по работе с данными в готовой БД.

ViennaNET.Orm.Seedwork и ViennaNET.Orm – главные сборки, содержащие базовые интерфейсы и их реализации соответственно. Остановимся на их содержимом подробнее.

Интерфейс IEntityFactoryService и его реализация EntityFactoryService являются главной отправной точкой для работы с БД, так как здесь создается Unit of Work, репозитории для работы с конкретными сущностями, а также исполнители команд и прямых SQL-запросов. Иногда удобно ограничить возможности класса по работе с БД, например, дать возможность только для чтения данных. Для таких случаев у IEntityFactoryService есть предок – интерфейс IEntityRepositoryFactory, в котором объявлен только метод для создания репозиториев.

Для непосредственного обращения к БД используется механизм провайдеров. Для каждой используемой у нас в командах СУБД есть своя реализация: ViennaNET.Orm.MSSQL, ViennaNET.Orm.Oracle, ViennaNET.Orm.SQLite, ViennaNET.Orm.PostgreSql.

При этом в одном приложении может быть зарегистрировано несколько провайдеров одновременно, что позволяет, например, в рамках одного сервиса без каких-либо затрат на доработку инфраструктуры провести пошаговую миграцию с одной СУБД на другую. Механизм выбора необходимого подключения и, следовательно, провайдера для конкретного класса-сущности (для которого и пишется маппинг на таблицы БД) реализован через регистрацию сущности в классе BoundedContext (содержит метод для регистрации доменных сущностей) или его наследника ApplicationContext (содержит методы для регистрации аппликационных сущностей, прямых запросов и команд), где в качестве аргумента принимается идентификатор подключения из конфигурации:

"db": [
  {
    "nick": "mssql_connection",
    "dbServerType": "MSSQL",
    "ConnectionString": "...",
    "useCallContext": true
  },
  {
    "nick": "oracle_connection",
    "dbServerType": "Oracle",
    "ConnectionString": "..."
  }
],

Пример ApplicationContext:

internal sealed class DbContext : ApplicationContext
{
  public DbContext()
  {
    AddEntity<SomeEntity>("mssql_connection");
    AddEntity<MigratedSomeEntity>("oracle_connection");
    AddEntity<AnotherEntity>("oracle_connection");
  }
}

Если идентификатор подключения не указан, то будет использоваться подключение с именем «default».

Непосредственно маппинг сущностей на таблицы БД реализуется стандартными средствами NHibernate. Можно использовать описание как через xml-файлы, так и через классы. Для удобного написания репозиториев-заглушек в Unit-тестах, имеется библиотека ViennaNET.TestUtils.Orm.

Полноценные примеры использования ViennaNET.Orm.* можно найти здесь.

ViennaNET.Messaging.*


Набор библиотек для работы с очередями.

Для работы с очередями был выбран такой же подход, что и с различными СУБД, а именно – максимально возможный унифицированный подход с точки зрения работы с библиотекой, независимо от используемого менеджера очередей. Библиотека ViennaNET.Messaging как раз отвечает за эту унификацию, а ViennaNET.Messaging.MQSeriesQueue, ViennaNET.Messaging.RabbitMQQueue и ViennaNET.Messaging.KafkaQueue содержат реализации адаптеров для IBM MQ, RabbitMQ и Kafka соответственно.

В работе с очередями есть два процесса: получение сообщения и отправка.

Рассмотрим получение. Здесь есть 2 варианта: для постоянного прослушивания и для получения единичного сообщения. Для постоянного прослушивания очереди необходимо для начала описать класс процессора, унаследованный от IMessageProcessor, который будет отвечать за обработку входящего сообщения. Далее его необходмо «привязать» к определенной очереди, делается это через регистрацию в IQueueReactorFactory с указанием идентификатора очереди из конфигурации:

"messaging": {
    "ApplicationName": "MyApplication"
},
"rabbitmq": {
    "queues": [
      {
        "id": "myQueue",
        "queuename": "lalala",
        ...
      }
    ]
},

Пример запуска прослушивания:

_queueReactorFactory.Register<MyMessageProcessor>("myQueue");
var queueReactor = queueReactorFactory.CreateQueueReactor("myQueue");
queueReactor.StartProcessing();

Затем, при старте сервиса и вызова метода для начала прослушивания, все сообщения из указанной очереди будут попадать в соответствующий процессор.

Для получения единичного сообщения в интерфейсе-фабрике IMessagingComponentFactory есть метод CreateMessageReceiver, который создаст получателя, ожидающего сообщения из указанной ему очереди:

using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("myQueue"))
{
    var message = receiver.Receive();
}

Для отправки сообщения необходимо воспользоваться всё той же IMessagingComponentFactory и создать отправителя сообщения:

using (var sender = _messagingComponentFactory.CreateMessageSender<MyMessage>("myQueue"))
{
    sender.SendMessage(new MyMessage { Value = ...});
}

Для сериализации и десереализации сообщения есть три готовых варианта: просто текст, XML и JSON, но при необходимости спокойно можно сделать свои реализации интерфейсов IMessageSerializer и IMessageDeserializer.

Мы постарались сохранить уникальные возможности каждого менеджера очередей, например, ViennaNET.Messaging.MQSeriesQueue позволяет отправлять не только текстовые, но и байтовые сообщения, а ViennaNET.Messaging.RabbitMQQueue поддерживает роутинг и создание очередей “на лету”. В нашей обертке адаптера для RabbitMQ также реализовано некоторое подобие RPC: отправляем сообщение и ожидаем ответа из специальной временной очереди, которая создается только для одного ответного сообщения.

Вот пример использования очередей с основными нюансами подключения.

ViennaNET.CallContext


Мы используем очереди не только для интеграции между разными системами, но и для общения между микросервисами одного приложения, например, в рамках саги. Это привело к необходимости передачи вместе с сообщением таких вспомогательных данных, как логин пользователя, идентификатор запроса для сквозного логирования, ip-адрес источника и авторизационые данные. Для реализации пробрасывания этих данных разработали библиотеку ViennaNET.CallContext, которая позволяет хранить данные из входящего в сервис запроса. При этом то, каким образом был сделан запрос, через очередь или через Http, не играет роли. Затем, перед отправкой исходящего запроса или сообщения, достаются данные из контекста и помещаются в заголовки. Таким образом, следующий сервис получает вспомогательные данные и аналогично ими распоряжается.

Спасибо за внимание, ждём ваших комментариев и pull request-ов!