Разработка бесплатного фреймворка для нужд разработчиков — это специфическая тема. Если при этом фреймворк живет и развивается довольно долго, то специфики прибавляется. Сегодня я попробую показать это на примере попытки расширить функциональность «акторного» фреймворка для C++ под названием SObjectizer.

Дело в том, что фреймворк этот уже довольно таки старый, несколько раз менялся кардинально. Даже его текущая инкарнация, SObjectizer-5, претерпела множество изменений, как серьезных, так и не очень. Плюс к тому, мы достаточно трепетно относимся к совместимости и внесение ломающих совместимость изменений для нас — это слишком серьезный шаг, чтобы на него просто так решиться.

Прямо сейчас нам нужно решить как именно добавить новую фичу в очередную версию. В процессе поиска подходящего решения вырисовалось два варианта. Оба выглядят вполне себе реализуемыми. Но очень уж сильно отличаются друг от друга. Как по сложности и трудоемкости реализации, так и по своему «внешнему виду». Т.е. то, с чем будет иметь дело разработчик, в каждом из вариантов будет выглядеть по-разному. Наверное, даже принципиально по-разному.

И вот нам, как разработчикам фреймворка, приходится делать выбор в пользу одного или другого решения. Или же нужно признать, что ни одно из них удовлетворительным не является и, поэтому, нужно придумывать что-то другое. Такие решения за время истории SObjectizer-а приходилось принимать неоднократно. Если кому-то интересно почувствовать себя в шкуре разработчика подобного фреймворка, то милости прошу под кат.

Исходная проблема


Итак, коротко суть исходной проблемы. В SObjectizer, с самого начала его существования, была следующая особенность: таймерное сообщение не так-то легко отменить. Под таймерным далее будет пониматься, в первую очередь, отложенное сообщение. Т.е. сообщение, которое не сразу должно быть отослано получателю, а спустя какое-то время. Например, мы делаем send_delayed с паузой в 1s. Это говорит о том, что реально сообщение будет отослано по таймеру через 1s после вызова send_delayed.

Отложенное сообщение, в принципе, можно отменить. Если сообщение все еще находится во владении таймера, то сообщение после отмены никуда не уйдет. Оно будет выброшено таймером и все. Но вот если таймер уже отослал сообщение и оно сейчас находится в очереди заявок для агента-получателя, то отмена таймера не сработает. Нет в SObjectizer-е механизма изъять сообщение из очереди заявок.

Проблема усугубляется, как минимум, двумя факторами.

Во-первых, в SObjectizer-е поддерживается доставка в режиме 1:N, т.е. если сообщение было отослано в Multi-Consumer mbox, то сообщение будет стоять не в одной очереди, а сразу в нескольких очередях для N получателей.

Во-вторых, в SObjectizer-е используется механизм диспетчеров и диспетчеры могут быть самыми разными, в том числе и написанными пользователем под свои специфические нужды. Очередями заявок управляют диспетчеры. И в интерфейсе диспетчера не заложена функциональность изъятия заявки, которая уже была передана диспетчеру. Но даже если такая функциональность в интерфейс была бы заложена, то далеко не факт, что ее можно было бы реализовать эффективно во всех случаях. Не говоря уже о том, что такая функциональность бы увеличила трудоемкость разработки новых диспетчеров.

В общем, объективно, если таймер уже сделал отсылку отложенного сообщения получателю(ям), то заставить SObjectizer не доставлять этот экземпляр сообщения, на данный момент невозможно.
На самом деле эта проблема актуальна и для периодических сообщений (т.е. сообщений, которые таймер должен отсылать периодически через заданные интервалы времени). Но на практике отмена периодических сообщений нужна гораздо реже, чем отмена отложенного сообщения. По крайней мере в нашей практике это так.

Что можно сделать прямо сейчас?


Итак, проблема эта не нова и уже давно существуют рекомендации о том, как с ней можно бороться.

Уникальный id внутри отложенного сообщения


Самый простой способ — это ведение счетчика. У агента есть счетчик, при отсылке отложенного сообщения в сообщении отсылается текущее значение счетчика. При отмене сообщения счетчик у агента инкрементируется. При получении сообщения сравнивается текущее значение счетчика в агенте со значением из сообщения. Если значения не совпали, то сообщение отвергается:

class demo_agent : public so_5::agent_t {
   struct delayed_msg final {
      int id_;
      ...
   };

   int expected_msg_id_{};
   so_5::timer_id_t timer_;

   void on_some_event() {
      // Отсылаем отложенное сообщение.
      // Делаем вызов send_periodic, т.к. только эта функция
      // возвращает timer_id для последующей отмены.
      timer_ = so_5::send_periodic<delayed_msg>(*this,
            25s, // Через сколько сообщение должно прийти.
            0s, // Повторять сообщение не нужно.
            // Далее идут параметры для конструктора delayed_msg,
            // первым из которых будет уникальный id для этого сообщения.
            ++expected_msg_id_,
            ... // Остальные значения.
            );
      ...
   }

   void on_cancel_event() {
      // Тут мы понимаем, что отложенное сообщение нам больше не нужно
      // и отменяем его. В два шага:
      timer_.reset(); // Таймер не будет обрабатывать сообщение.
      ++expected_msg_id_; // Обеспечиваем несовпадение id-шников.
      ...
   }

   void on_delayed_msg(mhood_t<delayed_msg> cmd) {
      // Обрабатываем сообщение только если id за прошедшее время
      // не изменился.
      if(expected_msg_id_ == cmd->id_) {
         ... // Обработка сообщения.
      }
   }
};

Проблема этого способа в том, что разработчику агента нужно озадачится ведением этих счетчиков. А если в качестве отложенного сообщения нам нужно отсылать чужое сообщение, которое делал кто-то другой, и в котором нет поля id_, то мы оказываемся в непростой ситуации.

Хотя, с другой стороны, это самый эффективный способ из существующих на данный момент.

Использовать уникальный mbox для отложенного сообщения


Еще один способ, который хорошо работает, — это использование уникального почтового ящика (mbox-а) для отложенного сообщения. В этом случае мы создаем новый mbox для каждого отложенного сообщения, подписываемся на него и отсылаем отложенное сообщение в этот mbox. Когда сообщение нужно отменить, то мы просто удаляем подписки на mbox.

class demo_agent : public so_5::agent_t {
   struct delayed_msg final {
      ... // Здесь поле id_ уже не нужно.
   };

   so_5::mbox_t timer_mbox_; // Куда отсылаем сообщение.
   so_5::timer_id_t timer_;

   void on_some_event() {
      // Для отсылки отложенного сообщения нам нужен новый mbox
      // и созданные для него подписки.
      timer_mbox_ = so_environment().create_mbox();
      some_state.event(time_mbox_, ...);
      another_state.event(time_mbox_, ...);
      ...
      // Теперь можно отослать сообщение.
      timer_ = so_5::send_delayed<delayed_msg>(
            so_environment(),
            timer_mbox_, // Обязательно указываем куда идет сообщение.
            25s,
            0s,
            ... // Параметры для конструктора delayed_msg.
            );
   }

   void on_cancel_event() {
      // Отменяем таймер и убираем подписки на временный mbox.
      timer_.reset();
      so_drop_subscription_for_all_states(timer_mbox_);
   }

   void on_delayed_msg(mhood_t<delayed_msg> cmd) {
      // Тут просто обрабатываем сообщения зная, что это
      // сообщение не было отменено.
      ...
   }
};

Этот способ уже может работать с чужими сообщениями, внутри которых нет никакого уникального идентификатора. Но он так же требует труда и внимания от разработчика.

Например, в показанном выше варианте нет защиты от того, что одно отложенное сообщение уже было отослано ранее. По-хорошему, перед отсылкой нового отложенного сообщения нужно всегда выполнять действия из on_cancel_event(), иначе у агента будут оставаться ненужные ему подписки.

Почему эта проблема не была решена ранее?


Тут все довольно просто: на самом деле это не такая серьезная проблема, как может показаться. По крайней мере, в реальной жизни с ней сталкиваться приходится не часто. Обычно отложенные и периодические сообщения не отменяются вообще (именно поэтому, кстати говоря, функция send_delayed не возвращает timer_id). А когда необходимость в отмене возникает, то можно воспользоваться одним из описанных выше способов. Или даже использовать какой-то другой. Например, создавать отдельных агентов, которые будут обрабатывать отложенное сообщение. Этих агентов можно дерегистрировать когда отложенное сообщение требуется отменить.

Так что на фоне других задач, которые стояли перед нами, упрощение гарантированной отмены отложенного сообщения не было столь приоритетным, чтобы тратить свои ресурсы на решение этой проблемы.

Почему проблема стала актуальной сейчас?


Тут так же все просто. С одной стороны, наконец-то дошли руки.

С другой стороны, когда SObjectizer начинают использовать новые люди, не имевшие опыта работы с ним, то эта особенность с отменой таймеров их сильно удивляет. Не то, чтобы приятно удивляет. А раз так, то хотелось бы минимизировать негативные впечатления от знакомства с нашим инструментом.

Кроме того, нас были свои задачи, нам не нужно было постоянно отменять отложенные сообщения. А у новых пользователей свои задачи, может там все наоборот.

Новая постановка задачи


Практически сразу, как только началось обдумывание возможности «гарантированной отмены таймера» в голову пришла мысль, что задачу можно расширить. Можно попробовать решить задачу отзыва вообще любого из ранее отосланных сообщений, не обязательно отложенных и периодических.

Время от время данная возможность оказывается востребованной. Например, представим себе, что у нас есть несколько взаимодействующих агентов двух типов: entry_point (принимает запросы от клиентов), и processor (обрабатывает запросы):



Агенты entry_point отсылают запросы агенту processor, тот их по мере сил обрабатывает и отвечает агентам entry_point. Но временами entry_point может обнаружить, что обработка ранее отосланного запроса больше не нужна. Например, клиент прислал команду cancel или же клиент «отвалился» и обрабатывать его запросы уже не нужно. Сейчас, если сообщения request стоят в очереди агента processor, то отозвать их нельзя. А было бы полезно.

Поэтому нынешний заход к решению проблемы «гарантированной отмены таймера» выполняется именно как добавление поддержки «отзывных сообщений». Специальным образом отсылаем любое сообщение, получаем на руки некий хендл, с помощью которого затем сообщение можно отозвать. И уже не столь принципиально, отзывается ли обычное сообщение или отложенное.

Попытка придумать реализацию «отзывных сообщений»


Итак, нужно ввести понятие «отзывного сообщения» и поддержать это понятие в SObjectizer. Причем так, чтобы остаться в рамках ветки 5.5. Первая версия из этой ветки, 5.5.0, вышла практически четыре года назад, в октябре 2014-го. С тех пор каких-то серьезных ломающих изменений в 5.5 не было. Проекты, которые уже перешли или же сразу стартовали на SObjectize-5.5 могут переходить на новые релизы в ветке 5.5 без каких-либо проблем. Такую совместимость нужно сохранить и в этот раз.

В общем, все просто: нужно взять и сделать.

Что понятно как делать


После первого подхода к проблеме стали понятны две вещи по поводу реализации «отзывных сообщений».

Атомарный флаг и его проверка перед обработкой сообщения


Во-первых, очевидно, что в рамках текущей архитектуры SObjectizer-5.5 (а может и более глобально: в рамках принципов работы самого SObjectizer-5) нельзя изымать сообщения из очередей заявок диспетчеров, где сообщения дожидаются, пока агенты-получатели обработают их. Попытка сделать это убьет всю идею разнородных диспетчеров, которые даже пользователь может делать собственные, под специфику своей задачи (например, вот такой). Кроме того, в случае рассылки сообщения в режиме 1:N, где N будет большим, дорого будет хранить список указателей на экземпляр отосланного сообщения во всех очередях.

Это означает, что вместе с сообщением должен передаваться какой-то атомарный флажок, который нужно будет анализировать сразу после того, как сообщение извлекается из очереди заявок, но перед тем, как сообщение передается на обработку агенту-получателю. Т.е. сообщение попадает в очередь и никуда оттуда оно уже не изымается. Но когда до сообщения доходит очередь, проверяется его флаг. И если флаг говорит, что сообщение отозвано, то обработка сообщения не выполняется.

Соответственно, сам отзыв сообщения заключается в выставлении специального значения для атомарного флага внутри сообщения.

Объект revocable_handle_t<M>


Во-вторых, пока(?) очевидно, что для отсылки отзывного сообщения должны применяться не обычные методы отсылки сообщений, а специальный объект под условным названием revocable_handle_t.

Для того, чтобы отослать отзывное сообщение пользователь должен создать экземпляр revocable_handle_t, после чего вызвать у этого экземпляра метод send. А если сообщение нужно отозвать, то это делается посредством метода revoke. Что-то вроде:

struct my_message {...};
...
so_5::revocable_handle_t<my_message> msg;
// Конструируем и отсылаем сообщение.
msg.send(target, // Куда отсылается.
    ... // Параметры для конструктора my_message.
    );
...
// Захотели отозвать сообщение.
msg.revoke();

Четких деталей реализации revocable_handle_t пока нет, что не удивительно, т.к. сам механизм работы отзывных сообщений пока еще не выбран. Но принцип работы состоит в том, что в revocable_handle_t сохраняется умная ссылка на отосланное сообщение и на атомарный флаг для него. В методе revoke() происходит попытка заменить значение флага. Если это удается, то сообщение после извлечения из очереди заявок, подвергаться обработке уже не будет.

С чем это не будет дружить


К сожалению, есть пара вещей, с которыми отзыв сообщений не получится нормально увязать. Как раз из-за того, что отозванное сообщение продолжает оставаться в тех очередях, куда оно уже попало.

message_limits


Такая важная фича SObjectizer-а, как message_limits, предназначена для защиты агентов от перегрузки. Работают message_limits на основе подсчета сообщений в очереди. Поставили сообщение в очередь — увеличили счетчик. Достали из очереди — уменьшили.

Т.к. при отзыве сообщения оно продолжает оставаться в очереди, то на message_limits отзыв сообщения влияния не оказывает. Поэтому может получится так, что в очереди стоит предельное количество сообщений типа M, но все они отозваны. По факту, ни одно из них не будет обработано. Но поставить новое сообщение типа M в очередь не получится, т.к. произойдет превышение лимита.

Ситуация нехорошая. Но как из нее выкрутиться? Непонятно.

mchain-ы с фиксированным размером очереди


В SObjectizer сообщение можно отослать не только в mbox, но и в mchain (это наш аналог CSP-ного канала). А mchain-ы могут иметь фиксированный размер своих очередей. Попытка поместить в полный mchain новое сообщение для mchain-а с фиксированным размером должна привести к какой-то реакции. Например, к ожиданию освобождения места в очереди. Или к выталкиванию самого старого сообщения.

В случае же с отзывом сообщения оно останется внутри очереди mchain-а. Получится, что сообщение уже не нужно, но место оно в очереди mchain-а занимает. И препятствует отсылке в mchain новых сообщений.

Такая же нехорошая ситуация, как и в случае с message_limits. И снова непонятно, как ее можно исправить.

Что непонятно как делать


Вот мы и подобрались к выбору между двумя (пока?) вариантами реализации отзывных сообщений. Первый вариант прост в реализации и не требует переделки потрохов SObjectizer-а. Второй вариант гораздо сложнее, но зато в нем получатель сообщения даже не знает, что имеет дело с отзывными сообщениями. Коротко рассмотрим каждый из них.

Получение отзывных сообщений в виде revocable_t<M>


Первое решение, которое выглядит, во-первых, реализуемым и, во-вторых, достаточно практичным, — это введение специальной обертки revocable_t<M>. Когда пользователь отсылает отзывное сообщение типа M через revocable_handle_t<M>, то отсылается не само сообщение M, а сообщение M внутри специальной обертки revocable_t<M>. И, соответственно, получать и обрабатывать пользователь будет не сообщение типа M, а сообщение revocable_t<M>. Например, таким образом:

class processor : public so_5::agent_t {
public:
   struct request { ... }; // Сообщение, которое может быть отозвано.

   void so_define_agent() override {
      // Подписываемся на сообщение.
      so_subscribe_self().event(
         // Вот эта сигнатура явно показывает, что мы работаем
         // с отзывным сообщением.
         [this](mhood_t< revocable_t<request> > cmd) {
            // Обрабатываем, но только если сообщение не отозвали.
            cmd->try_handle([this](mhood_t<request> msg) {
               ...
               });
         });
      ...
   }
   ...
};

Метод revocable_t<M>::try_handle() проверяет значение атомарного флага и, если сообщение не отозвано, вызывает переданную ему лямбда-функцию. Если же сообщение отозвано, то try_handle() ничего не делает.

Плюсы и минусы этого подхода


Главный плюс в том, что этот поход легко реализуется (по крайней мере пока так представляется). Фактически, revocable_handle_t<M> и revocable_t<M> будут всего лишь тонкой надстройкой над SObjectizer-ом.

Вмешательство во внутренности SObjectizer-а может потребоваться для того, чтобы подружить revocable_t и mutable_msg. Дело в том, что в SObjectizer есть понятие иммутабельных сообщений (они могут отсылаться как в режиме 1:1, так и в режиме 1:N). И есть понятие мутабельных сообщений, которые могут отсылаться только в режиме 1:1. При этом SObjectizer специальным образом трактует маркер mutable_msg<M> и выполняет соответствующие проверки в run-time. В случае с revocable_t<mutable_msg<M>> нужно будет научить SObjectizer трактовать эту конструкцию как mutable_msg<M>.

Еще один плюс в том, что дополнительные накладные расходы (как на метаданные отзывного сообщения, так на проверку атомарного флага), будут только в тех местах, где без этого не обойтись. Там же, где отзывные сообщения не используются, никаких дополнительных накладных расходов не будет вообще.

Ну а главный минус идеологический. В этом подходе факт использования отзывных сообщений сказывается как на отправителе (использование revocable_handle_t<M>), так и на получателе (использование revocable_t<M>). А вот как раз получателю-то и незачем знать, что он получает отзывные сообщения. Тем более, что в качестве получателя у вас может быть уже готовый сторонний агент, который написан без revocable_t<M>.

Кроме того, здесь остаются идеологические вопросы о, например, возможности перепосылки таких сообщений. Но, по первым прикидкам, эти вопросы решаемы.

Получение отзывных сообщений в виде обычных сообщений


Второй подход состоит в том, чтобы на стороне получателя видеть только сообщение типа M и не иметь представления о существовании revocable_handle_t<M> и revocable_t<M>. Т.е. если processor должен получать request, то он и должен видеть только request, без каких-то дополнительных оберток.

Вообще-то, и в этом подходе без каких-то оберток не обойтись, но они будут спрятаны внутри SObjectizer-а и пользователь их видеть не должен. После того как заявка будет извлечена из очереди SObjectizer сам определит, что это специально обернутое отзывное сообщение, проверит флаг актуальности сообщения, развернет сообщение если оно еще актуально. После чего отдаст сообщение агенту на обработку как будто это было обычное сообщение.

Плюсы и минусы этого подхода


Главный плюс данного подхода очевиден — получатель сообщения не знает, с какими сообщениями он работает. Что позволяет отправителю сообщений спокойно отзывать сообщения для любых агентов, даже тех, которые были написаны другими разработчиками.

Еще один немаловажный плюс — это возможность интеграции с механизмом message delivery tracing (здесь роль этого механизма описана подробнее). Т.е. если msg_tracing включен и отправитель отзывает сообщение, то следы этого можно будет отыскать в логе msg_tracing-а. Что очень удобно при отладке.

А вот главным минусом является трудоемкость реализации этого подхода. При которой нужно будет учитывать несколько факторов.

Во-первых, накладные расходы. Разного рода.

Скажем, можно сделать специальный флаг внутри сообщения, который будет указывать отзывное это сообщение или нет. А затем проверять этот флаг перед началом обработки каждого сообщения. Грубо говоря, в механизм доставки сообщений добавляется еще один if, который будет отрабатывать при обработке каждого(!) сообщения.

Я уверен, что в реальных приложениях потери на этот if будут едва ли заметны. Но вот просадка в синтетических бенчмарках обязательно проявится. Причем, чем абстрактней бенчмарк, чем меньше реальной работы он делает, тем сильнее просядет. И это плохо с маркетинговой точки зрения, т.к. есть некоторое количество индивидов, которые делают выводы о фреймворке по показателям синтетических бенчмарков. Причем делают это специфически: не разбираясь с тем, что это за бенчмарк, что он в принципе показывает, на каком железе он работает, но сравнивая итоговые цифры с показателями какого-то специализированного инструмента, на другом сценарии, на другом железе и т.д., и т.п.

В общем, поскольку мы делаем универсальный фреймворк, о котором, так уж получается, судят по абстрактным цифрам в абстрактных бенчмарках, то не хочется терять, скажем, 5% производительности в механизме доставки всех сообщений из-за добавления фичи, которая нужна будет лишь время от времени и то не всем пользователям.

Поэтому нужно сделать так, чтобы уже при отсылке сообщения получателю SObjectizer понимал, что при извлечении сообщения с ним нужно обращаться особым образом. В принципе, когда сообщение доставляется агенту, то SObjectizer сохраняет вместе с сообщением указатель на функцию, которая будет использоваться при обработке сообщения. Нужно это сейчас для того, чтобы по-разному обрабатывать асинхронные сообщения и синхронные запросы. Собственно, вот так выглядит заявка для сообщения, адресованного агенту:

struct execution_demand_t
{
	//! Receiver of demand.
	agent_t * m_receiver;
	//! Optional message limit for that message.
	const message_limit::control_block_t * m_limit;
	//! ID of mbox.
	mbox_id_t m_mbox_id;
	//! Type of the message.
	std::type_index m_msg_type;
	//! Event incident.
	message_ref_t m_message_ref;
	//! Demand handler.
	demand_handler_pfn_t m_demand_handler;
...
};

Где demand_handler_pfn_t — это обычный указатель на функцию:
typedef void (*demand_handler_pfn_t)(
	current_thread_id_t,
	execution_demand_t & );

Этот же механизм можно использовать и для того, чтобы специальным образом обрабатывать отзываемое сообщение. Т.е. когда mbox отдает агенту сообщение, то агент знает, отдается ли ему асинхронное сообщение или синхронный запрос. Точно так же агенту можно специальным образом отдавать асинхронное отзываемое сообщение. И агент сохранит вместе с сообщением указатель на функцию, которая знает, как должно обрабатывать отзывные сообщения.

Вроде бы все хорошо, но есть два больших «но»… :(

Во-первых, существующий интерфейс mbox-ов (а именно класс abstract_message_mbox_t) не имеет методов для отсылки отзывных сообщений. Значит этот интерфейс нужно расширять. Причем так, чтобы не поломались чужие реализации mbox-ов, которые завязаны на abstract_message_box_t из SObjectizer-5.5 (в частности, ряд mbox реализован в so_5_extra и ломать их просто так не хочется).

Во-вторых, сообщения могут отсылаться не только в mbox-ы, за которыми спрятаны агенты, но и в mchain-ы. Которые являются нашими аналогами CSP-шных каналов. А там до сих пор заявки лежали без каких-либо дополнительных указателей на функции. Вводить в каждый элемент очереди заявок mchain-а дополнительный указатель… Можно, конечно, но выглядит довольно дорогим решением. Кроме того, сами реализации mchain-ов пока что не предусматривали ситуации, при которой извлеченное сообщение нужно проверить и, возможно, выбросить.

Если попытаться все вышеописанные проблемы резюмировать, то главная проблема этого подхода в том, что не так-то просто сделать его реализацию, чтобы она была дешевой для случаев, когда отзывные сообщения не используются.

А что же с гарантированной отменой отложенных сообщений?


Боюсь, в дебрях технических деталей потерялась исходная проблема. Допустим, отзывные сообщения есть, как же будет происходить отмена отложенных/периодических сообщений?

Тут, как говорится, возможны варианты. Например, работа с отложенными/периодическими сообщениями может быть частью функциональности revocable_handle_t<M>:

revocable_handle_t<my_mesage> msg;
msg.send_delayed(target, 15s, ...);
...
msg.revoke();

Или же можно будет сделать поверх revocable_handle_t<M> дополнительный вспомогательный класс cancelable_timer_t<M>, который и будет предоставлять методы send_delayed/send_periodic.

Белое пятно: синхронные запросы


SObjectizer-5 поддерживает не только асинхронное взаимодействие между сущностями в программе (через посылку сообщений в mbox-ы и mchain-ы), но и синхронное взаимодействие через request_value/request_future. Это синхронное взаимодействие работает не только для агентов. Т.е. можно не только отослать синхронный запрос агенту через его mbox. В случае с mchain-ами так же можно делать синхронные запросы, например, к другой рабочей нити, на которой вызвали receive() или select() для mchain-а.

Так вот, пока еще непонятно, следует ли разрешать использовать синхронные запросы совместно с отзывными сообщениями. С одной стороны, может быть в этом и есть какой-то смысл. И выглядеть это может, например, вот так:

revocable_handle_t<my_request> msg;
auto f = msg.request_future<my_reply>(target, ...);
...
if(some_condition)
  msg.revoke();
...
f.get(); // Получим исключение в случае предыдущего revoke().

С другой стороны, с отзывными сообщениями пока и так очень много непонятного, так что вопрос с синхронным взаимодействием отложен до лучших времен.

Выбирай, но осторожно. Но выбирай


Итак, есть понимание проблемы. Есть два варианта ее решения. Которые на данный момент кажутся реализуемыми. Но сильно отличаются уровнем удобства, предоставляемого пользователю, а еще сильнее они отличаются стоимостью реализации.

Между этими двумя вариантами нужно выбрать. Или же придумать что-то другое.

В чем сложность выбора?

Сложность в том, что SObjectizer — это бесплатный фреймворк. Денег он нам напрямую не приносит. Мы его делаем, что называется, за свои. Поэтому чисто из экономических предпочтений более простой и быстрый в реализации вариант выгоднее.

Но, с другой стороны, не все измеряется деньгами и в долгосрочной перспективе качественно сделанный инструмент, фичи которого нормально увязаны друг с другом, лучше, чем лоскутное одеяло из налепленных кое-как заплаток. Качество оценивают как пользователи, так и мы сами, когда в последствии сопровождаем свою разработку и добавляем в нее новые фичи.

Так что выбор, по сути, идет между сиюминутной выгодой и долгосрочными перспективами. Правда, в современном мире у C++ инструментов с долгосрочными перспективами как-то туманно. Что делает выбор еще более сложным.

Вот в таких условиях и приходится выбирать. Осторожно. Но выбирать.

Заключение


В данной статье мы попытались немного показать процесс проектирования и внедрения новых фич в наш фреймворк. Такой процесс происходит у нас регулярно. Раньше почаще, т.к. в 2014-2016гг SObjectizer развивался гораздо активнее. Сейчас темпы выпуска новых версии снизились. Что объективно, в том числе и потому, что добавлять новую функциональность ничего не поломав, с каждой новой версией становится сложнее.

Надеюсь, было интересно заглянуть к нам за кулисы. Спасибо за внимание!

Комментарии (13)


  1. Ryppka
    22.09.2018 17:20

    Я, наверное, безбожно торможу. В чем смысл отзыва отложенного сообщения, уже попавшего в очередь? Разве смысл отложенного сообщения не означает, что, получив его, адресат меняет свое состояние т.к. наступил какой-то существенный момент времени? Я имею в виду «логическое» время, которое «порядок следования событий»?
    А в целом отмена событий кажется мне очень сомнительной практикой, которая может скорее все запутать, чем помочь что-то решить.


    1. eao197 Автор
      22.09.2018 17:49

      С отложенными сообщениями время от времени люди наступают вот на какие грабли. Допустим, есть агент для выполнения какой-то сложной операции. В процессе ее выполнения агент должен провзаимодействовать с другими агентами. И у него, плюс к тому, есть ограничение на время выполнения всей операции. Если время истекло, а ответ от стороннего агента не получен, то всю операцию нужно отменить и возвратить отрицательный результат. Обычно это выглядит как-то так:

      class op_performer : public so_5::agent_t {
        // Отложенное сообщение для ограничения времени операции.
        struct timeout final : public so_5::signal_t {};
      
        // Состояние ожидания следующей операции.
        state_t st_free{this};
        // Состояние выполнения операции.
        state_t st_working{this};
        ... // возможно, еще и вложенные подсостояния для st_working.
      
        void so_define_agent() override {
          // Подписываемся на отложенное сообщение в st_working
          // дабы отменить операцию.
          st_working.event(&op_performer::on_timeout);
          ...
        }
      
        void on_perform_operation(mhood_t<start_operation> cmd) {
           // Получили команду на начало новой операции.
           this >>= st_working;
           // Ограничиваем время выполнения операции.
           so_5::send_delayed<timeout>(this, 25s);
           ... // Начинаем операцию.
        }
        void on_completion(mhood_t<completion_message> cmd) {
          ... // Завершение операции и отсылка ответа кому нужно.
          this >>= st_free; // Возвращаемся в исходное состояние.
        }
        void on_timeout(mhood_t<timeout>) {
          ... // Отменяем выполнение операции.
          ... // Отсылаем отрицательный ответ кому нужно.
          this >>= st_free; // Возвращаемся в исходное состояние.
        }
      ...
      };

      В таком коде есть проблема: когда операция завершается не по тайм-ауту, то отложенное сообщение никто не отменяет. Поэтому оно до агента таки дойдет. Возможно, в это время агент уже будет выполнять новый запрос. И агент подумает, что это истек тайм-аут для текущей, а не для предыдущей операции.

      Очевидная попытка исправить эту проблему: отмена отложенного сообщения при возврате в st_free:
      class op_performer : public so_5::agent_t {
        ...
        so_5::timer_id_t timeout_timer_;
        ...
        void so_define_agent() override {
          st_free.on_enter([this]{ timeout_timer_.reset(); });
          st_working.on_enter([this] {
            timeout_timer_ = so_5::send_periodic<timeout>(this, 25s, 0s);
          });
          ...
        }
      };

      Но, к сожалению, сейчас в SObjectizer возможна ситуация, когда отмена отложенного сообщения фактически не сработает, т.к. сообщение уже будет стоять в очереди агента-получателя. Т.е. представьте себе, что в очереди op_performer-а уже стоят: completion_message (для текущей операции), start_operation (для новой операции) и timeout (для текущей операции). Агент обработает completion_message и start_operation, вновь войдет в st_working и получит timeout, который относился к предыдущей операции.

      Вероятность такого стечения событий невелика. Но, к сожалению, когда в приложении работает несколько десятков тысяч агентов и летают десятки миллионов сообщений, иногда эта вероятность воплощается в жизнь. И, что особенно плохо, происходит это редко и без какой-либо системы. Поэтому разбираться с такими проблемами особенно сложно.


      1. Ryppka
        22.09.2018 18:18

        Т.е. речь идет не об акторах с состоянием, а о worker'ах, работающих в реальном времени с гарантиями best effort? Они отстают от реального времени (в очереди накапливаются задачи), а таймауты не маркируются идентификатором задачи, к которой относятся?
        Мне представляются два способа решить проблему.
        Маркировать таймаут идентификатором задачи, к которой он относится.
        Обеспечить для события истечения таймаута гарантии доставки: очередь с приоритетами или выделенная очередь и т.д.
        Мне кажется, что если речь идет о реальном времени, то надо быть жестким, причем тем жестчее, чем жестчее это самое реальное время)))). Все-таки гарантии реального времени как-то ортогональны к остальной логике выполнения и смешивать, как мне кажется, не стоит. Лучше сделать для этих гарантий явные и отдельные механизмы.


        1. eao197 Автор
          22.09.2018 18:50
          +1

          Маркировать таймаут идентификатором задачи, к которой он относится.
          Как раз такой способ первым в статье и описан :) И это, кстати говоря, один из возможных подходов к решению проблемы отмены отложенных сообщений. Если пусть с отзывными сообщениями зайдет в тупик, то можно будет сделать просто какой-то штатный механизм в SObjectizer-е, который будет отсылать отложенные сообщения с автоматической генерацией для них ID-шников.
          Обеспечить для события истечения таймаута гарантии доставки: очередь с приоритетами или выделенная очередь и т.д.
          ИМХО, здесь даже не в реальном времени дело, а в самом факте существования очереди. Т.е. как только у нас может выстроится комбинация сообщений (completion_message, start_operation, timeout) так сразу же мы натыкаемся на эту проблему. Причем, у нас реально может быть правильный порядок появления сообщений: completion_message в момент времени t, start_operation в момент времени (t+1us) и timeout в момент времени (t+2us). Не говоря уже о том, что если мы работаем не в рамках ОС реального времени, а в обычной многозадачной системе, то мы запросто можем наткнуться на ситуацию, когда в момент времени t возникает timeout, но это сообщение не успевает встать в очередь, поскольку нить таймера прерывают (или же в попытке захватить замок очереди нить таймера оказывается не первой). А в очередь встает completion_message в момент времени (t+1us), а затем start_message в момент времени (t+2us). И лишь затем туда попадет timeout.


          1. Ryppka
            22.09.2018 19:08
            +1

            Как раз такой способ первым в статье и описан :)

            Я так и знал, что по пэрвому вопросу сущэственных расхождэний у нас нэ будэт! (С) Сосо Джугашвили)))
            Видимо, статью читал недостаточно внимательно, каюсь. Честно говоря, для решения конкретной узкой задачи, когда выполнение текущей задачи прерывается «отставшим» таймаутом от уже заверенной предыдущей — другого разумного решения я не вижу. А вот как его красиво оформить… Сделать специальный производный класс для агентов с неявным состоянием типа id-текущей задачи и в нем утилити-функцию для обработки таймаутов?.. Хм, что не подумаю, не шибко элегантно получается.
            Что касается нарушения последовательности сообщений, то полностью согласен с Вами: в реальных условиях, да еще на ОС общего назначения… Я последнее время все на C да на C, да на микроконтроллерах в реальном режиме с аппаратными таймерами — так даже там…


            1. eao197 Автор
              22.09.2018 19:21

              А вот как его красиво оформить…
              Возможно, это будет что-то похожее на so_5::extra::async_op::time_limited.
              Ну и, в принципе, этот самый async_op::time_limited уже в каких-то сценариях будет тем самым механизмом с гарантированной отменой таймеров. Собственно, под это он и создавался с подачи ув.тов. PavelVainerman


        1. PavelVainerman
          22.09.2018 19:57

          Речь не обязательно о работе в реальном времени. Речь о любой работе (задаче) связанной с ожиданием (с таймаутами). Не знаю как везде, но как минимум в АСУ, «надёжное программирование» подразумевает, что любая операция должна быть конечной во времени. Поэтому чтобы вы не делали, что-либо включали/запускали
          или что-то запрашивали, у Вас обязательно должен быть «защитный таймер» который гарантирует, что вы не застрянете на ожидании ответа «навечно». Т.е. у Вас по сути всегда, для какого-либо действия возникает необходимость произвести действие и засечь таймер (таймаут). И дальше либо «ответ»(обратная связь) придёт о том, что действие выполнено (успешно или нет), либо первее сработает таймер (таймаут) и вы будете выполнять какую-то обработку этой ситуации (например повторите попытку ещё несколько раз)… Тут-то и возникает описанная проблема, что если к Вам пришло вперёд сообщение о, допустим, успешном выполнении операции, то Вам сообщение от таймера уже не нужно. Точнее нужно, чтобы оно уже не приходило. Нужен механизм позволяющий отказаться от ранее заказанного таймера.

          А eao197 как раз описал проблему. Что если механизм не гарантирует отказ от таймера (типа он уже в очереди, его оттуда не убрать), то почти гарантировано, что возникнет ситуация, когда Вы решите повторить попытку выполнить команду, засекаете вновь таймер, а сообщение приходит в следующее мгновенье, от старого таймера, потому-что оно уже было в очереди в этот момент…
          Т.е. мне кажется, что при активной работе с таймерами (а они нужна практически всегда в реальном асинхронном взаимодействии, т.к. всегда должен быть защитный таймер на любую длительную операцию), отмена таймера (или игнорирование «старых» сообщений от таймеров) очень нужны и важны. И конечно, хочется чтобы это поддерживалось на уровне фреймворка.
          Будет ли это механизм с маркированием каждого таймера или же с возможностью вытащить его из очереди на обработку, для меня как пользователя фреймворка не важно. Главное чтобы это было «незаметно» и в идеале не требовало от меня добавлять в свои сообщения доп. поля, по крайней мере явным образом.


      1. Ryppka
        22.09.2018 18:42
        +1

        Не могу удержаться и дополняю предыдущий пост)))
        Я бы оформил предоставление агенту возможность реагировать на таймеры реального времени явно, например, как дополнительный параметр шаблона. В смысле по дефолту этот параметр работает как сейчас, но если его явно задать на специальное значение, то, к примеру, для таймеров создается своя высокоприоритетная очередь или еще как-то.
        Это к вопросу об ортогональности аспектов: по умолчанию сообщение от таймера может застревать в общей очереди, отставая от реального времени, а если надо, то агент получает более строгие гарантии доставки связанных с реальным временем сообщений. В некотором смысле, все сообщения, связанные с реальным временем — таймаут. Ну, или каждое такое сообщение — немножечко таймаут)))


        1. eao197 Автор
          22.09.2018 18:53

          Интересная мысль. Но, наверное, в рамках SObjectizer-а это можно обеспечить за счет специализированных диспетчеров, которые знают типы сообщений и их приоритеты. По аналогии с тем, что описывалось в этой статье.


          1. Ryppka
            22.09.2018 18:56

            Да, я написал про агенты, но в контексте SO речь шла, конечно, про диспетчеры. Добавление РТ (гарантированные таймауты, время жизни сообщений, обработка просроченных сообщений) — это все про планировщики.


    1. eao197 Автор
      22.09.2018 17:58

      А в целом отмена событий кажется мне очень сомнительной практикой, которая может скорее все запутать, чем помочь что-то решить.
      В общем, да, это не такая штука, которая должна использоваться повсеместно. Но бывают ситуации, когда агент A попросил агента B выполнить какую-то операцию, которая может занимать довольно много времени. Например, агент B проверяет рисковость платежа и этот агент единственный, тогда как агентов A несколько и запросы от A к B могут скапливаться в очереди. Пока B обрабатывает предыдущие запросы агент A может понять, что операцию проводить уже не нужно. Скажем, клиент сам отказался от платежа.

      По-хорошему, уже отосланный B запрос нужно отозвать. Но как это сделать?

      Можно использовать схему collector-performer. Тогда B будет представлять из себя пару из двух агентов: B_collector с очередью запросов и B_performer. Пока B_performer выполняет длительную операцию, B_collector только разбирается с очередью ждущих запросов. В этом случае агент A может послать сообщение на отмену запроса, это сообщение получит B_collector и выбросит запрос, если этот запрос еще ждет своей очереди.

      Но создание такой пары B_collector/B_performer — это работа, которую нужно сделать. По хорошему, ее нужно сделать. Однако, если бы делаем какой-то proof-of-concept на коленке, то нам может быть удобно, если бы SObjectizer предоставлял функциональность по отзыву ранее отосланных сообщений.

      Так что, да, фича не однозначная. Но т.к. многие насущные потребности разработчиков уже реализованы, то есть возможность позаниматься и вот такими вопросами.


      1. Ryppka
        22.09.2018 18:09
        +1

        Будь на то моя воля, я бы не заморачивался этой фичей, нужной, как Вы говорите, исключительно в контексте быстрого прототипирования. Нехай collector/performer пилят! Умные и так поймут, почему, а глупцам давать в руки способ выстрелить себе в ногу и поднять крик «SObjectizer г&^%но..!» я бы не стал.


        1. eao197 Автор
          22.09.2018 18:15

          Ну вот поэтому эта фича пока не сделана, а только рассматривается. Ну и статья появилась в том числе и для того, чтобы фидбэк какой-то получить. Так что большое спасибо за то, что нашли время написать свое мнение.