Данная статья является продолжением опубликованной месяц назад статьи-размышлении "Легко ли добавлять новые фичи в старый фреймворк? Муки выбора на примере развития SObjectizer-а". В той статье описывалась задача, которую мы хотели решить в очередной версии SObjectizer-а, рассматривались два подхода к ее решению и перечислялись достоинства и недостатки каждого из подходов.

Прошло время, один из подходов был воплощен в жизнь и новые версии SObjectizer-а, а также сопутствующего ему проекта so_5_extra, уже, что называется «задышали полной грудью». Можно в буквальном смысле брать и пробовать.

Сегодня же мы поговорим о том, что было сделано, зачем это было сделано, к чему это привело. Если кому-то интересно следить за тем, как развивается один из немногих живых, кросс-платформенных и открытых акторных фреймворков для C++, милости прошу под кат.

С чего все началось?


Начиналось все с попытки решить проблему гарантированной отмены таймеров. Суть проблемы в том, что когда отсылается отложенное или периодическое сообщение, то программист может отменить доставку сообщения. Например:

auto timer_id = so_5::send_periodic<my_message>(my_agent, 10s, 10s, ...);
... // Что-то делаем.
// Понимаем, что периодическое сообщение my_message больше нам не нужно.
timer_id.release(); // Теперь таймер не будет отсылать my_message.

После вызова timer_id.release() таймер больше не будет отсылать новые экземпляры сообщения my_message. Но те экземпляры, которые уже были отосланы и попали в очереди получателей, никуда не денутся. Со временем они будут извлечены из этих самых очередей и будут переданы агентам-получателям для обработки.

Проблема эта является следствием базовых принципов работы SObjectizer-5 и не имеет простого решения из-за того, что SObjectizer не может изымать сообщения из очередей. Не может потому, что в SObjectizer очереди принадлежат диспетчерам, диспетчеры бывают разные, очереди у них также организованы по разному. В том числе бывают диспетчеры, которые не входят в состав SObjectizer-а и SObjectizer в принципе не может знать, как эти диспетчеры работают.

В общем, есть вот такая особенность у родных таймеров SObjectizer-а. Не то, чтобы она слишком уж портила жизнь разработчикам. Но некоторую дополнительную внимательность нужно проявлять. Особенно новичкам, которые только знакомятся с фреймворком.

И вот, наконец, руки дошли до того, чтобы предложить решение для этой проблемы.

Какой путь решения был выбран?


В предыдущей статье рассматривалось два возможных варианта. Первый вариант не требовал модификаций механизма доставки сообщений в SObjectizer-е, но зато требовал от программиста явным образом изменять тип отсылаемого/получаемого сообщения.

Второй вариант требовал модификации механизма доставки сообщений SObjectizer-а. Именно этот путь и был выбран, поскольку он позволял прятать от получателя сообщения тот факт, что сообщение было отослано каким-то специфическим образом.

Что изменилось в SObjectizer?


Новое понятие: конверт с сообщением внутри


Первая составляющая реализованного решения — это добавление в SObjectizer такого понятия, как конверт (envelope). Конверт — это специальное сообщение, внутри которого лежит актуальное сообщение (payload). SObjectizer доставляет конверт с сообщением до получателя почти что обычным способом. Принципиальная разница в обработке конверта обнаруживается лишь на самом последнем этапе доставки:

  • при доставке обычного сообщения у агента-получателя просто ищется обработчик для данного типа сообщения и, если такой обработчик найден, то найденный обработчик вызывается и ему отдается доставленное сообщение в качестве параметра;
  • а при доставке конверта с сообщением после того, как обработчик будет найден, сперва делается попытка достать сообщение из конверта. И только если конверт отдал хранящееся в нем сообщение, только тогда вызывается обработчик.

Здесь есть два ключевых момента, которые оказывают серьезнейшее влияние на то, для чего и как могут использоваться конверты с сообщениями.

Первый ключевой момент в том, что у конверта сообщение запрашивается только тогда, когда у получателя найден обработчик для сообщения. Т.е. только тогда, когда сообщение действительно доставлено до получателя и получатель вот прямо здесь и сейчас будет это сообщение обрабатывать.

Второй ключевой момент здесь в том, что конверт может не отдать находящееся в нем сообщение. Т.е., например, конверт может проверить текущее время и решить, что все сроки доставки были пропущены и, поэтому, сообщение перестало быть актуальным и обрабатывать его нельзя. Посему конверт не отдаст сообщение наружу. Соответственно, SObjectizer просто проигнорирует этот конверт и никаких дополнительных действий предпринимать не будет.

Что из себя представляет конверт?


Конверт — это реализация интерфейса envelope_t, который определен следующим образом:

class SO_5_TYPE envelope_t : public message_t
   {
   public:
      ... // Конструкторы-деструкторы.

      // Хук для случая, когда сообщение доставлено до получателя и
      // получатель готов обработать его.
      virtual void handler_found_hook(
         handler_invoker_t & invoker ) noexcept = 0;

      // Хук для случая, когда сообщение должно быть трансформированно
      // из одно представления в другое.
      virtual void transformation_hook(
         handler_invoker_t & invoker ) noexcept = 0;

   private :
      kind_t so5_message_kind() const noexcept override
         { return kind_t::enveloped_msg; }
   };

Т.е. конверт — это, по сути такое же сообщение, как и все остальные. Но со специальным признаком, который и возвращается методом so5_message_kind().

Программист может разрабатывать свои конверты наследуясь от envelope_t (или, что более удобно, от so_5::extra::enveloped_msg::just_envelope_t) и переопределяя методы-хуки handler_found_hook() и transformation_hook().

Внутри методов-хуков разработчик конверта решает, хочет ли он отдать находящееся внутри конверта сообщение для обработки/трансформации или не хочет. Если хочет, то разработчик должен вызвать метод invoke() и объекта invoker. Если не хочет, то не вызывает, в этом случае конверт и его содержимое будет проигнорированно.

Как с помощью конвертов решается проблема с отменой таймеров?


Решение, которое сейчас реализовано в so_5_extra в виде пространства имен so_5::extra::revocable_timer, очень простое: при особой отсылке отложенного или периодического сообщения создается специальный конверт, внутри которого находится не только само сообщение, но и атомарный флаг revoked. Если этот флаг сброшен, то сообщение считается актуальным. Если выставлен, то сообщение считается отозванным.

Когда у конверта вызывается метод-хук, то конверт проверяет значение флага revoked. Если флаг выставлен, то конверт не отдает сообщение наружу. Тем самым, обработка сообщения не выполняется даже если таймер уже успел поместить сообщение в очередь получателя.

Расширение интерфейса abstract_message_box_t


Добавление интерфейса envelope_t — это только одна часть реализации конвертов в SObjectizer. Вторая часть — это учет факта существования конвертов в механизме доставки сообщений внутри SObjectizer-а.

Тут, к сожалению, не обошлось без внесения видимых для пользователя изменений. В частности, в класс abstract_message_box_t, который определяет интерфейс всех почтовых ящиков в SObjectizer-е, потребовалось добавить еще один виртуальный метод:

virtual void do_deliver_enveloped_msg(
   const std::type_index & msg_type,
   const message_ref_t & message,
   unsigned int overlimit_reaction_deep );

Этот метод отвечает за доставку до получателя конверта message с сообщением типа msg_type внутри. Такая доставка может отличаться в деталях реализации в зависимости от того, что это за mbox.

При добавлении do_deliver_enveloped_msg() в abstract_message_box_t у нас был выбор: сделать его чистым виртуальным методом или же предложить какую-то реализацию по умолчанию.

Если бы мы сделали do_deliver_enveloped_msg() чистым виртуальным методом, то мы бы поломали совместимость между версиями SObjectizer в ветке 5.5. Ведь тогда тем пользователям, которые написали собственные реализации mbox-ов, пришлось бы при переходе на SObjectizer-5.5.23 модифицировать собственные mbox-ы, иначе бы не удалось пройти компиляцию с новой версией SObjectizer-а.

Нам этого не хотелось, поэтому мы не стали делать do_deliver_enveloped_msg() чистым виртуальным методом в v.5.5.23. Он имеет реализацию по умолчанию, которая просто бросает исключение. Т.о., кастомные пользовательские mbox-ы смогут нормально продолжать работу с обычными сообщениями, но будут автоматически отказываться принимать конверты. Мы сочли такое поведение более приемлемым. Тем более, что на начальном этапе вряд ли конверты с сообщениями будут применяться широко, да и маловероятно что в «дикой природе» часто встречаются кастомные реализации SObjectizer-овских mbox-ов ;)

Кроме того, существует далеко не нулевая вероятность, что в последующих мажорных версиях SObjectizer-а, где мы не будем оглядываться на совместимость с веткой 5.5, интерфейс abstract_message_box_t претерпит серьезные изменения. Но это мы уже забегаем далеко вперед…

Как отсылать конверты с сообщениями


Сам SObjectizer-5.5.23 не предоставляет простых средств отсылки конвертов. Предполагается, что под конкретную задачу разрабатывается конкретный тип конверта и соответствующие инструменты для удобной отсылки конвертов конкретного типа. Пример этого можно увидеть в so_5::extra::revocable_timer, где нужно не только отослать конверт, но и отдать пользователю специальный timer_id.

Для более простых ситуаций можно воспользоваться средствами из so_5::extra::enveloped_msg. Например, вот так выглядит отсылка сообщения с заданным ограничением на время его доставки:


// make создает экземпляр сообщения для доставки.
so_5::extra::enveloped_msg::make<my_message>(... /* Параметры для конструктора */)
   // envelope помещает созданное только что сообщение в конверт нужного типа.
   // Значение 5s передается в конструктор конверта вместе с экземпляром сообщения.
   .envelope<so_5::extra::enveloped_msg::time_limited_delivery_t>(5s)
   // А вот и отсылка конверта с сообщением адресату.
   .send_to(destination);

Чтобы было совсем весело: конверты в конвертах


Конверты предназначены для переноса внутри себя каких-то сообщений. Но каких?

Любых.

И это подводит нас к интересному вопросу: а можно ли вложить конверт внутрь другого конверта?

Да, можно. Сколько угодно. Глубина вложенности ограничена только здравым смыслом разработчика и глубиной стека для рекурсивного вызова handler_found_hook/transformation_hook.

При этом SObjectizer идет навстречу разработчикам собственных конвертов: конверт не должен думать о том, что у него внутри — конкретное сообщение или другой конверт. Когда у конверта вызывают метод-хук и конверт решает, что он может отдать свое содержимое, то конверт просто вызывает invoke() у handler_invoker_t и передает в invoke() ссылку на свое содержимое. А уже invoke() внутри сам разберется, с чем он имеет дело. И если это еще один конверт, то invoke() сам вызовет у этого конверта нужный метод-хук.

С помощью уже показанного выше инструментария из so_5::extra::enveloped_msg пользователь может сделать несколько вложенных конвертов вот таким образом:

so_5::extra::enveloped_msg::make<my_message>(...)
   // Конверт, который будет внутри и который содержит сообщение my_message.
   .envelope<inner_envelope_type>(...)
   // Конверт, который будет содержать конверт типа inner_envelope_type.
   .envelope<outer_envelope_type>(...)
   .send_to(destination);

Несколько примеров использования конвертов


Теперь, после того, как мы прошлись по внутренностям SObjectizer-5.5.23 пора бы уже перейти к более полезной для пользователей, прикладной части. Ниже рассматривается несколько примеров, которые либо базируются на том, что уже реализовано в so_5_extra, либо используют инструменты из so_5_extra.

Отзывные таймеры


Поскольку вся эта кухня с конвертами затевалась ради решения проблемы гарантированного отзыва таймерых сообщений, то давайте посмотрим, что в итоге получилось. Будем использовать пример из so_5_extra-1.2.0, который задействует инструменты из нового пространства имен so_5::extra::revocable_timer:

Код примера с отзывными таймерами
#include <so_5_extra/revocable_timer/pub.hpp>

#include <so_5/all.hpp>

namespace timer_ns = so_5::extra::revocable_timer;

class example_t final : public so_5::agent_t
{
   // Набор сигналов, которые мы будем использовать для отсылки
   // отложенных и периодического сообщения.
   struct first_delayed final : public so_5::signal_t {};
   struct second_delayed final : public so_5::signal_t {};
   struct last_delayed final : public so_5::signal_t {};

   struct periodic final : public so_5::signal_t {};

   // Идентификаторы для таймерных сообщений.
   timer_ns::timer_id_t m_first;
   timer_ns::timer_id_t m_second;
   timer_ns::timer_id_t m_last;
   timer_ns::timer_id_t m_periodic;

public :
   example_t( context_t ctx ) : so_5::agent_t{ std::move(ctx) }
   {
      so_subscribe_self()
         .event( &example_t::on_first_delayed )
         .event( &example_t::on_second_delayed )
         .event( &example_t::on_last_delayed )
         .event( &example_t::on_periodic );
   }

   void so_evt_start() override
   {
      using namespace std::chrono_literals;

      // Отсылаем три сигнала как отложенные сообщения...
      m_first = timer_ns::send_delayed< first_delayed >( *this, 100ms );
      m_second = timer_ns::send_delayed< second_delayed >( *this, 200ms );
      m_last = timer_ns::send_delayed< last_delayed >( *this, 300ms );
      // ...и один как периодическое сообщение.
      m_periodic = timer_ns::send_periodic< periodic >( *this, 75ms, 75ms );

      // Блокируем агента на 220ms. За это время в очередь агента
      // должны попасть сигналы first_delaye, second_delayed и
      // несколько экземпляров сигнала periodic.
      std::cout << "hang the agent..." << std::flush;
      std::this_thread::sleep_for( 220ms );
      std::cout << "done" << std::endl;
   }

private :
   void on_first_delayed( mhood_t<first_delayed> )
   {
      std::cout << "first_delayed received" << std::endl;

      // Отменяем доставку second_delayed и periodic.
      // Агент не должен получить эти сигналы не смотря на то, что
      // они уже стоят в очереди сообщений агента.
      m_second.revoke();
      m_periodic.revoke();
   }

   void on_second_delayed( mhood_t<second_delayed> )
   {
      std::cout << "second_delayed received" << std::endl;
   }

   void on_last_delayed( mhood_t<last_delayed> )
   {
      std::cout << "last_delayed received" << std::endl;
      so_deregister_agent_coop_normally();
   }

   void on_periodic( mhood_t<periodic> )
   {
      std::cout << "periodic received" << std::endl;
   }
};

int main()
{
   so_5::launch( [](so_5::environment_t & env) {
      env.register_agent_as_coop( "example", env.make_agent<example_t>() );
   } );

   return 0;
}


Что мы здесь имеем?

У нас есть агент, который сперва инициирует несколько таймерных сообщений, а потом блокирует свою рабочую нить на некоторое время. За это время таймер успевает поставить в очередь агента несколько заявок в результате сработавших таймеров: несколько экземпляров periodic, по одному экземпляру first_delayed и second_delayed.

Соответственно, когда агент разблокирует свою нить, он должен получить первый periodic и first_delayed. При обработке first_delayed агент отменяет доставку periodic-а и second_delayed. Поэтому эти сигналы до агента доходить не должны вне зависимости от того, есть ли они уже в очереди агента или нет (а они есть).

Смотрим на результат работы примера:

hang the agent...done
periodic received
first_delayed received
last_delayed received

Да, так и есть. Получили первый periodic и first_delayed. Затем нет ни periodic-а, ни second_delayed.

А вот если в примере заменить «таймеры» из so_5::extra::revocable_timer на штатные таймеры из SObjectizer, то результат будет другой: до агента все-таки дойдут те экземпляры сигналов periodic и second_delayed, которые уже попали к агенту в очередь.

Сообщения с ограничениями на время доставки


Еще одна полезная, временами, штука, которая станет доступной в so_5_extra-1.2.0 — это доставка сообщения с ограничением по времени. Например, агент request_handler отсылает сообщение verify_signature агенту crypto_master. При этом request_handler хочет, чтобы verify_signature был доставлен в течении 5 секунд. Если это не произошло, то смысла в обработке verity_signature уже не будет, агент request_handler уже прекратит свою работу.

А агент crypto_master — это такой товарищ, который любит оказываться «бутылочным горлышком»: временами начинает притормаживать. В такие момент у него в очереди скапливаются сообщения, вроде вышеуказанного verify_signature, которые могут ждать до тех пор, пока crypto_master-у не полегчает.

Предположим, что request_handler отослал сообщение verify_signature агенту crypto_master, но тут crypto_master-у поплохело о он «залип» на 10 секунд. Агент request_handler уже «отвалился», т.е. уже отослал всем отказ в обслуживании и завершил свою работу. Но ведь сообщение verify_signature в очереди crypto_master-а осталось! Значит, когда crypto_master «отлипнет», то он возьмет данное сообщение и будет это сообщение обрабатывать. Хотя это уже не нужно.

С помощью нового конверта so_5::extra::enveloped_msg::time_limited_delivery_t мы можем решить данную проблему: агент request_handler отошлет verify_signature вложенное в конверт time_limited_delivery_t с ограничением на время доставки:

so_5::extra::enveloped_msg::make<verify_signature>(...)
   .envelope<so_5::extra::enveloped_msg::time_limited_delivery_t>(5s)
   .send_to(crypto_master_mbox);

Теперь если crypto_master «залипнет» и не успеет добраться до verify_signature за 5 секунд, то конверт просто не отдаст это сообщение на обработку. И crypto_master не будет делать работу, которая уже никому не нужна.

Отчеты о доставке сообщений до получателя


Ну и напоследок пример любопытной штуки, которая не реализована штатно ни в SObjectizer, ни в so_5_extra, но которую можно сделать самостоятельно.

Иногда хочется получать от SObjectizer-а что-то вроде «отчета о доставке» сообщения до получателя. Ведь одно дело, когда сообщение до получателя дошло, но получатель по каким-то своим причинам на него не среагировал. Другое дело, когда сообщение вообще до получателя не дошло. Например, было заблокировано механизмом защиты агентов от перегрузки. В первом случае сообщение, на которое мы не дождались ответа, можно не перепосылать. А вот во втором случае может иметь смысл перепослать сообщение спустя некоторое время.

Сейчас мы рассмотрим, как посредством конвертов можно реализовать простейший механизм «отчетов о доставке».

Итак, сначала сделаем необходимые подготовительные действия:

#include <so_5_extra/enveloped_msg/just_envelope.hpp>
#include <so_5_extra/enveloped_msg/send_functions.hpp>

#include <so_5/all.hpp>

using namespace std::chrono_literals;

namespace envelope_ns = so_5::extra::enveloped_msg;

using request_id_t = int;

Теперь мы можем определить сообщения, которые будут использоваться в примере. Первое сообщение — это запрос для выполнения каких-то нужных нам действий. А второе сообщение — это подтверждение того, что первое сообщение дошло до получателя:

struct request_t final
{
   request_id_t m_id;
   std::string m_data;
};

struct delivery_receipt_t final
{
   // Это значение request_t::m_id из соответствующего request_t.
   request_id_t m_id;
};

Далее мы можем определить агента processor_t, который будет обрабатывать сообщения типа request_t. Но обрабатывать будет с имитацией «залипания». Т.е. он обрабатывает request_t, после чего меняет свое состояние с st_normal на st_busy. В состоянии st_busy он ничего не делает и игнорирует все сообщения, которые к нему прилетают.

Это означает, что если агенту processor_t отослать подряд три сообщения request_t, то первое он обработает, а два других будут выброшены, т.к. при обработке первого сообщения агент уйдет в st_busy и проигнорирует то, что к нему будет приходить пока он находится в st_busy.

В st_busy агент processor_t проведет 2 секунды, после чего вновь вернется в st_normal и будет готов обрабатывать новые сообщения.

Вот как агент processor_t выглядит:

class processor_t final : public so_5::agent_t
{
   // Нормальное состояние агента. В этом состоянии выполняется
   // обработка входящих сообщений.
   state_t st_normal{this, "normal"};
   // Состояние "я занят". Новые сообщения игнорируются.
   state_t st_busy{this, "busy"};

public:
   processor_t(context_t ctx) : so_5::agent_t{std::move(ctx)}
   {
      this >>= st_normal;

      st_normal.event(&processor_t::on_request);

      // Для этого состояния нет подписок, но есть лимит времени.
      // Через 2 секунды после входа, автоматический возврат в st_normal.
      st_busy.time_limit(2s, st_normal);
   }

private:
   void on_request(mhood_t<request_t> cmd)
   {
      std::cout << "processor: on_request(" << cmd->m_id << ", "
            << cmd->m_data << ")" << std::endl;

      this >>= st_busy;
   }
};

Теперь мы можем определить агента requests_generator_t, у которого есть пачка запросов, которые нужно доставить до processor_t. Агент request_generator_t раз в 3 секунды отправляет всю пачку, а затем ждет подтверждения о доставке в виде delivery_receipt_t.

Когда delivery_recept_t приходит, агент requests_generator_t выбрасывает доставленный запрос из пачки. Если пачка совсем опустела, то работа примера завершается. Если же еще что-то осталось, то оставшаяся пачка будет отослана заново когда наступит следующее время перепосылки.

Итак, вот код агента request_generator_t. Он довольно объемный, но примитивный. Обратить внимание можно разве что на внутренности метода send_requests(), в котором отсылаются сообщения request_t, вложенные в специальный конверт.

Код агента requests_generator_t
class requests_generator_t final : public so_5::agent_t
{
   // Почтовый ящик обработчика запросов.
   const so_5::mbox_t m_processor;

   // Пачка запросов, для которых еще нет подтверждения о доставке.
   std::map<request_id_t, std::string> m_requests;

   struct resend_requests final : public so_5::signal_t {};

public:
   requests_generator_t(context_t ctx, so_5::mbox_t processor)
      :  so_5::agent_t{std::move(ctx)}
      ,  m_processor{std::move(processor)}
   {
      so_subscribe_self()
         .event(&requests_generator_t::on_delivery_receipt)
         .event(&requests_generator_t::on_resend);
   }

   void so_evt_start() override
   {
      // Формируем первоначальную пачку запросов.
      m_requests.emplace(0, "First");
      m_requests.emplace(1, "Second");
      m_requests.emplace(2, "Third");
      m_requests.emplace(3, "Four");

      // Начинаем рассылку.
      send_requests();
   }

private:
   void on_delivery_receipt(mhood_t<delivery_receipt_t> cmd)
   {
      std::cout << "request delivered: " << cmd->m_id << std::endl;
      m_requests.erase(cmd->m_id);

      if(m_requests.empty())
         // Запросов больше не досталось. Работу прекращаем.
         so_deregister_agent_coop_normally();
   }

   void on_resend(mhood_t<resend_requests>)
   {
      std::cout << "time to resend requests, pending requests: "
            << m_requests.size() << std::endl;

      send_requests();
   }

   void send_requests()
   {
      for(const auto & item : m_requests)
      {
         std::cout << "sending request: (" << item.first << ", "
               << item.second << ")" << std::endl;

         envelope_ns::make<request_t>(item.first, item.second)
               .envelope<custom_envelope_t>(so_direct_mbox(), item.first)
               .send_to(m_processor);
      }

      // Отложенное сообщение чтобы повторить отсылку через 3 секунды.
      so_5::send_delayed<resend_requests>(*this, 3s);
   }
};

Вот теперь у нас есть сообщения и есть агенты, которые с помощью этих сообщений должны общаться. Осталась самая малость — как-то заставить прилетать сообщения delivery_receipt_t при доставке request_t до processor_t.

Делается это с помощью вот такого конверта:

class custom_envelope_t final : public envelope_ns::just_envelope_t
{
   // Куда присылать отчет о доставке.
   const so_5::mbox_t m_to;

   // ID доставленного запроса.
   const request_id_t m_id;

public:
   custom_envelope_t(so_5::message_ref_t payload, so_5::mbox_t to, request_id_t id)
      :  envelope_ns::just_envelope_t{std::move(payload)}
      ,  m_to{std::move(to)}
      ,  m_id{id}
   {}

   void handler_found_hook(handler_invoker_t & invoker) noexcept override
   {
      // Раз этот хук вызван, значит сообщение до получателя дошло.
      // Можно отсылать отчет о доставке.
      so_5::send<delivery_receipt_t>(m_to, m_id);
      // Всю остальную работу делает базовый класс.
      envelope_ns::just_envelope_t::handler_found_hook(invoker);
   }
};

В общем-то, здесь нет ничего сложного. Мы наследуемся от so_5::extra::enveloped_msg::just_envelope_t. Это вспомогательный тип конверта, который хранит вложенное в него сообщение и предоставляет базовую реализацию хуков
handler_found_hook() и transformation_hook(). Поэтому нам остается только сохранить внутри custom_envelope_t нужные нам атрибуты и отослать delivery_receipt_t внутри хука handler_found_hook().

Вот, собственно, и все. Если запустить данный пример, то получим следующее:

sending request: (0, First)
sending request: (1, Second)
sending request: (2, Third)
sending request: (3, Four)
processor: on_request(0, First)
request delivered: 0
time to resend requests, pending requests: 3
sending request: (1, Second)
sending request: (2, Third)
sending request: (3, Four)
processor: on_request(1, Second)
request delivered: 1
time to resend requests, pending requests: 2
sending request: (2, Third)
sending request: (3, Four)
processor: on_request(2, Third)
request delivered: 2
time to resend requests, pending requests: 1
sending request: (3, Four)
processor: on_request(3, Four)
request delivered: 3

В качестве дополнения нужно сказать, что на практике такой простой custom_envelope_t для формирования отчетов о доставке вряд ли подойдет. Но если кому-то интересна эта тема, то ее можно обсудить в комментариях, а не увеличивать объем статьи.

Что еще можно было бы делать с помощью конвертов?


Отличный вопрос! На который у нас самих пока нет исчерпывающего ответа. Вероятно, возможности ограничиваются разве что фантазией пользователей. Ну а если для воплощения фантазий в SObjectizer-е чего-то не хватает, то об этом можно сказать нам. Мы всегда прислушиваемся. И, что немаловажно, временами даже делаем :)

Интеграция агентов с mchain-ами


Если же говорить чуть более серьезно, то есть еще одна фича, которую хотелось бы временами иметь и которая даже планировалась для so_5_extra-1.2.0. Но которая, скорее всего, в релиз 1.2.0 уже не попадет.

Речь идет о том, чтобы упростить интеграцию mchain-ов и агентов.

Дело в том, что первоначально mchain-ы были добавлены в SObjectizer для того, чтобы упростить общение агентов с другими частями приложения, которые написаны без агентов. Например, есть главный поток приложения, на котором с помощью GUI идет взаимодействие с пользователем. И есть несколько агентов-worker-ов, которые выполняют фоновую «тяжелую» работу. Отослать сообщение агенту из главного потока не проблема: достаточно вызвать обычный send. А вот как передать информацию назад?

Для этого и были добавлены mchain-ы.

Но со временем выяснилось, что mchain-ы могут играть гораздо большую роль. Можно, в принципе, делать многопоточные приложения на SObjectizer-е вообще без агентов, только на mchain-ах (подробнее здесь). А еще можно использовать mchain-ы как средство балансировки нагрузки на агентов. Как механизм решения проблемы producer-consumer.

Проблема producer-consumer заключается в том, что если producer генерирует сообщения быстрее, чем их может обрабатывать consumer, то нас ждут неприятности. Очереди сообщений будут расти, со временем может деградировать производительность или вообще произойдет вылет приложения из-за исчерпания памяти.

Обычное решение, которое мы предлагали использовать в этом случае — это использовать пару агентов collector-performer. Так же можно использовать и message limits (либо как основной механизм защиты, либо как дополнение к collector-performer). Но написание collector-performer требует дополнительной работы от программиста.

А вот mchain-ы могли бы использоваться для этих целей с минимальными усилиями со стороны разработчика. Так, producer бы помещал очередное сообщение в mchain, а consumer бы забирал сообщения из этого mchain.

Но проблема в том, что когда consumer — это агент, то агенту не очень удобно работать с mchain-ом посредством имеющихся функций receive() и select(). И вот это неудобство можно было бы попробовать устранить с помощью какого-то инструмента для интеграции агентов и mchain-ов.

При разработке такого инструмента нужно будет решить несколько задачек. Например, когда сообщение приходит в mchain, то в какой момент оно должно быть из mchain-а извлечено? Если consumer свободен и ничего не обрабатывает, то можно забрать сообщение из mchain-а сразу и отдать его агенту-consumer-у. Если consumer-у уже было отослано сообщение из mchain-а, он это сообщение еще не успел обработать, но в mchain уже приходит новое сообщение… Как быть в этом случае?

Есть предположение, что конверты могут помочь в этом случае. Так, когда мы берем первое сообщение из mchain-а и отсылаем его consumer-у, то мы оборачиваем это сообщение в специальный конверт. Когда конверт видит, что сообщение доставлено и обработано, он запрашивает следующее сообщение из mchain-а (если таковое есть).

Конечно, здесь все не так просто. Но пока что выглядит вполне решаемо. И, надеюсь, подобный механизм появится в одной из следующих версий so_5_extra.

Уж не ящик ли Пандоры мы собираемся открыть?


Нужно отметить, что у нас самих добавленные возможности вызывают двойственные чувства.

С одной стороны, конверты уже позволили/позволяют сделать вещи, о которых ранее говорилось (а о чем-то просто мечталось). Например, это гарантированная отмена таймеров и ограничение на время доставки, отчеты о доставки, возможность отзыва ранее отосланного сообщения.

С другой стороны, непонятно, к чему это приведет впоследствии. Ведь из любой возможности можно сделать проблему, если начать эту возможность эксплуатировать где нужно и где не нужно. Так может мы приоткрываем ящик Пандоры и сами еще не представляем, что нас ждет?

Остается только набраться терпения и посмотреть, куда это все нас приведет.

О ближайших планах развития SObjectizer-а вместо заключения


Вместо заключения хочется рассказать о том, каким мы видим самое ближайшее (и не только) будущее SObjectizer-а. Если кого-то что-то в наших планах не устраивает, то можно высказаться и повлиять на то, как SObjectizer-5 будет развиваться.

Первые бета-версии SObjectizer-5.5.23 и so_5_extra-1.2.0 уже зафиксированы и доступны для загрузки и экспериментов. К релизу нужно будет проделать еще много работы в области документации и примеров использования. Поэтому официальный релиз планируется в первой декаде ноября. Если получится раньше, сделаем раньше.

Релиз SObjectizer-5.5.23, судя по всему, будет означать, что эволюция ветки 5.5 подходит к своему финалу. Самый первый релиз в этой ветке состоялся четыре года назад, в октябре 2014-го. С тех пор SObjectizer-5 эволюционировал в рамках ветки 5.5 без каких-либо серьезных ломающих изменений между версиями. Это было непросто. Особенно с учетом того, что все это время нам приходилось оглядываться на компиляторы, в которых была далеко не идеальная поддержка C++11.

Сейчас мы уже не видим смысла оглядываться на совместимость внутри ветки 5.5 и, особенно, на старые C++ компиляторы. То, что можно было оправдать в 2014-ом, когда C++14 еще только готовились официально принять, а C++17 еще не было на горизонте, сейчас уже выглядит совсем по-другому.

Плюс к тому, в самом SObjectizer-5.5 уже накопилось изрядное количество граблей и подпорок, которые появились из-за этой самой совместимости и которые затрудняют дальнейшее развитие SObjectizer-а.

Поэтому мы в ближайшие месяцы собираемся действовать по следующему сценарию:

1. Разработка следующей версии so_5_extra, в которую хочется добавить инструментарий для упрощения написания тестов для агентов. Будет ли это so_5_extra-1.3.0 (т.е. с ломающими изменениями относительно 1.2.0) или это будет so_5_extra-1.2.1 (т.е. без ломающих изменений) пока не понятно. Посмотрим, как пойдет. Понятно только, что следующая версия so_5_extra будет базироваться на SObjectizer-5.5.

1a. Если для следующей версии so_5_extra потребуется сделать что-то дополнительное в SObjectizer-5.5, то будет выпущена следующая версия 5.5.24. Если же для so_5_extra не нужно будет вносить доработки в ядро SObjectizer-а, то версия 5.5.23 окажется последней значимой версией в рамках ветки 5.5. Мелкие bug-fix релизы будут выходить. Но само развитие ветки 5.5 прекращается на версии 5.5.23 или 5.5.24.

2. Затем будет выпущена версия SObjectizer-5.6.0, которая откроет новую ветку. В ветке 5.6 мы вычистим код SObjectizer-а от всех накопившихся костылей и подпорок, а также от старого хлама, который давным давно помечен, как deprecated. Вероятно, какие-то вещи подвергнуться рефакторингу (например, может быть изменен abstract_message_box_t), но вряд ли кардинальному. Основные же принципы работы и характерные черты SObjectizer-5.5 в SObjectizer-5.6 останутся в том же виде.

SObjectizer-5.6 будет требовать уже C++14 (хотя бы на уровне GCC-5.5). Компиляторы Visual C++ ниже VC++ 15 (который из Visual Studio 2017) поддерживаться не будут.

Ветка 5.6 рассматривается нами как стабильная ветка SObjectizer-а, которая будет актуальна до тех пор, пока не появится первая версия SObjectizer-5.7.

Релиз версии 5.6.0 хотелось бы сделать в начале 2019-го года, ориентировочно в феврале.

3. После стабилизации ветки 5.6 мы бы хотели начать работать над веткой 5.7, в которой можно было бы пересмотреть какие-то базовые принципы работы SObjectizer-а. Например, совсем отказаться от публичных диспетчеров, оставив только приватные. Переделать механизм коопераций и их взаимоотношений родитель-потомок, тем самым избавившись от узкого места при регистрации/дерегистрации коопераций. Убрать деление на message/signal. Оставить для отсылки сообщений только send/send_delayed/send_periodic, а методы deliver_message и schedule_timer упрятать «под капот». Модифицировать механизм диспетчеризации сообщений так, чтобы либо совсем убрать dynamic_cast-ы из этого процесса, либо свести их к самому минимуму.

В общем, тут есть где развернуться. При этом SObjectizer-5.7 уже будет требовать C++17, без оглядки на C++14.

Если смотреть на вещи без розовых очков, то хорошо, если релиз 5.7.0 состоится в конце осени 2019. Т.е. основной рабочей версией SObjectizer-а на 2019-й будет ветка 5.6.

4. Параллельно всему этому будет развиваться so_5_extra. Вероятно, вместе с SObjectizer-5.6 будет выпущена версия so_5_extra-2, которая на протяжении 2019-го года будет вбирать в себя новый функционал, но на базе SObjectizer-5.6.

Таким образом мы сами видим для SObjectizer-5 поступательную эволюцию с постепенным пересмотром каких-то из базовых принципов работы SObjectizer-5. При этом мы постараемся делать это как можно более плавно, чтобы можно было с минимальными болевыми ощущениями переходить с одной версии на другую.

Однако, если кто-то хотел бы от SObjectizer-а более кардинальных и значимых изменений, то у нас есть соображения на этот счет. Если совсем коротко: можно переделать SObjectizer как угодно, вплоть до того, чтобы реализовать SObjectizer-6 для другого языка программирования. Но делать это полностью за свой счет, как это происходит с эволюцией SObjectizer-5, мы не будем.

На этом, пожалуй, все. В комментариях к предыдущей статье получилось хорошее и конструктивное обсуждение. Нам было бы полезно, если бы подобное обсуждение случилось и в этот раз. Как всегда мы готовы ответить на любые вопросы, а на толковые, так и с удовольствием.

А самым терпеливым читателям, добравшимся до этих строк большое спасибо за потраченное на прочтение статьи время.

Комментарии (2)


  1. svr_91
    19.10.2018 11:03
    +1

    Насколько я помню, одной из проблем была интеграция отзываемых сообщений с limit-ами в chain-ах, чтобы отозванные сообщения не увеличивали size chain-а. Удалось это решить?


    1. eao197 Автор
      19.10.2018 11:05

      Было принято такое решение — что раз уж сообщение из очереди не изымается физически и место оно там все равно занимает, то пусть учитываются и в лимитах, и в размерах chain-ов. Так что сейчас, даже если сообщение отозвано, но все еще стоит в очереди, то в лимитах оно учитывается.

      Здесь ничего лучше не придумалось. И, вроде как, в обсуждении такой подход категорического неприятия не вызвал.