Пару дней назад мы зафиксировали версию 5.8.1 открытого проекта SObjectizer. В данной статье поговорим о новых возможностях, которые появились в SObjectizer благодаря пожеланиям пользователей, и упомянем исправление не выявленного ранее недочета. Кому интересно, милости прошу под кат.

Для тех же, кто ни разу не слышал про SObjectizer, очень кратко: это относительно небольшой C++17 фреймворк, который позволяет использовать в С++ программах такие подходы, как Actor Model, Publish-Subscribe и Communicating Sequential Processes (CSP). Основная идея, лежащая в основе SObjectizer, — это построение приложения из мелких сущностей-агентов, которые взаимодействуют между собой через обмен сообщениями. SObjectizer при этом берет на себя ответственность за:

  • доставку сообщений агентам-получателям внутри одного процесса;

  • управление рабочими нитями, на которых агенты обрабатывают адресованные им сообщения;

  • механизм таймеров (в виде отложенных и периодических сообщений);

  • возможности настройки параметров работы перечисленных выше механизмов.

Составить впечатление о этом инструменте можно ознакомившись вот с этой обзорной статьей.

Новый тип message sink-а: transform_then_redirect

Принципиальным нововведением релиза 5.8.0 были message sink-и: если раньше подписчиком для сообщения мог быть только агент, то сейчас можно сделать реализацию интерфейса abstract_message_sink_t и подписать на сообщение кого угодно. Подробнее об этой функциональности рассказывалось в предыдущей статье. Там же говорилось и о том, что со временем могут появиться неочевидные для нас применения message sink-ов...

Одно из таких применений не заставило себя ждать: issue #67: bind_and_transform. Пользователь хотел иметь возможность преобразовать одно сообщение в другое прямо в момент его отправки получателю. Допустим, у нас есть сообщение:

struct compound_data
{
   first_data_part m_first;
   second_data_part m_second;
};

Оно отсылается в mbox_A.

И есть агент F, который хочет получить не всё сообщение compound_data целиком, а только compound_data::m_first. Т.е. агент F хочет получить сообщение типа first_data_part:

class F : public agent_t
{
   ...
   void so_define_agent() override {
      so_subscribe_self().event([](const first_data_part & msg) {...});
   }
};

И вот тут возникает вопрос: как же сделать так, чтобы при отсылке сообщения compound_data в mbox_A произошло формирование сообщения first_data_part и его отправка напрямую агенту F?

Связать mbox_A с собственным mbox-ом агента F не сложно, для этого есть вспомогательные классы single_sink_binding_t и multi_sink_binding_t:

const so_5::mbox_t mbox_A = ... // Получение mbox-а для compound_data.
const so_5::mbox_t mbox_F = ... // Получение mbox-а агента F.

so_5::multi_sink_binding_t<> binding;
binding.bind<compound_data>(mbox_A, so_5::wrap_to_msink(mbox_F));

Но такая связь доставляет агенту F исходное сообщение compound_data, тогда как нужно доставить только compound_data::m_first.

Т.е. нужно трансформировать исходное сообщение в новое сообщение другого типа.

И тут можно вспомнить, что в одном месте в SObjectizer подобная трансформация уже есть. Она является частью механизма защиты агентов от перегрузки:

class message_limits_demo final : public so_5::agent_t
{
public:
   message_limits_demo(context_t ctx)
      : so_5::agent_t{ ctx +
            // Говорим SObjectizer-у, что если количество
            // ждущих в очереди сообщений типа compound_data
            // будет больше 3-х, то новые сообщения нужно
            // преобразовать и переслать на другой mbox.
            limit_then_transform(3u, [this](const compound_data & msg) {
               return so_5::make_transformed<first_data_part>(
                  // Куда отсылать.
                  new_destination_mbox(),
                  // А это параметры для конструирования нового
                  // экземпляра сообщения first_data_part.
                  msg.m_first);
            })
            + ... }
       {}
...
};

Т.е. у нас в SObjectizer уже есть специальный тип so_5::transformed_message_t<Msg> и вспомогательная функция so_5::make_transformed<Msg, ...Args> предназначенные для преобразования сообщений с их последующей переадресацией. Так почему бы этим не воспользоваться?

В результате появилась вспомогательная функция so_5::bind_transformer, которая позволяет связать лямбду-трансформатор с сообщением из конкретного mbox-а. Благодаря bind_transformer наша задача решается следующим образом:

const so_5::mbox_t mbox_A = ... // Получение mbox-а для compound_data.
const so_5::mbox_t mbox_F = ... // Получение mbox-а агента F.

so_5::multi_sink_binding_t<> binding;
so_5::bind_transformer(binding, mbox_A,
   [mbox_F](const compound_data & msg) {
      return so_5::make_transformed<first_data_part>(mbox_F, msg.m_first);
   });

Интересное впечатление оставила реализация этой фичи: по субъективным ощущениям (учет времени, понятное дело, не велся) проектирование и реализация заняла всего лишь около 1/10 от всех затрат. Т.е. на тестирование и документирование полученной реализации ушло чуть ли не на порядок больше времени. Да еще и при разработке тестов удалось свалить VC++ в internal compiler error, чего уже давненько на нашем коде видеть не приходилось ???? В общем, отличный пример того, что "сделать хорошо" (т.е. с тестами, примерами, описаниями) обходится реально в десять раз дороже чем "просто сделать".

Методы agent_t::so_this_agent_disp_binder() и agent_t::so_this_coop_disp_binder()

В класс so_5::agent_t было добавлено два метода, которые дают доступ к disp_binder-ам, связанным с агентом и его кооперацией. Потребоваться это может при создании дочерних коопераций.

Допустим, мы создаем родительскую кооперацию со thread_pool-диспетчером:

// Этот агент будет затем создавать дочерние кооперации.
class parent_agent final : public so_5::agent_t {
   ...
};

// Регистрируем родительскую кооперацию.
env.introduce_coop(
   // Этот диспетчер будет использоваться для родительской кооперации
   // по умолчанию (т.е. если агент не привязан к какому-то другому
   // диспетчеру явно, то будет использоваться этот диспетчер).
   so_5::disp::thread_pool::make_dispatcher(env, 8u).binder(),
   [&](so_5::coop_t & coop) {
      // Этот агент будет использовать собственный диспетчер.
      coop.make_agent_with_binder<parent_agent>(
         so_5::disp::one_thread::make_dispatcher(env).binder(),
         ...);

      // Этот агент будет привязан к thread_pool-диспетчеру
      // кооперации, поскольку собственного disp_binder-а ему
      // не дали.
      coop.make_agent<worker>(...);

      ... // Создание остальных агентов родительской кооперации.
   });

Нам нужно, чтобы новые дочерние кооперации были привязаны к тому же thread_pool диспетчеру, который использовался и для родительской кооперации. Сейчас для этого достаточно воспользоваться новым методом agent_t::so_this_coop_disp_binder():

void parent_agent::make_new_child_coop() {
   so_5::introduce_child_coop(*this,
      // Новая кооперация будет использовать тот же диспетчер,
      // что и родительская кооперация.
      so_this_coop_disp_binder(),
      [&](so_5::coop_t & coop) {
         ... // Наполнение дочерней кооперации.
      });
}

Метод so_this_agent_disp_binder() возвращает disp_binder, который использовался для агента. И этот disp_binder может отличаться от disp_binder-а кооперации. Так, если в нашем примере мы напишем:

void parent_agent::make_new_child_coop() {
   so_5::introduce_child_coop(*this,
      // Новая кооперация будет использовать тот же диспетчер,
      // что и этот конкретный агент.
      so_this_agent_disp_binder(),
      [&](so_5::coop_t & coop) {
         ... // Наполнение дочерней кооперации.
      });
}

то дочерняя кооперация будет привязана не к thread_pool-диспетчеру родительской кооперации, а к one_thread-диспетчеру агента parent_agent. Что, очевидно, даст принципиально другой эффект.

Нужно сказать, что эту фичу у нас первый раз попросили довольно давно, еще в 2020-ом году. Так что ждать, как и предсказывала народная мудрость, пришлось три года ????

В свое оправдание остается сказать, что в то время агент вообще не знал какой disp_binder используется для привязки агента к диспетчеру. Эта информация была только в кооперации, и для гипотетической реализации so_this_agent_disp_binder() потребовалось бы несколько усложнить реализацию взаимоотношений агента и его кооперации.

Однако не так давно, в версии 5.7.5, для устранения проблем с преждевременным удалением объектов агент получил disp_binder в собственное владение. Так что теперь реализация so_this_agent_disp_binder() стала тривиальной, что и было использовано при работе над версией 5.8.1.

Можно сказать, что старую просьбу мы не проигнорировали и не забыли, а удовлетворили когда представилась возможность.

Исправили то, на что долго не обращали внимание

В процессе работы над so_5::bind_transformer был выявлен досадный недочет на который никто в течении многих лет не обращал внимания: метод limit_then_transform нельзя было использовать с мутабельными сообщениями. Т.е. написать вот так:

class message_limits_demo final : public so_5::agent_t
{
public:
   message_limits_demo(context_t ctx)
      : so_5::agent_t{ ctx +
            // Говорим SObjectizer-у, что если количество
            // ждущих в очереди сообщений типа compound_data
            // будет больше 3-х, то новые сообщения нужно
            // преобразовать и переслать на другой mbox.
            limit_then_transform<so_5::mutable_msg<compound_data>>(3u,
               [this](compound_data & msg) {
                   return so_5::make_transformed<first_data_part>(
                      // Куда отсылать.
                      new_destination_mbox(),
                      // А это параметры для конструирования нового
                      // экземпляра сообщения first_data_part.
                      std::move(msg.m_first));
               })
            + ... }
       {}
...
};

до версии 5.8.1 было просто нельзя: возникла бы ошибка времени компиляции.

Как же так получилось, что этой проблеме уже много лет, а ее до сих пор никто не обнаружил?

Сперва мы недоглядели, потом никто не использовал limit_then_transform для мутабельных сообщений. Как-то так.

Но теперь нашли и исправили. Лучше поздно...

Заключение

Данный релиз отлично иллюстрирует мысль, которая ранее неоднократно озвучивалась в статьях про SObjectizer: если нам сообщить о том, чего вам в SObjectizer не хватает, то тогда есть шанс, что это когда-нибудь появится. Пусть даже и через три года ???? А вот если вы нам не расскажите что вам хотелось бы увидеть, то этого, скорее всего, и не будет.

Так что милости прошу озвучивать свои хотелки в issues или в discussions на GitHub-е. Или же в Google-группе.

Кстати говоря, если вам чего-то не хватает в SObjectizer-е, то можно заглянуть в проект-компаньон so5extra, мы там собрали разные полезные вещи, которые не хотели помещать в ядро SObjectizer-а.


С большим удовольствием упомяну серию статей о SObjectizer, которую начал публиковать Марко Арена (кто-то может знать его по Italian C++ Community). Найти эту серию можно в блоге Марко или на сайте dev.to. Мне самому было очень интересно читать написанные им статьи, как будто смотришь на давно привычные тебе вещи совсем с другой стороны. Так что рекомендую. На данный момент опубликовано три части, но это только начало.


И совсем в завершении, на правах саморекламы: изобретаю велосипеды для себя, могу изобрести и для вас.

Комментарии (0)