Лет 8 назад я начал работать в команде, которая разрабатывала один сервис. Интерфейс сервиса был достаточно прост, всего 4 метода, и выполнял он одну единственную задачу. В течение всего этого времени код постоянно изменялся: реализовались новые бизнес-правила и ограничения, добавлялась версионность. В один прекрасный момент, front-end‘у понадобился очень небольшой функционал, который был «зарыт» глубоко в сервисе. Реализация необходимой функции была разработана в виде компоненты и не представляло никаких проблем дать к ней доступ из сервиса через дополнительный метод… Кроме одной: нарушалась логическая связанность методов сервиса, то есть его «внутренности» начали становиться «внешностями».


Проблему можно было бы решить, если преобразовать все эти небольшие внутренние компоненты, к которым потребовался доступ извне, в отдельные сервисы. В таком случае, front-end мог бы получить доступ к их функционалу; основной же сервис стал бы более компактным и его роль сводилась к оркестровке вызовов.


Мы использовали WCF для построения сервисов. Разворачивать сервис в 50 строчек кода на WCF, как минимум на 3-4 серверах, с load-balancer‘ом, новыми URL‘ами и прочими наворотами, казалось не очень хорошей идеей. А хотелось какой-то легкости, перспективы…


Несколько лет спустя я принимал участие в другом проекте на Workflow Foundation. Глядя на то, что получалось в XAML-редакторе, я подумал: «А почему-бы не представить весь workflow, как последовательность сообщений»?


Kinoпробы


Поиск по имеющимся решениям, честно говоря, я не делал. На тот момент (4-5 лет назад) об „Orleans“ было мало что известно, а об Akka я узнал уже после начала велосипедостроения. С одной стороны, это плохо, недостойно профессионального разработчика и все такое. С другой стороны, могло получиться что-то новенькое… Насколько хорошо или плохо все получилось, может судить уважаемый читатель.


Итак, я занялся созданием kino: actor-like communication framework на NetMQ. Суффикс «-like» потому, что классические актеры имеют иерархическую организацию, супервизоров, они stateful и, вообще, целая математическая модель там у них… Тут все проще, но, тем не менее, актеры будут и у нас.


Вкратце, что здесь к чему


Основным средством общения в kino является сообщение. Каждое сообщение имеет версию и тип, которые используются для поиска соответствующего обработчика. Есть небольшое отклонение от правила, но пока не будем об этом.


Актеры (Actors) — основные потребители и производители сообщений. Actor объявляет свой интерфейс, указывая тип и версию сообщения, которые он может получать. Есть еще один участник массовки, MessageHub, который также может получать и отправлять сообщения. Однако, между ними есть определенные различия. Actor нужно рассматривать, как сервис: он может ответить только при получении входящего сообщения. MessageHub – это клиент, который может отправить сообщение и (попытаться) получить ответное сообщения, если необходимо. Итак, чаще всего, начальное сообщение отправляется через MessageHub и обрабатывается одним или несколькими Actors.


Для поиска адресатов сообщений необходим MessageRouter. Он хранит таблицу маршрутизации — соответствие версии (Version) и типа сообщения (Identity) со списком Actors, которые его могут обработать. Для одного процесса достаточно одного MessageRouter‘а.


Для выхода за рамки одного процесса/хоста нам необходимо получить знание о внешнем мире, то есть о других MessageRouter‘ах и их таблицах маршрутизации. Источником для получения такого знания является Rendezvous сервис. Это – единственный well-known адрес, который должен быть сконфигурирован для приложения на базе kino. Rendezvous принимает от всех и раздает всем подключенным MessageRouter‘ам информацию о добавлении новых и удалении несуществующих маршрутов, ping‘ует установленные подключения. Rendezvous сервис формирует единую сеть компонентов kino.


Тоже, но более детально


1. Message


Так выглядит типичное сообщение, которое можно отправить гулять по сети kino:


public class MyMessage : Payload
{
    private static readonly byte[] MessageIdentity = "NAMESPACE.MYMESSAGE".GetBytes();
    private static readonly byte[] MessageVersion = "1.0".GetBytes();

    // Здесь идут свойства сообщения, т.е. то, что мы в итоге хотим передать

    public override byte[] Version => MessageVersion;
    public override byte[] Identity => MessageIdentity;
}

Поддерживается 3 способа распределения (Distribution Pattern) сообщений: unicast, broadcast и direct. В первом случае сообщение отправляется только одному обработчику, зарегистрированному в сети. Во втором – всем.


IPayload payload = new MyMessage();
IMessage message = Message.Create(payload, DistributionPattern.Broadcast);

В случае direct distribution, который может быть особо полезен при тестировании, сообщение отправляется конкретному MessageRouter‘у:


IMessage message = Message.CreateFlowStartMessage(new MyMessage());
message.SetReceiverNode(receiverIdentity);
// Теперь можно отправлять сообщение

Добраться до данных в полученном сообщении можно следующим образом:


MyMessage payload = message.GetPayload<MyMessage>();

2. Actors


Для создания своего актера необходимо унаследовать класс от Actor и реализовать в нем хотя бы один метод-обработчик сообщения:


public class MyMessageProcessor : Actor
{
    [MessageHandlerDefinition(typeof (MyMessage))]
    public async Task<IActorResult> MyMessageHandler(IMessage message)
    {
        // тело метода
    }

    [MessageHandlerDefinition(typeof (OtherMessage))]
    public Task<IActorResult> OtherMessageHandler(IMessage message)
    {
        // тело метода
    }
}
```cs

Все актеры по умолчанию регистрируются глобально, то есть доступны во всей сети *kino*. Если вы хотите обрабатывать сообщения только в локальном процессе, можно объявить регистрацию обработчиков локальной:

```cs
public class LocalMessageProcessor : Actor
{
    // Данная регистрация доступна только локально
    [MessageHandlerDefinition(typeof (LocalMessage), true)]
    public async Task<IActorResult> MyMessageHandler(IMessage message)
    {
        // тело метода
    }
}

Фреймворк гарантирует, что метод-обработчик актера получит сообщения только того типа, который задекларирован. Возвращаемый же результат может представлять из себя одно или несколько сообщений любых типов, причем, с разными distribution pattern. То есть, мы можем послать ответ начальному отправителю и одновременно сообщить всем остальным еще о чем-нибудь:


public class MyMessageProcessor : Actor
{
    [MessageHandlerDefinition(typeof (MyMessage))]
    public async Task<IActorResult> MyMessageHandler(IMessage message)
    {
        MyMessage payload = message.GetPayload<MyMessage>();

        var result = await UpdateDb(payload);

        IMessage response = Message.Create(new ResponseMessage(result));
        IMessage notifyRequest = Message.Create(new NotifyMessage(result), DistributionPattern.Broadcast);

        return new ActionResult(response, notifyRequest);
    }    
}

3. ActorHost


Об ActorHost мы еще не говорили. Это компонент, который выполняет несколько функций:


  • хранит ссылки на методы-обработчики всех зарегистрированных у него Actors
  • регистрирует обработчики в MessageRouter
  • принимает сообщения на обработку от MessageRouter к Actors и отправляет ответы обратно в MessageRouter.

Вызов методов-обработчиков в ActorHost происходит в одном потоке (исключение составляют асинхронные методы). Поэтому, ActorHost не поддерживает множественные регистрации обработчиков одного и того же сообщения. Если же необходимо масштабирование одного и того же типа Actor в рамках одного процесса, требуется создание нового экземпляра ActorHost для каждого из них. Все эти сложности по выбору и созданию ActorHosts берет на себя ActorHostManager:


// Создаем первый экземпляр актера MyActor
IActor actor = new MyActor();
// По умолчанию, выбирается первый доступный ActorHost
actorHostManager.AssignActor(actor);
// Нам показалось, что одного мало
actor = new MyActor();
// Указываем, что нам обязательно нужен второй экземпляр MyActor
actorHostManager.AssignActor(actor, ActorHostInstancePolicy.AlwaysCreateNew);

4. MessageHub


Вернемся немного назад, с чего все началось. А началось с того, что появилась необходимость разнести код из одного WCF-сервиса в несколько доступных по сети компонент. В результате, вместо вызовов сотни методов в одном процессе у нас получился некий поток сообщений (message flow), которые, вдобавок, путешествуют по разным серверам. Тем не менее, и функционал и поведение для конечного пользователя сервиса должны, в идеале, оставаться прежними. То есть, если раньше клиент вызывал метод сервиса синхронно и ожидал получение ответа, то со всем этим kino паттерн работы клиента не должен поменяться кардинальным образом. Необходимо из всего этого потока сообщений определить, что является ответом клиенту и доставить его обратно.


MessageHub как раз призван решить эту задачу. С его помощью можно отправить сообщение в сеть kino, не дожидаясь ответа:


IPayload payload = message.GetPayload<MyMessage>();
IMessage message = Message.CreateFlowStartMessage(payload);
messageHub.SendOneWay(message);

А можно так же указать, что отправитель ожидает определенный ответ:


// Создаем сообщение, представляющее запрос
IMessage request = Message.CreateFlowStartMessage(new StartMessage());
// Говорим, что мы заинтересованы в получении ответа определенного типа
ICallbackPoint callbackPoint = CallbackPoint.Create<ResultMessage>();

// Теперь отправляем сообщение и ожидаем результат
using(IPromise promise = messageHub.EnqueueRequest(request, callbackPoint))
{
    if(promise.GetResponse().Wait(timeout))
    {
    // Обрабатываем полученный результат
        ResultMessage result = promise.GetResponse().Result.GetPayload<ResultMessage>();       
    }
    else
    {
    // Попробуем снова…
    }
}

5. MessageRouter


MessageRouter представляет собой узел в сети kino. К нему подключаются другие компоненты, ActorHosts и MessageHubs, для обмена сообщениями. В свою очередь, MessageRouters находят подобных себе и подключаются друг к другу с помощью Rendezvous сервиса, формируя таким образом сеть kino.


В качестве транспорта в kino используется библиотека NetMQ. Она, практически, вбита во фреймворк гвоздями и использовать другой транспорт не планировалось.


Итак, маршрутизация сообщений. Она осуществляется по следующим алгоритмам:


Unicast-сообщение:


НАЙТИ локально зарегистрированный ActorHost или MessageHub, которые могут обработать сообщение
ЕСЛИ найден
    Отправить сообщение на обработку
ИНАЧЕ
    НАЙТИ MessageRouter в сети, который может обработать сообщение
    ЕСЛИ найден
        Отправить сообщение на обработку
    ИНАЧЕ
        Сообщение не обработано!

Broadcast-сообщение:


НАЙТИ все локально зарегистрированные ActorHosts и MessageHubs, которые могут обработать сообщение
ЕСЛИ найдены
    Отправить сообщение на обработку
ЕСЛИ broadcast-сообщение отправлено из локально зарегистрированного Actor
    НАЙТИ все MessageRouter в сети, которые могут обработать сообщение
    ЕСЛИ найдены
        Отправить сообщение на обработку
ЕСЛИ не найдено ни одного обработчика локально или удаленно
    Сообщение не обработано!

Direct-сообщение:


ЕСЛИ unicast-сообщение
    НАЙТИ MessageRouter в сети, указанный в сообщении, как получатель (ReceiverNode), который может обработать сообщение
    ЕСЛИ найден
        Отправить сообщение на обработку
    ИНАЧЕ
        Сообщение не обработано!
ИНАЧЕ
    <маршрутизация broadcast-сообщения>

6. Rendezvous


Rendezvous сервис – единственный well-known сервис, адрес которого должен быть сконфигурирован для всех узлов одной сети kino. Он выполняет следующие функции:


  • broadcast-рассылка сообщений об изменениях в маршрутизации: добавление новых и удаление недействительных маршрутов,
  • broadcast-рассылка PING сообщений для мониторинга подключенных узлов,
  • broadcast-рассылка ответных PONG сообщений от подключенных узлов.

При необходимости, Rendezvous сервис можно установить на кластер серверов. Выбранный на основании консенсуса лидер отвечает за все вышеперечисленные функции. В случае «падения» кластера, сеть kino продолжит работу. Однако, информация об изменениях в маршрутизации будет недоступна. Когда работа Rendezvous сервиса будет восстановлена, узлы получат обновление конфигурации сети.


Открытые вопросы


  • Ну, собственно говоря, увидеть что-то в продакшине. Пока до этого не дошло…
  • Как работать с сообщениями разного wire-формата в одной сети
  • Возможные проблемы при большом количестве подключений к Rendezvous сервису, пакетная обработка PONG-сообщений
  • Объединение нескольких сетей kino, то есть маршрутизация между узлами, подключенными к разным Rendezvous серверам/кластерам

Проект kino на Github'е: https://github.com/iiwaasnet/kino
Wiki: https://github.com/iiwaasnet/kino/wiki

Поделиться с друзьями
-->

Комментарии (4)


  1. Ogoun
    23.05.2016 14:14

    Может невнимательно прочитал, если сообщение обрабатывается больше чем одним получателем, и все возвращают результат, то ResultMessage будет содержать все результаты?
    Когда делал свою реализацию обмена сообщениями (правда только в рамках процесса, но с возможностью общения между доменами), то в качестве результата отправки сообщения делал объект который позволяет получить набор из любого количества ответов, с указанием от какого из обработчиков ответ получен.
    Еще показалось удобным сделать подписку на окончание обработки сообщения. Пример применения, появился файл в каталоге, отправили сообщение, кто-то обработал файл, получаем сообщение от роутера что все получатели сообщений закончили работу, и можем удалить/заархивировать файл.


    1. aosja
      23.05.2016 15:08

      Если будет отправлено несколько одинаковых сообщений-результатов, на тип которых подписан отправитель, то он получит только одно — первое пришедшее. Дело в том, что в противном случае не известно сколько всего ждать ответов и как долго нужно хранить promise в коллекции.
      В приведенном вами примере, если обработка выполняется только одним актером, то он как раз и отправит это уведомдение об окончании работы.


      1. Ogoun
        23.05.2016 15:26

        Так роутер же знает количество подписчиков, значит можно реализовать и ожидание пока все не ответят, и отваливание по таймауту. Хотя я рассматриваю больше с позиции издатель подписчик, а акторы могут в ответ на сообщение создать свое, и ожидание ответов ненужная опция получается.


        1. aosja
          23.05.2016 15:54

          Да, роутер знает. Но, подписка на ответ (promise) хранится не на MessageRouter, а на MessageHub. Кроме того, MessageRouter может иметь устаревшую информацию о доступных актерах, например, кто-то уже мог отвалиться от сети, но таймаут, по которму удаляется маршрут, еще не наступил.
          Я не говорю, что ваш случай «неправильный», просто объясняю, как сейчас сделано :)
          Если вы имеете в виду, что сообщение о результате, должно быть отправлено только при получении определенного количества промежуточных сообщений, что-то типа map-reduce, то это можно сделать с помощью дополнительного актера и CorrelationId. У меня был рабочий пример, но получается немного сложновато.