SObjectizer — это небольшой фреймворк для C++, который дает возможность разработчику использовать такие подходы, как Actor Model, Communicating Sequential Processes и Publish/Subscribe.


Одной из ключевых концепций в SObjectizer являются диспетчеры. Диспетчеры определяют, где и как акторы (агенты в терминологии SObjectizer-а) обрабатывают свои события. Диспетчеры в SObjectizer бывают разных типов, и пользователь может создавать в своем приложении столько разнообразных диспетчеров, сколько ему потребуется.


При этом у пользователя есть возможность написать свой собственный тип диспетчера, если ничего из стандартных диспетчеров ему не подходит. Два с половиной года назад уже рассказывалось о том, как можно сделать диспетчер для SObjectizer-а со специфическими свойствами.


Сегодня мы еще раз поговорим об этом. На примере уже другой задачи. Да и реализация будет отличаться, поскольку за прошедшее время SObjectizer успел обновиться сперва до версии 5.6, а затем и 5.7. И в этих версиях много отличий от версии 5.5, про которую в основном и рассказывалось в прошлом. В том числе и в механизме диспетчеров.


О решаемой задаче в двух словах


Предположим, что у нас есть агент, который ожидает два сообщения: msg_result с финальным результатом ранее начатой агентом операции и msg_status с промежуточной информацией о том, как протекает начатая операция.


Сообщения msg_status могут идти большим потоком. Например, на одно msg_result может приходиться до 1000 msg_status. И нам бы хотелось, чтобы когда в очереди уже стоит 900 сообщений msg_status, новое сообщение msg_result вставало не в конец очереди, а в самое ее начало. Чтобы msg_result не ждало, пока разгребутся 900 старых msg_status.



Вроде бы простая задача. Но с ее реализацией могут возникнуть неожиданные сложности. И чтобы понять, что это за сложности и как с ними справиться, нужно посмотреть на SObjectizer-овские диспетчеры и понять, что это за звери такие. После чего можно будет обсудить возможные способы реализации доставки сообщений с учетом их приоритетов. Воплотить один из вариантов в жизнь и обсудить его реализацию.


Как диспетчеры связаны с политикой доставки сообщений до агентов?


Диспетчер в SObjectizer-е — это сущность, которая определяет, где и когда агенты будут обрабатывать свои сообщения.


Допустим, у нас есть агент Bob, который подписывается на сообщение msg_result из почтового ящика mbox. Когда в mbox отсылается msg_result, то это сообщение должно встать в очередь Bob-а, откуда оно будет рано или поздно извлечено и обработано. И вот здесь уже в полный рост встают диспетчеры, поскольку у агентов нет никаких очередей, тогда как у диспетчеров есть.


Когда агент регистрируется в SObjectizer-е, то агент привязывается к какому-то диспетчеру. И в момент привязки агенту дается указатель на сущность event_queue. Это интерфейс, за которым скрыта некая машинерия по передаче сообщения, адресованного агенту, именно тому диспетчеру, к которому агент привязан.


Когда сообщение передается диспетчеру, тот формирует заявку (demand) на обработку сообщения агентом и сохраняет заявку где-то у себя (очередь заявок диспетчера называется demand_queue).


Процесс доставки сообщения до агента в SObjectizer выглядит следующим образом:



Когда сообщение отсылается в mbox, то mbox видит Bob-а в подписчиках и говорит Bob-у: вот тебе новое сообщение. Bob берет это сообщение и передает его в свой event_queue. И уже этот event_queue формирует для диспетчера заявку (demand) на обработку сообщения для агента Bob. Заявка сохраняется в demand_queue диспетчера.


Диспетчер владеет одной или несколькими рабочими нитями, которые занимаются тем, что извлекают заявки из demand_queue и отправляют их на обработку. Эти рабочие нити диспетчера называются рабочим контекстом на котором будут работать агенты.


В общем, получается, что диспетчеры и предоставляют рабочий контекст для агентов, и хранят в своих demand_queue адресованные агентам сообщения.


А это означает, что когда нам нужна приоритетная доставка сообщений (т.е. msg_result вперед msg_status), то нам потребуется диспетчер, который эту самую приоритетную доставку реализует.


А приоритетов для сообщений в SObjectizer-5 и нет :(


Совсем.


Вот так вот.


Приоритеты для сообщений были в SObjectizer-4. Но они на практике использовались всего раз или два. Зато хлопот с их существованием и поддержкой было много.


Поэтому при разработке SObjectizer-5 от приоритетов для сообщений агента было решено отказаться. И за 10 лет развития и использования SObjectizer-5 пожалеть об этом пока что не пришлось.


Однако время от времени задачи, в которых требуется какое-либо приоритетное обслуживание, все-таки встречаются. Поэтому некоторое время назад в SObjectizer-5 было добавлено понятие приоритета для агента. Именно для агента, а не для отдельного сообщения. Для поддержки приоритетов агентов в SObjectizer-5 появились три диспетчера, которые реализуют разные политики обслуживания приоритетов.


Так как можно решить задачу с msg_result и msg_status?


Использование двух агентов с разными приоритетами и диспетчера one_thread::strictly_ordered


Самое "простое" и "искаробочное" решение.


Логика агента A размазывается на двух агентов A_result и A_status. Агенты A_result и A_status получают разные приоритеты и привязываются к одному и тому же диспетчеру типа one_thread::strictly_ordered. Агент A_result подписывается на msg_result, тогда как агент A_status подписывается на msg_status.


Общие для агентов данные выносятся в какой-то общий объект, которыми они совместно владеют. Через shared_ptr, например. Получится что-то вроде:


struct A_data { ... };

class A_result final : public so_5::agent_t {
   std::shared_ptr<A_data> m_data;
public:
   A_result(context_t ctx, std::shared_ptr<A_data> data) {...}

   void so_define_agent() override {
      so_subscribe(m_data->m_mbox, &A_result::on_result);
   }

private:
   void on_result(const msg_result & msg) {...}
};

class A_status final : public so_5::agent_t {
   std::shared_ptr<A_data> m_data;
public:
   A_status(context_t ctx, std::shared_ptr<A_data> data) {...}

   void so_define_agent() override {
      so_subscribe(m_data->m_mbox, &A_status::on_status);
   }

private:
   void on_status(const msg_status & msg) {...}
};

Разделять общие данные между агентами A_result и A_status безопасно до тех пор, пока они работают на one_thread::strictly_ordered диспетчере.


Собственно говоря, это тот подход, который я бы и рекомендовал использовать, если вам потребовалась приоритетная обработка событий.


Но у этого способа есть очевидный недостаток: трудоемкость и размазывание логики по нескольким сущностям. Эта трудоемкость будет увеличиваться по мере увеличения количества сообщений, которые вам потребуется обрабатывать с разными приоритетами. Так что в какой-то момент может показаться, что двигаться в этом направлении слишком дорого и что нужно искать другое решение.


Собственный диспетчер с приоритетами для сообщений


Другой подход состоит в том, чтобы сделать собственный диспетчер, который будет доставлять сообщения до агентов-получателей с учетом приоритетов сообщений. Благо, если не заморачиваться вопросами мониторинга работы диспетчера и сбора статистики, то это делается не так уж и сложно.


Если такой диспетчер будет написан, то можно избавить себя от недостатков подхода с несколькими разноприоритетными агентами: вся логика будет находится в одном месте, поэтому добавлять новые обрабатываемые сообщения или избавляться от каких-то старых сообщений будет гораздо проще.


Именно по этому пути мы и пойдем в данной статье.


Что именно мы попытаемся сделать?


Мы попытаемся посмотреть на проблему немного шире (как это и положено русским программистам, которые ищут универсальные решения там, где можно обойтись методом грубой силы).


Начнем с того, что нам нужен собственный диспетчер. Который сможет упорядочивать сообщения в своей очереди согласно их приоритетов.


И тут возникает вопрос, а как должны определяться эти приоритеты?


А это зависит от задачи.


Где-то приоритеты будут определяться типом сообщения и эти приоритеты можно зафиксировать прямо в compile-time. Скажем, приоритет у msg_result единичка, а у msg_status — нолик.


Где-то приоритеты могут зависеть от типа сообщения и агента-получателя. Скажем, для агента A сообщение msg_result будет иметь приоритет 1, а сообщение msg_status — 0. Тогда как для агента L оба эти сообщения будут иметь приоритет 0.


Где-то приоритеты могут зависеть только от почтового ящика, из которого они были получены. Скажем, все сообщения из mbox_A должны быть приоритетнее, чем сообщения из mbox_B.


А где-то приоритеты почтовых ящиков должны быть дополнены еще и приоритетами сообщений. Т.к. msg_result из mbox_A имеет приоритет 2, msg_status из mbox_A — 1, msg_result из mbox_B — 1, msg_status из mbox_B — 0.


Ну и другие сочетания параметров также возможны.


Значит ли это, что под каждое такое сочетание нужно делать свой диспетчер?


Скорее всего нет.


Возможно, диспетчер должен быть один, просто он должен уметь настраиваться на любой способ вычисления приоритета сообщений. Например, пользователь параметризует диспетчер объектом priority_detector, который и определяет приоритет очередного сообщения: при получении очередной заявки диспетчер дергает priority_detector и располагает заявку в очереди согласно вычисленного priority_detector-ом приоритета.


Уже хорошо.


Но можно попробовать пойти еще дальше.


Диспетчеры были придуманы для того, чтобы дать возможность разработчику распределять своих агентов по тем рабочим контекстам, которые нужны разработчику. Скажем, захотел программист, чтобы агенты Alice и Bob работали на одной общей рабочей нити, значит зачем-то ему это нужно. Не суть важно зачем: для экономии ресурсов или для того, чтобы облегчить Alice и Bob совместную работу над общими разделяемыми данным. Разработчик хочет привязать Alice и Bob к общему контексту, значит он должен суметь это сделать.


А теперь представим себе, что Alice и Bob должны работать на общем рабочем контексте, но приоритеты доставки сообщений до Alice и Bob должны вычисляться по-разному. Скажем, приоритет сообщений для Alice зависит только от их типа. Тогда как приоритет для Bob-а зависит и от типа, и от почтового ящика.


Тут можно пойти, как минимум, двумя путями.


Первый путь. Диспетчер владеет одной приоритетной очередью. А мы пишем сложный priority_detector-а, который разбирается кто получатель сообщения, Alice или Bob, затем для каждого получателя определяет приоритет по тем или иным критериям.


Вполне понятное направление для движения. Но его трудоемкость будет расти по мере увеличения количества разнотипных агентов, которые нам нужно будет привязать к одному диспетчеру.


Второй путь. А что если у каждого из агентов будет своя собственная приоритетная очередь? И в каждой очереди будет собственный priority_detector со своими правилами? Тогда мы сможем привязывать к одном диспетчеру агентов с разными политиками доставки сообщений. И в этом случае нам нужно научить диспетчера работать не с одной-единственной очередью, в которую сваливается вообще все, а с набором отдельных очередей.


Мы пойдем вторым путем. Поскольку он открывает интересные возможности. Ведь если мы можем научить диспетчер обслуживать агентов с собственными очередями, то не обязательно это будут очереди с приоритетом. Например, это могут быть очереди фиксированного размера с автоматическим выбрасыванием самых старых (или самых новых) элементов при попытке добавления заявки в уже полную очередь. Или же очереди с контролем времени пребывания: скажем, если заявка простояла в очереди дольше 250ms, то она уже не актуальна и должна быть проигнорирована.


Демо проект so5_custom_queue_disps


Для иллюстрации предлагаемого решения на GitHub-е создан демо-проект so5_custom_queue_disps, который содержит исходные тексты обсуждаемого ниже диспетчера.


На данный момент в нем есть реализация только одного типа диспетчера — one_thread, который привязывает всех агентов к одной-единственной рабочей нити. Если данная тема кого-нибудь заинтересует, то туда же можно будет добавить и реализации thread_pool и/или adv_thread_pool диспетчеров. Для иллюстрации. Ну или для того, чтобы их можно было скопипастить, если вдруг кому-нибудь понадобятся.


Общая идея: список из непустых очередей


Идея, положенная в основу so5_custom_queue_disps, состоит в том, что есть N независимых друг от друга очередей заявок. В пределе у каждого агента может быть своя очередь заявок. Эти очереди живут до тех пор, пока кто-то их использует, как только очередь стала никому не нужна она уничтожается.


Диспетчер хранит у себя список указателей на очереди заявок. Если этот список пуст, значит все очереди заявок пусты. Как только в какую-то из них попадает заявка, эта очередь добавляется в конец списка диспетчера.


Диспетчер спит, пока в его списке ничего нет. Но как только какая-то из очередей добавляется в список, то диспетчер просыпается и начинает обслуживать очереди из своего списка.


Диспетчер изымает из списка первую очередь. Извлекает заявку из очереди. Если после извлечения заявки очередь оказывается пустой, то диспетчер забывает про эту очередь. Если же в очереди что-то остается, то диспетчер возвращает эту очень в свой список, в самый его конец.


После чего диспетчер запускает обработку извлеченной заявки. А после завершения обработки проверяет свой список: если список не пуст, то обрабатывается следующий его элемент. Если пуст, то диспетчер засыпает.



Несколько агентов с одной очередью заявок и FIFO для агентов


Для большей гибкости в so5_custom_queue_disps есть возможность связать нескольких агентов с одной общей очередью заявок. Например, если агенты должны использовать общую политику доставки сообщений (скажем, у них общие приоритеты).


Но можно к one_thread диспетчеру привязать и агентов, у каждого из которых будет своя собственная очередь заявок. Получится, что агенты исполняются на одном рабочем контексте, но заявки для них диспетчер будет брать из разных очередей.


И тут возможны сюрпризы с обеспечением FIFO для агентов.


Предположим, что мы используем простые очереди заявок. Без приоритетов, без ограничения на размер, без выбрасывания чего-либо. Просто очереди.


Предположим, что агенты Alice и Bob разделяют одну общую очередь. И мы отсылаем сообщение M1 агенту Alice, а затем отсылаем сообщение M2 агенту Bob. Доставка сообщений до агентов произойдет в таком же порядке: сперва M1, затем M2.


Но вот если агенты Alice и Bob имеют независимые очереди заявок, то при отсылке вначале M1 агенту Alice, а затем M2 агенту Bob, порядок доставки может быть любым. Это зависит и от наполнения очередей заявок самих агентов, и от того, где эти очереди находились в списке непустых очередей диспетчера. Так что может случится и так, что M2 будет обработано агентом Bob еще до того, как M1 дойдет до Alice.


Эту особенность нужно иметь в виду при работе с таким диспетчером.


Как описанный выше подход выглядит для пользователя на практике?


Прежде чем перейти к рассмотрению деталей реализации custom_queue_disps::one_thread-диспетчера, посмотрим на то, как выглядит использование этого диспетчера в коде:


so_5::launch( [](so_5::environment_t & env) {
   env.introduce_coop( [](so_5::coop_t & coop) {
      // (1)
      auto queue = std::make_shared<dynamic_per_agent_priorities_t>();

      // (2)
      auto binder = custom_queue_disps::one_thread::make_dispatcher(
            coop.environment() ).binder( queue );

      // (3)
      auto * alice = coop.make_agent_with_binder<demo_agent_t>(
            binder, "Alice" );
      auto * bob = coop.make_agent_with_binder<demo_agent_t>(
            binder, "Bob" );

      // (4)
      queue->define_priority( alice,
            typeid(demo_agent_t::hello),
            dynamic_per_agent_priorities_t::low );
      queue->define_priority( alice,
            typeid(demo_agent_t::bye),
            dynamic_per_agent_priorities_t::high );

      queue->define_priority( bob,
            typeid(demo_agent_t::hello),
            dynamic_per_agent_priorities_t::high );
      queue->define_priority( bob,
            typeid(demo_agent_t::bye),
            dynamic_per_agent_priorities_t::low );
   } );
} );

Здесь в точке (1) мы создаем отдельную очередь заявок, которая реализует доставку сообщений с учетом получателя сообщения и приоритета этого сообщения именно для этого получателя (класс dynamic_per_agent_priorities_t в деталях рассматривается ниже).


В точке (2) мы делаем сразу два действия:


  • во-первых, создаем новый экземпляр диспетчера;
  • во-вторых, получаем от этого диспетчера объект disp_binder, который нам нужен, чтобы привязать агентов к этому диспетчеру. Этот disp_binder знает, что агенты, которых он привязывает к диспетчеру, будут совместно использовать очередь заявок, созданную в точке (1).

В точке (3) мы создаем двух агентов, каждый из которых будет привязан к one_thread-диспетчеру, созданному в точке (2).


В точке (4) определяются приоритеты для сообщений, которые обрабатываются агентами Alice и Bob. Можно увидеть, что одним и тем же сообщениям назначаются разные приоритеты. Соответственно, даже если сообщения отсылаются агентам единовременно, обрабатываться сообщения будут в разном порядке. В соответствии со своими приоритетами. А это как раз то, что нам и нужно.


Некоторые пояснения касательно деталей реализации so5_custom_queue_disps


Базовый класс demand_queue_t и его наследники


Итак, нам нужно иметь возможность привязать к диспетчеру агентов, использующих совершенно разные очереди заявок. Поэтому, с одной стороны, диспетчер должен уметь работать с непохожими друг на друга очередями. И, с другой стороны, пользователь должен иметь возможность без проблем подсунуть диспетчеру свою реализацию очереди.


Для этого предназначен интерфейс demand_queue_t.


Публичная часть demand_queue_t


Публичная часть demand_queue_t выглядит так:


      [[nodiscard]]
      virtual bool
      empty() const noexcept = 0;

      [[nodiscard]]
      virtual std::optional<so_5::execution_demand_t>
      try_extract() noexcept = 0;

      virtual void
      push( so_5::execution_demand_t demand ) = 0;

Это как раз те методы, которые пользователь должен переопределить в своем классе очереди заявок для того, чтобы диспетчер с этой очередью заявок смог работать.


Надеюсь, что смысл методов empty() и push() очевиден, поэтому на них останавливаться не буду (если что, то отвечу на вопросы в комментариях). А вот по поводу возврата std::optional из try_extract() можно сказать пару слов.


Может показаться, что если диспетчер обращается к очереди заявок только когда очередь не пуста, то достаточно иметь только один метод extract(), который возвращает первую заявку из очереди (конструктуры/операторы копирования/перемещения для execution_demand_t не бросают исключений, поэтому можно обойтись одним extract() вместо пары front()+pop()).


Но в таком случае мы теряем некоторую долю гибкости. Скажем, возможность игнорировать заявки, которые ждали в очереди слишком долго. Тогда как возврат std::optional дает нам такую возможность. Поэтому диспетчер ожидает следующего поведения от очереди заявок:


  • если empty() возвращает false, то диспетчер может безопасно вызывать try_extract();
  • try_extract() может вернуть пустой std::optional даже если очередь была непустой. Это всего лишь означает, что актуальной заявки для обработки на самом деле не оказалось. Поэтому диспетчер просто идет дальше.

thread-safety для demand_queue_t


Диспетчер вызывает методы empty/try_extract/push только под своим собственным mutex-ом. Поэтому, если очередь заявок модифицирует свое состояние только в этих методах, то об обеспечении thread-safety можно не беспокоится, она обеспечивается диспетчером автоматически.


Однако, если внутри empty/try_extract/push требуется доступ к информации, которая каким-то образом может модифицироваться еще какими-то методами, то тогда ответственность за обеспечение thread-safety для этой дополнительной информации ложится на разработчика очереди заявок. Пример этого мы увидим ниже, когда будем говорить о классе dynamic_per_agent_priorities_t.


Специальные приоритеты для заявок evt_start и evt_finish


Если реализация очереди заявок выполняет переупорядочивание заявок согласно приоритетам или если реализация очереди заявок выбрасывает какие-то заявки из очереди, то следует учесть факт существования двух типов заявок: evt_start и evt_finish.


Заявка evt_start ставится в очередь заявок самой первой, прямо во время привязки агента к диспетчеру. Именно благодаря этой заявке у агента вызывается виртуальный метод so_evt_start.


Заявка evt_finish является самой последней заявкой для агента. Происходит это в процессе дерегистрации агента. И evt_finish должна быть самой последней заявкой, которая для агента будет выполнена. Никакие другие заявки после обработки evt_finish для агента обрабатываться не должны.


Так вот, поскольку это архиважные заявки для жизненного цикла агента, то выбрасывать их категорически нельзя. Это должно быть учтено при реализации очереди заявок.


Если заявки упорядочиваются в очереди согласно приоритетам, то у заявки evt_start должен быть наивысший приоритет. А у заявки evt_finish — самый низкий.


В SObjectizer 5.5, 5.6 и 5.7 заявки evt_start и evt_finish можно различить не по execution_demand_t::m_msg_type, как для обычных заявок. А по execution_demand_t::m_demand_handler. Ниже будет показано, как именно это происходит. Возможно, в SObjectizer-5.8 будет применен единообразный подход и все будет идентифицироваться посредством execution_demand_t::m_msg_type. Но в ближайших планах ветки 5.8 нет от слова совсем :)


Написание собственного demand_queue_t


В so5_custom_queue_disps_demo есть три реализации собственных очередей заявок, начиная от самой тривиальной до более-менее сложной. В этой статье мы рассмотрим два крайних случая.


Простейший случай: simple_fifo_t


Простейшая реализация использует обычную FIFO очередь без всяких изысков. Поэтому ее реализация тривиальна и компактна:


class simple_fifo_t final : public custom_queue_disps::demand_queue_t
   {
      std::queue< so_5::execution_demand_t > m_queue;

   public:
      simple_fifo_t() = default;

      [[nodiscard]]
      bool
      empty() const noexcept override { return m_queue.empty(); }

      [[nodiscard]]
      std::optional<so_5::execution_demand_t>
      try_extract() noexcept override
         {
            std::optional<so_5::execution_demand_t> result{
               std::move(m_queue.front())
            };
            m_queue.pop();

            return result;
         }

      void
      push( so_5::execution_demand_t demand ) override
         {
            m_queue.push( std::move(demand) );
         }
   };

Поскольку мы обеспечиваем строгий FIFO и ничего не выбрасываем, то нам здесь даже не нужно задумываться о существовании заявок evt_start/evt_finish.


Наиболее сложный случай: dynamic_per_agent_priorities_t


Наиболее сложная реализация использует динамически задаваемые приоритеты, которые назначаются агенту в зависимости от типа получаемого сообщения.


Для того, чтобы хранить описания заданных пользователем приоритетов потребуются следующие типы данных и члены класса dynamic_per_agent_priorities_t:


using type_to_prio_map_t = std::map< std::type_index, priority_t >;

using agent_to_prio_map_t =
            std::map< so_5::agent_t *, type_to_prio_map_t >;

std::mutex m_prio_map_lock;

agent_to_prio_map_t m_agent_prios;

Можно увидеть std::mutex который и будет задействован для обеспечения thread-safety при работе с m_agent_prios.


Здесь нам нужно использовать очередь с приоритетами. И, дабы воспользоваться std::priority_queue из стандартной библиотеки, мы будем хранить в очереди не so_5::execution_demand_t, а собственный класс:


struct actual_demand_t
   {
      so_5::execution_demand_t m_demand;
      priority_t m_priority;

      actual_demand_t(
         so_5::execution_demand_t demand,
         priority_t priority )
         :  m_demand{ std::move(demand) }
         ,  m_priority{ priority }
         {}

      [[nodiscard]]
      bool
      operator<( const actual_demand_t & o ) const noexcept
         {
            return m_priority < o.m_priority;
         }
   };

Далее, для реализации метода push(), в котором новая заявка должна встать в очередь согласно своему приоритету, нам потребуется определить тип заявки, кому она адресуется и какой из всего этого получается приоритет:


[[nodiscard]]
priority_t
handle_new_demand_priority( const so_5::execution_demand_t & d ) noexcept
   {
      if( so_5::agent_t::get_demand_handler_on_start_ptr()
            == d.m_demand_handler )
         return highest;

      if( so_5::agent_t::get_demand_handler_on_finish_ptr()
            == d.m_demand_handler )
         {
            std::lock_guard< std::mutex > lock{ m_prio_map_lock };
            m_agent_prios.erase( d.m_receiver );
            return lowest;
         }

      {
         std::lock_guard< std::mutex > lock{ m_prio_map_lock };
         auto it_agent = m_agent_prios.find( d.m_receiver );
         if( it_agent != m_agent_prios.end() )
            {
               auto it_msg = it_agent->second.find( d.m_msg_type );
               if( it_msg != it_agent->second.end() )
                  return it_msg->second;
            }
      }

      return normal;
   }

Сперва мы проверяем, является ли заявка заявкой типа evt_start. Если да, то ей присваивается наивысший приоритет.


Далее такая же проверка делается для заявки типа evt_finish. Но если пришла заявка evt_finish, то кроме назначения ей наименьшего приоритета мы делаем дополнительное действие: удаляем информацию о приоритетах для этого агента. Т.к. эта информация больше не потребуется, других заявок уже не будет.


Также в коде метода handle_new_demand_priority() можно обратить внимание на захват mutex-а в тех местах, где нам требуется модифицировать информацию о приоритетах. Это необходимо делать, т.к. эта информация модифицируется/используется не только при работе push(), но и при работе метода define_priority() о котором диспетчер не знает.


Вот, собственно, и все особенности. Остальная часть тривиальна:


void
define_priority(
   so_5::agent_t * receiver,
   std::type_index msg_type,
   priority_t priority )
   {
      std::lock_guard< std::mutex > lock{ m_prio_map_lock };

      m_agent_prios[ receiver ][ msg_type ] = priority;
   }

[[nodiscard]]
bool
empty() const noexcept override { return m_queue.empty(); }

[[nodiscard]]
std::optional<so_5::execution_demand_t>
try_extract() noexcept override
   {
      std::optional<so_5::execution_demand_t> result{
         m_queue.top().m_demand
      };
      m_queue.pop();

      return result;
   }

void
push( so_5::execution_demand_t demand ) override
   {
      const auto prio = handle_new_demand_priority( demand );
      m_queue.emplace( std::move(demand), prio );
   }

Приватная часть demand_queue_t


Кроме описанной выше публичной части demand_queue_t есть еще и "приватная" часть, которая предназначена для использования только со стороны диспетчера:


class demand_queue_t
   {
      demand_queue_t * m_next{ nullptr };

   public:
      [[nodiscard]]
      demand_queue_t *
      next() const noexcept { return m_next; }

      void
      set_next( demand_queue_t * q ) noexcept { m_next = q; }

      void
      drop_next() noexcept { set_next( nullptr ); }

Эта часть нужна для того, чтобы диспетчер мог провязывать непустые очереди заявок в интрузивный односвязный список.


dispatcher_handle. Что это и зачем?


После того, как с очередями разобрались, можно перейти к самому диспетчеру. И первое, с чем мы сталкиваемся, так это с тем, что в SObjectizer 5.6 и 5.7 нет никакого явного интерфейса для диспетчера, как это было в SObjectizer 5.5 и более ранних версиях. Т.е. реализовать диспетчер можно как угодно и в виде чего угодно (в SO-5.5 же диспетчер должен был наследоваться от специального класса dispatcher_t).


В SO-5.6/5.7 пользователь взаимодействует с диспетчерами посредством двух сущностей: dispatcher_handle и disp_binder. При disp_binder мы поговорим ниже, а пока рассмотрим dispatcher_handle.


За создание экземпляра диспетчера в SO-5.6/5.7 обычно отвечает функция-фабрика make_dispatcher(). Эта функция должна что-то возвратить, но что, если для диспетчера в современном SObjectizer-е нет никакого C++ного интерфейса?


А вот некий дескриптор/хэндл и возвращается. Именно это и называется dispatcher_handle.


Dispatcher_handle может рассматриваться как shared_ptr для какого-то неизвестного пользователю типа. И работать dispatcher_handle должен именно как shared_ptr: пока есть хотя бы один непустой dispatcher_handle, ссылающийся на экземпляр диспетчера, этот экземпляр будет существовать и будет работать.


Обычно у dispacher_handle есть публичный метод binder() который создает экземпляр disp_binder-а для этого диспетчера. Возможно, у dispatcher_handle есть и другие методы, специфические для конкретного типа диспетчера. Но обычно это binder() и несколько методов, делающих dispatcher_handle похожим на shared_ptr.


В рассматриваемой реализации у dispatcher_handler минималистичный интерфейс:


namespace impl
{

class dispatcher_t;

using dispatcher_shptr_t = std::shared_ptr< dispatcher_t >;

class dispatcher_handle_maker_t;

} /* namespace impl */

class [[nodiscard]] dispatcher_handle_t
   {
      friend class impl::dispatcher_handle_maker_t;

      impl::dispatcher_shptr_t m_disp;

      dispatcher_handle_t( impl::dispatcher_shptr_t disp );

      [[nodiscard]]
      bool
      empty() const noexcept;

   public :
      dispatcher_handle_t() noexcept = default;

      [[nodiscard]]
      so_5::disp_binder_shptr_t
      binder( demand_queue_shptr_t demand_queue ) const;

      [[nodiscard]]
      operator bool() const noexcept { return !empty(); }

      [[nodiscard]]
      bool
      operator!() const noexcept { return empty(); }

      void
      reset() noexcept;
   };

Функция-фабрика make_dispatcher() для нашего one_thread-диспетчера будет возвращать экземпляр dispatcher_handler именно этого типа.


disp_binder. Что это и что нам нужно от disp_binder для one_thread-диспетчера?


Выше мы уже несколько раз упоминали «привязку агентов к диспетчерам». Теперь же настало время поговорить об это в подробностях.


И вот тут в дело вступают специальные объекты под названием disp_binder-ы. Они служат как раз для того, чтобы привязать агента к диспетчеру при регистрации кооперации с агентом. А также для того, чтобы отвязать агента от диспетчера при дерегистрации кооперации.



В SObjectizer определен интерфейс, который должны поддерживать все disp_binder-ы. Конкретные же реализации disp_binder-ов зависят от конкретного типа диспетчера. И каждый диспетчер реализует свои собственные disp_binder-ы.


Начиная с версии 5.6 интерфейс этот выглядит следующим образом:


class disp_binder_t
   : private std::enable_shared_from_this< disp_binder_t >
{
   public:
      disp_binder_t() = default;
      virtual ~disp_binder_t() noexcept = default;

      virtual void
      preallocate_resources( agent_t & agent ) = 0;

      virtual void
      undo_preallocation( agent_t & agent ) noexcept = 0;

      virtual void
      bind( agent_t & agent ) noexcept = 0;

      virtual void
      unbind( agent_t & agent ) noexcept = 0;
};

Три первых метода, preallocate_resources(), undo_preallocation() и bind() используются при привязке агента к диспетчеру.


Их три поскольку регистрация агента — это достаточно сложный процесс, который SObjectizer пытается провести в транзакционной манере. Т.е. либо все агенты кооперации успешно регистрируются, либо не регистрируется не один из них. Для обеспечения этой транзакционности процесс регистрации разбит на несколько стадий.


На первой стадии SObjectizer пытается выделить все ресурсы, необходимые для агентов из новой кооперации. Например, какие-то диспетчеры должны создать для новых агентов новые рабочие нити. Как раз на этой стадии у disp_binder-а вызывается preallocate_resources(). В этом методе disp_binder должен создать все, что агенту потребуется для работы внутри SObjectizer (например, новая рабочая нить, очередь заявок и т.д.). Если все это создалось нормально, то disp_binder должен сохранить это у себя до тех пор, пока у него не вызовут метод bind() для этого же агента.


На стадии резервирования ресурсов могут возникнуть проблемы. Например, не запустится новая рабочая нить. Или не хватит памяти для создания еще одной очереди заявок.


При возникновении подобных проблем все, что было сделано до этого момента, нужно откатить. Для чего и предназначен метод undo_preallocation(). Если у disp_binder-а вызывается метод undo_preallocation(), то disp_binder должен освободить все ресурсы для агента, которые ранее были зарезервированы в preallocate_resources().


Если же стадия резервирования ресурсов завершилась успешно, то выполняется стадия собственно привязки агентов к диспетчерам. И вот здесь уже у disp_binder-а вызывается метод bind(). В этом методе disp_binder обязательно должен вызывать у привязываемого агента метод so_bind_to_dispatcher().


Метод bind() не случайно помечен как noexcept, т.к. на этой стадии исключений SObjectizer не ожидает (а если таковое возникнет, то восстановиться уже не получится).


Метод unbind(), очевидно, используется уже когда кооперация дерегистрируется и агент завершил все свои активности на рабочем контексте (включая и обработку evt_finish). Так что в unbind() disp_binder должен освободить все ресурсы, которые были выделены для агента в preallocate_resources().


Возможно звучит все это сложновато. Но в данном случае реализация disp_binder-а оказывается очень простой:


class actual_disp_binder_t final : public so_5::disp_binder_t
   {
      actual_event_queue_t m_event_queue;

   public:
      actual_disp_binder_t(
         demand_queue_shptr_t demand_queue,
         dispatcher_data_shptr_t disp_data ) noexcept
         :  m_event_queue{ std::move(demand_queue), std::move(disp_data) }
         {}

      void
      preallocate_resources(
         so_5::agent_t & /*agent*/ ) override
         {}

      void
      undo_preallocation(
         so_5::agent_t & /*agent*/ ) noexcept override
         {}

      void
      bind(
         so_5::agent_t & agent ) noexcept override
         {
            agent.so_bind_to_dispatcher( m_event_queue );
         }

      void
      unbind(
         so_5::agent_t & /*agent*/ ) noexcept override
         {}
   };

Все, что данный disp_binder должен сделать — это вызывать so_bind_to_dispatcher(). Никакого резервирования ресурсов выполнять не нужно. Т.к. единственный ресурс — это экземпляр actual_event_queue_t, который автоматически создается вместе с disp_binder-ом.


one_thread-диспетчер


Рассматриваемый нами one_thread-диспетчер состоит из трех частей.


Во-первых, это структура dispatcher_data_t, которая хранит необходимые диспетчеру данные:


struct dispatcher_data_t
   {
      std::mutex m_lock;
      std::condition_variable m_wakeup_cv;

      bool m_shutdown{ false };

      demand_queue_t * m_head{ nullptr };
      demand_queue_t * m_tail{ nullptr };
   };

using dispatcher_data_shptr_t =
      std::shared_ptr< dispatcher_data_t >;

Эти данные выделены в отдельную структуру для того, чтобы к ним можно было напрямую обращаться из event_queue, которая будет отвечать за сохранение заявок агентов в очереди заявок и за добавление ссылки на непустую очередь заявок в список диспетчерах.


Так что вторая часть диспетчера — это реализация интерфейса event_queue_t для one_thread-диспетчера:


class actual_event_queue_t final : public so_5::event_queue_t
   {
      demand_queue_shptr_t m_demand_queue;
      dispatcher_data_shptr_t m_disp_data;

   public:
      actual_event_queue_t(
         demand_queue_shptr_t demand_queue,
         dispatcher_data_shptr_t disp_data ) noexcept
         :  m_demand_queue{ std::move(demand_queue) }
         ,  m_disp_data{ std::move(disp_data) }
         {}

      void
      push( so_5::execution_demand_t demand ) override
         {
            std::lock_guard< std::mutex > lock{ m_disp_data->m_lock };

            auto & q = *m_demand_queue;
            const bool queue_was_empty = q.empty();
            q.push( std::move(demand) );

            if( queue_was_empty )
               {
                  // В этом блоке кода исключений быть не должно.
                  [&]() noexcept {
                     const bool disp_was_sleeping =
                           (nullptr == m_disp_data->m_head);

                     if( disp_was_sleeping )
                        {
                           m_disp_data->m_head = m_disp_data->m_tail = &q;

                           m_disp_data->m_wakeup_cv.notify_one();
                        }
                     else
                        {
                           m_disp_data->m_tail->set_next( &q );
                           m_disp_data->m_tail = &q;
                        }
                  }();
               }
         }
   };

Именно экземпляр такой event_queue и хранится внутри описанного выше disp_binder-а.


Тут нужно отметить, что в actual_event_queue_t хранится два умных указателя. Один на demand_queue, второй на экземпляр dispatcher_data_t. Тем самым контролируется время жизни этих сущностей. Т.е. и demand_queue, и dispatcher_data_t живут до тех пор, пока есть хотя бы один actual_event_queue_t. А поскольку actual_event_queue_t является частью disp_binder-а, то время жизни demand_queue и dispatcher_data_t определяется временем жизни disp_binder-ов. Когда все disp_binder-у исчезнут, пропадет и надобность в demand_queue и dispatcher_data_t.


Третья часть диспетчера — это собственно dispatcher_t, который запускает единственную рабочую нить и использует ее для обработки заявок. Полный код класса dispatcher_t можно увидеть в репозитории. Здесь же мы посмотрим лишь на два фрагмента.


Первый фрагмент — это основная функция рабочей нити и обслуживание заявок из непустых demand_queue:


class dispatcher_t final
   :  public std::enable_shared_from_this< dispatcher_t >
   {
      dispatcher_data_t m_disp_data;

      std::thread m_worker_thread;

      void
      thread_body() noexcept
         {
            const auto thread_id = so_5::query_current_thread_id();
            bool shutdown_initiated{ false };
            while( !shutdown_initiated )
               {
                  std::unique_lock< std::mutex > lock{ m_disp_data.m_lock };
                  shutdown_initiated = try_extract_and_execute_one_demand(
                        thread_id,
                        std::move(lock) );
               }
         }

      [[nodiscard]]
      bool
      try_extract_and_execute_one_demand(
         so_5::current_thread_id_t thread_id,
         std::unique_lock< std::mutex > unique_lock ) noexcept
         {
            do
               {
                  auto [demand, has_non_empty_queues] =
                        try_extract_demand_to_execute();
                  if( demand )
                     {
                        unique_lock.unlock();
                        demand->call_handler( thread_id );

                        break;
                     }
                  else if( !has_non_empty_queues )
                     {
                        m_disp_data.m_wakeup_cv.wait( unique_lock );
                     }
               }
            while( !m_disp_data.m_shutdown );

            return m_disp_data.m_shutdown;
         }

      [[nodiscard]]
      std::tuple< std::optional< so_5::execution_demand_t >, bool >
      try_extract_demand_to_execute() noexcept
         {
            std::optional< so_5::execution_demand_t > result;

            bool has_non_empty_queues{ false };

            if( !m_disp_data.m_head )
               return { result, has_non_empty_queues };

            auto * dq = m_disp_data.m_head;
            m_disp_data.m_head = dq->next();
            dq->drop_next();

            if( !m_disp_data.m_head )
               m_disp_data.m_tail = nullptr;
            else
               has_non_empty_queues = true;

            result = dq->try_extract();

            if( !dq->empty() )
               {
                  if( m_disp_data.m_tail )
                     m_disp_data.m_tail->set_next( dq );
                  else
                     m_disp_data.m_head = m_disp_data.m_tail = dq;
               }

            return { result, has_non_empty_queues };
         }

Второй фрагмент — это реализация метода make_disp_binder:


[[nodiscard]]
so_5::disp_binder_shptr_t
make_disp_binder(
   demand_queue_shptr_t demand_queue )
   {
      return std::make_shared< actual_disp_binder_t >(
            std::move(demand_queue),
            dispatcher_data_shptr_t{ shared_from_this(), &m_disp_data } );
   }

Важный момент, который стоит здесь пояснить — это факт того, что экземпляр dispatcher_data_t хранится в dispatcher_t по значению. Но в методе make_disp_binder в конструктор actual_disp_binder_t передается shared_ptr<dispatcher_data_t>. Тут всего лишь используется трюк c aliasing constructor для std::shared_ptr: хранить shared_ptr будет указатель на объект T, но вот счетчик ссылок будет использоваться от объекта Y. В нашем случае счетчик ссылок от dispatcher_t.


Вот, собственно, и все. Если кто-то хочет взглянуть на реализацию dispatcher_handler и make_dispatcher, то сделать это можно здесь.


Заключение


Данная статья преследует две цели:


Во-первых, хочется показать, что хоть в самом SObjectizer-е приоритетной доставки сообщений нет, но если это кому-то нужно, то это можно сделать самостоятельно. Например, показанным выше способом.


Во-вторых, хочется понять, насколько вообще может быть востребованна подобная функциональность. Если у тех, кто пробовал SObjectizer или же присматривался к SObjectizer-у, время от времени надобность в приоритетной доставке сообщений возникает, то ее можно добавить в so5extra. Или даже в сам SObjectizer. Мы вполне можем это сделать. Но только если такая функциональность действительно востребована.


Так что если у кого-то из читателей есть мнение на этот счет (скажем, есть желание иметь подобный диспетчер в SObjectizer-е прямо "искаропки"), то можно высказаться в комментариях. Обязательно прислушаемся.


Кстати, SObjectizer-5 уже 10 лет


Разработка SObjectizer-5 началась осенью 2010-го года и продолжается до сих пор. Радует и удивляет. А если кому-то интересно что лично я думаю по этому поводу, то можно заглянуть сюда.


Но еще больше меня поражает то, что кто-то берет и использует наш SObjectizer для реализации серьезных проектов несмотря на то, что его делают никому неизвестные люди из белорусской глубинки и за спиной SObjectizer-а нет больших компаний с громкими именами и толстыми кошельками. Вот это удивляет по-настоящему. А еще вселяет в нас уверенность в том, что все это не зря. Так что большое спасибо всем, кто рискнул и выбрал SObjectizer. Если бы не вы, этого маленького юбилея не было бы.