SObjectizer — это относительно небольшой C++17 фреймворк, который позволяет использовать в С++ программах такие подходы, как Actor Model, Publish-Subscribe и Communicating Sequential Processes (CSP), что упрощает разработку сложных многопоточных приложений. Если читатель в первый раз слышит о SObjectizer-е, то составить впечатление о нем можно ознакомившись вот с этой статьей.

Недавно состоялся релиз очередной мажорной версии, 5.8.0. Это хороший повод еще раз напомнить о проекте и сказать несколько слов о наиболее важном в очередном релизе, в том числе и о (частичном) сломе совместимости с предыдущей веткой 5.7.

Кому интересно, милости прошу под кат.

Самое интересное/значимое из новенького

Появление message_sink-ов

Преамбула

Ключевая фича, ради которой ветка 5.8 и была создана, -- это предоставление возможности подписать mbox на сообщения из другого mbox-а. Т.е. при отправке сообщения в mbox_A оно автоматически перенаправляется в mbox_X.

Звучит, наверное, странно, поэтому попробую объяснить на пальцах.

В SObjectizer агенты взаимодействуют посредством почтовых ящиков (mbox-ов): агент A, который хочет отослать сообщение агенту X, должен отправить сообщение в некий почтовый ящик, а агент X должен подписаться на сообщение из этого почтового ящика.

При этом почтовый ящик выступает в качестве брокера сообщений: mbox знает кто подписан на сообщения и, когда сообщение в mbox поступает, mbox отдает его только подписчикам именно этого сообщения.

Обычный MPMC mbox до версии 5.8.0
Обычный MPMC mbox до версии 5.8.0

Изначально в SObjectizer подписчиками могли быть только агенты. Что всегда казалось и логичным, и достаточным.

Но по мере использования SObjectizer в разных условиях стали возникать сценарии, когда такое поведение начало ощущаться как ограничение.

Допустим, у нас есть сложное приложение, обрабатывающее потоки данных. Внутри приложения данные переносятся посредством сообщения msg_incoming_data. Есть агенты-поставщики, которые берут данные из каких-то источников и отсылают сообщения msg_incoming_data. И есть агенты-потребители, которые принимают msg_incoming_data и делают обработку содержащихся в сообщении данных. Приложение сложное, есть разные типы поставщиков, разные типы потребителей, разные связи между ними.

Например, может быть агент device_reader для чтения данных с подключенного к компьютеру устройства. Этот агент создает MPMC-mbox в который отсылает сообщения msg_incoming_data с очередной порцией данных. Все желающие могут подписаться на этот mbox и начать обрабатывать msg_incoming_data.

Параллельно может быть агент trend_detector, который ждет msg_incoming_data на свой direct_mbox и выполняет некоторую специфическую обработку. Этот агент прекрасно работает в конфигурации, в которой msg_incoming_data отсылаются ему напрямую.

Пример взаимодействия агентов
Пример взаимодействия агентов

И вот в один прекрасный момент нам потребовалось сделать так, чтобы msg_incoming_data, отсылаемые device_reader на MPMC-mbox, приходили на direct_mbox агента trend_detector. При этом агенты уже написаны, оттестированы и никто из них ничего не знает друг про друга.

Требуемая схема взаимодействия агентов
Требуемая схема взаимодействия агентов

В предыдущих версиях SObjectizer выбор у нас был небольшим:

  • либо мы переписываем какого-то из агентов так, чтобы он мог использовать чужой mbox (т.е. либо device_reader отсылает сообщения напрямую в mbox trend_detector-а, либо trend_detector должен подписываться на сообщения из mbox-а device_reader-а);

  • либо мы делаем вспомогательного агента, который подписывается на mbox device_reader-а и пересылает сообщения в mbox trend_detector-а.

Переделка уже существующих агентов не выглядит хорошим подходом, особенно если какой-то из агентов написан не нами. Да и гибкость у этого варианта так себе. Вдруг потребуется другой способ коммутации агентов и что, переписывать их еще раз?

Вариант с дополнительным агентом, занимающимся только пересылкой сообщений, дает большую гибкость и не требует модификации уже написанного и отлаженного кода. Но и здесь есть свои неприятные стороны:

  • цепочка доставки сообщения до получателя удлиняется: сообщение сперва летит к промежуточному агенту и лишь затем к целевому. И хотя сообщения в SObjectizer летают быстро, все-таки мы же в C++ ради производительности и негоже терять эту самую производительность на ровном месте;

  • не так-то просто становится управлять подобными маршрутами в динамике. Например, сперва нам может быть необходимо получать сообщения из mbox_A и отправлять их в mbox_X, затем потребуется начать получать сообщения из mbox_B (но продолжать отправлять в mbox_X), затем нужно будет вернуться к получению из mbox_A, но доставлять уже в mbox_Y. Когда все это делается вспомогательным агентом, то управлять его подписками становится сложнее.

Возможно, ситуация выглядит несколько надуманной и чрезмерно усложненной. Собственно, таковой она и мне показалась, когда нам первый раз о ней рассказали. Посему, не очень-то и хотелось модифицировать SObjectizer для подобных сценариев, очень уж экзотическими они выглядели...

Но спустя какое-то время мне самому довелось столкнуться с необходимостью поиметь что-то похожее. Причем смысл был именно в том, чтобы оставить уже существующих, работающих по своим правилам агентов как есть, модифицировав лишь цепочку находящихся между ними mbox-ов. Ну а когда сам попадаешь в ловушку, то появляется и настоятельная потребность найти из нее выход.

Идея message_sink-ов

В итоге решение было найдено в дополнительной абстракции message_sink. Ранее получателями сообщений могли быть только агенты. Теперь же в SObjectizer есть некий абстрактный потребитель сообщения. Агент же становится лишь частным случаем такого потребителя.

Обычный MPMC mbox начиная с версии 5.8.0
Обычный MPMC mbox начиная с версии 5.8.0

Ну а поскольку получателями сообщений теперь могут быть разные сущности, то появляется возможность сделать такой message_sink, который переадресует сообщение в другой mbox.

Поэтому, если нам нужно, чтобы сообщение msg_incoming_data попало из mbox_A в mbox_X без каких-либо вспомогательных агентов, то просто создается message_sink, который пересылает msg_incoming_data в mbox_X, и этот message_sink подписывается на mbox_A. При этом отправитель сообщения в mbox_A даже не будет подозревать о том, что сообщение из mbox_A окажется в mbox_X (собственно, в этом и был смысл mbox-ов изначально).

Итоговая схема взаимодействия агентов
Итоговая схема взаимодействия агентов

Готовые средства для работы с message_sink-ами

Чтобы программисту не приходилось реализовывать собственные message_sink-и для таких простых ситуаций, как "взять сообщение M из mbox_A и переложить его в mbox_X", в SO-5.8.0 добавлена пара вспомогательных типов.

Первый из них, so_5::single_sink_binding_t, предназначен для простых ситуаций, когда нужно сообщение из mbox_A переслать в единственный mbox_X. Что-то вроде:

const so_5::mbox_t mbox_A = ...;
const so_5::mbox_t mbox_X = ...;
auto binding = std::make_unique<so_5::single_sink_binding_t>();
binding->bind<msg_incoming_data>(mbox_A, so_5::wrap_to_msink(mbox_X));

// Теперь msg_incoming_data автоматически попадет в mbox_X.
so_5::send<msg_incoming_data>(mbox_A, ...);

// Переадресуем сообщение на другой mbox.
const so_5::mbox_t mbox_Y = ...;
// Связь mbox_A >>> mbox_X будет автоматически разрушена, а новая
// связть mbox_A >>> mbox_Y будет создана.
binding->bind<msg_incoming_data>(mbox_A, so_5::wrap_to_msink(mbox_Y));

// Теперь msg_incoming_data автоматически отправится в mbox_Y, а в mbox_X не попадет.
so_5::send<msg_incoming_data>(mbox_A, ...);

Если нужно что-то посложнее, то есть еще и so_5::multi_sink_binding_t, который может сразу делать переадресацию сообщения на несколько mbox-ов:

const so_5::mbox_t mbox_A = ...;
const so_5::mbox_t mbox_X = ...;
auto binding = std::make_unique< so_5::multi_sink_binding_t<> >();
binding->bind<msg_incoming_data>(mbox_A, so_5::wrap_to_msink(mbox_X));

// Теперь msg_incoming_data автоматически попадет в mbox_X.
so_5::send<msg_incoming_data>(mbox_A, ...);

// Добавим переадресацию на еще один mbox.
const so_5::mbox_t mbox_Y = ...;
// В дополнение к связи mbox_A >>> mbox_X создадим еще и mbox_A >>> mbox_Y.
binding->bind<msg_incoming_data>(mbox_A, so_5::wrap_to_msink(mbox_Y));

// Теперь msg_incoming_data автоматически попадет и в mbox_X, и в mbox_Y.
so_5::send<msg_incoming_data>(mbox_A, ...);

Еще одно принципиальное отличие single_sink_binding_t от multi_sink_binding_t в том, что single_sink_binding в принципе не thread safe, тогда как multi_sink_binding по умолчанию обеспечивает thread safety. Предполагается, что у single_sink_binding будет всего один (агент) владелец, который будет работать с single_sink_binding монопольно на собственном рабочем контексте. Тогда как у multi_sink_binding может быть несколько (агентов) владельцев, которые могут работать с экземпляром multi_sink_binding одновременно на разных рабочих нитях.

Вышеупомянутые single_sink_binding_t и multi_sink_binding_t -- это минимальный набор, который закрывает известные нам на текущий момент потребности. Если этого недостаточно и хочется заиметь какой-то собственный хитрый message_sink, то придется сделать это самостоятельно.

Подозреваю, что подобное занятие может оказаться не самым простым и потребует погружения в потроха SObjectizer-а. В этом случае я бы порекомендовал обратиться к нам за советом или за помощью.

Для чего еще могли бы пригодится message_sink-и?

Из очевидного:

  • для того, чтобы делать еще более сложные схемы переадресации сообщений между mbox-ами. Например, сообщение msg_incoming_data, отправленное в mbox_A, в зависимости от содержимого может быть переадресовано в один из нескольких mbox-ов получателей mbox_X, mbox_Y, mbox_Z;

  • для того, чтобы доставлять сообщения из SObjectizer-а в какой-то другой механизм обработки событий. Например, приложение уже использует какой-то фреймворк со своими очередями сообщений и механизмами диспетчеризации обработчиков для этих сообщений. И в это приложение добавляется часть, написанная на SObjectizer. Посредством message_sink-ов можно построить мост, который будет прокидывать сообщение, отправленное в SObjectizer-овский mbox, в очередь сообщений старого фреймворка.

Наверняка есть и что-то менее очевидное. Так что будем посмотреть как это новшество приживется.

Последствие внедрения message_sink-ов для реализации mbox-ов

У добавления message_sink-ов в SObjectizer получился небольшой, но, как мне кажется, приятный бонус: mbox-ам теперь не нужно контролировать message_limits.

Ранее ответственность за проверку превышения лимита для сообщения и инициацию реакции на превышение лежала на mbox-ах. Поэтому в полноценных mbox-а приходилось делать неочевидные телодвижения (вроде чего-то такого). Плюс при хранении подписок нужно было заморачиваться еще и хранением информации о лимитах. В общем, message_limits делали написание собственных mbox-ов более сложной задачей, чем хотелось бы.

Теперь же mbox-ы освобождены от необходимости иметь дело с message_limits, эту ношу мы переложили на стандартные message_sink-и, которые обслуживают подписки агентов. Поэтому написание собственного mbox-а под специфические задачи, на мой взгляд, стало более простым занятием. Что не может не радовать.

Борьба с std::bad_alloc и появление двух новых диспетчеров

Преамбула

В реализации SObjectizer есть отдельная большая тема, связанная с динамической памятью и noexcept-гарантиями.

Дело в том, что SObjectizer активно использует динамическую память: агенты и кооперации создаются в динамической памяти, сообщения создаются в динамической памяти, очереди сообщений реализуются посредством использования динамической памяти и т.д.

Оно, как бы, не всегда хорошо (для hard real time нехорошо вообще), но пока что вот так.

Однако, есть нюанс: вызывая new нужно быть готовым получить в ответ std::bad_alloc и с этим нужно что-то делать.

В частности, было бы хорошо, чтобы приложение, в котором std::bad_alloc возникло, могло корректно завершить работу. Применительно к SObjectizer-у это означает, что приложение должно иметь возможность нормально дерегистировать все уже запущенные кооперации.

И вот этот момент в предыдущих версиях SObjectizer не был учтен. В деструкторах некоторых объектов создавались вспомогательные временные контейнеры (которые аллоцировали память под свои нужды), да и сама по себе операция дерегистрации кооперации требовала включение кооперации в специальную очередь для выполнения финальных действий по очистке связанных с кооперацией ресурсов (для чего также требовалась аллокация).

В общем, если написанное на SObjectizer приложение начинало завершение своей работы из-за std::bad_alloc, то уже сам SObjectizer мог спровоцировать последующие std::bad_alloc. А когда std::bad_alloc вылетает из деструктора какого-то объекта внутри SObjectizer-а, то это ведет к std::terminate и убийству всего приложения.

Звучит все это печально и здесь остается только посыпать голову пеплом: да, недосмотрели ????

В свое оправдание можно сказать, что (насколько нам известно) SObjectizer в основном используется в server-side приложениях. Там падение с рестартом из-за исчерпания ресурсов не особо критично. По крайней мере с жалобами на этот недостаток пока никто не приходил.

Но постепенно SObjectizer начинает использоваться и в desktop-приложениях. И вот здесь такое поведение SObjectizer-а стало меня откровенно пугать. Значит с этим нужно было что-то делать. И вот что было сделано...

Рефакторинг потрохов SObjectizer

Были проведены инспекция и рефакторинг существующего кода. Если какие-то действия, выполнявшиеся в деструкторах могли потребовать динамической аллокации памяти, то они были переписаны так, чтобы от этих аллокаций избавиться. Так что теперь, надеюсь, деструкторы SObjectizer-овских объектов не могут спровоцировать std::bad_alloc.

Также была модифицирована процедура окончательной дерегистрации кооперации.

Окончательная дерегистрация -- это когда все агенты кооперации уже завершили свою работу и нужно очистить связанные с кооперацией ресурсы: отвязать агентов от диспетчеров, уничтожить агентов и т.д., и т.п. Когда кооперация дерегистрируется, то сперва все ее агенты завершают свою работу (у них вызывается so_evt_finish), и лишь затем кооперация помещается в очередь для окончательной дерегистрации.

Ранее в качестве очереди для окончательной дерегистрации кооперации применялся обычный mchain, а сейчас объекты so_5::coop_t провязываются в интрузивный список, что не требует аллокаций и, следовательно, не может привести к std::bad_alloc.

Диспетчеры nef_one_thread и nef_thread_pool

Один из ключевых моментов при дерегистрации кооперации -- это завершение работы агентов дерегистрируемой кооперации. У агента должен быть вызван виртуальный метод so_evt_finish, после чего агент должен сообщить кооперации, что он закончил свою работу.

Для этого SObjectizer отсылает каждому агенту специальное событие evt_finish. И это событие доставляется до агента обычным образом -- через очередь сообщений того диспетчера, к которому агент привязан.

И в этом есть проблема: если диспетчер применяет очереди сообщений, использующие динамическую память (а все штатные диспетчеры SObjectizer-а именно такие), то нет гарантий, что для evt_finish всегда найдется память. Т.е., теоритически, мы можем получить std::bad_alloc при попытке доставить evt_finish до дерегистрируемого агента.

Это означает, что не смотря на все предпринятые меры, SObjectizer все таки может породить std::bad_alloc при завершении своей работы. И связано это с тем, что штатным диспетчерам нужно поставить evt_finish в очередь, эта операция может потребовать динамической аллокации, а динамическая аллокация может привести к std::bad_alloc.

В SObjectizer-5.8.0 добавлено два новых диспетчера, которые обеспечивают гарантию "noexcept for evt_finish": nef_one_thread и nef_thread_pool. Они для каждого агента, который привязывается к диспетчеру, предварительно аллоцируют событие evt_finish. Если на этой стадии возникает std::bad_alloc, то ничего страшного, просто произойдет откат процедуры регистрации. Но если же агент был успешно привязан к диспетчеру, то с доставкой ему evt_finish проблем уже нет: это событие преаллоцировано для каждого агента, диспетчеры используют интрузивные списки в качестве очередей, так что уже заблаговременно созданный evt_finish просто "прошивается" в нужный список.

Поэтому, если пользователю нужно, чтобы в его приложении SObjectizer корректно завершался в условиях нехватки памяти, то следует использовать новые nef_one_thread и nef_thread_pool диспетчеры.

Старые диспетчеры (one_thread, active_obj, active_group, thread_pool, adv_thread_pool, диспетчеры с поддержкой приоритетов) гарантию "noexcept for evt_finish" не обеспечивают. Связано это с тем, что такая гарантия не бесплатна (так, nef_one_thread работает чуть-чуть, но медленнее one_thread). И если пользователя старое поведение SObjectizer в условиях std::bad_alloc не беспокоило, то он может продолжать использовать привычные ему инструменты и дальше без новых накладных расходов.

От себя добавлю, что появление новых диспетчеров nef_one_thread и nef_thread_pool -- это эксперимент. Нужно посмотреть, к чему он приведет. Если со временем выяснится, что гарантия "noexcept for evt_finish" нужна по умолчанию, то такая гарантия будет обеспечена и для других штатных диспетчеров. Но здесь, опять же, будем посмотреть на то, как события будут развиваться дальше и кто и насколько будет заинтересован в таких гарантиях.

Расширение интерфейса event_queue_t

Для того, чтобы сделать новые диспетчеры nef_one_thread и nef_thread_pool потребовалось расширить интерфейс so_5::event_queue_t, который должен реализовываться всеми очередями сообщений у SObjectizer-овских диспетчеров. Ранее в нем был всего один метод push, теперь же методов стало три:

void push(execution_demand_t) = 0;

void push_evt_start(execution_demand_t) = 0;

void push_evt_finish(execution_demand_t) noexcept = 0;

Старый метод push продолжает использоваться как обычно -- SObjectizer вызывает его когда агенту нужно доставить обычное сообщение.

Новый метод push_evt_start теперь используется SObjectizer-ом для отсылки самого первого события, evt_start, когда агент регистрируется в SObjectizer Environment (именно это событие ведет к вызову so_evt_start).

При этом важно подчеркнуть, что методы push и push_evt_start не имеют отметки noexcept. Этим методам разрешен выброс исключений и SObjectizer к этому готов (в зависимости от ситуации обеспечивается либо базовая, либо сильная гарантия exception safety). Т.е. если при вызове push или push_evt_start вылетит исключение, то с SObjectizer-ом ничего страшного не произойдет.

Новый метод push_evt_finish теперь используется SObjectizer-ом для отсылки самого последнего события для агента, evt_finish. И этот метод помечен как noexcept. Сделано это потому, что восстановиться после исключения из push_evt_finish SObjectizer не может.

Реализации event_queue_t для новых диспетчеров nef_one_thread и nef_thread_pool гарантируют то, что push_evt_finish не бросают исключения.

А вот реализации event_queue_t для других штатных диспетчеров SObjectizer-а такой гарантии не дают. Поэтому, если при дерегистрации агента, привязанного к one_thread или active_obj диспетчеру, в push_evt_finish возникнет исключение, то все приложение будет убито через std::terminate.

Добавлю так же, что Asio-диспетчеры из сопутствующего проекта so5extra, так же не дают noexcept-гарантий для push_evt_finish (там в реализации используется Asio-шный post, а этот post может бросать исключения и с этим ничего не поделать).

Перенос unique_subscribers mbox-а из so5extra в ядро SObjectizer

Проект so5extra рассматривался нами как коллекция примамбасов для SObjectizer-а, которые по тем или иным причинам не хотелось включать в ядро SObjectizer-а.

За время своего развития so5extra обзавелся множеством (в основном) полезных возможностей, одной из которых стал unique_subscribers mbox. Это такой хитрый mbox, который внешне выглядит как MPMC (т.е. подписки могут делать разные агенты-получатели), но работает как MPSC: поддерживается доставка мутабельных сообщений и гарантируется, что на мутабельное сообщение может подписаться один единственный получатель.

Этот тип mbox-ов был добавлен в so5extra относительно недавно, одним из самых последних. Но зато он оказался одним из самых полезных и востребованных, в каких-то ситуациях без него просто не обойтись.

Ну а раз он настолько полезен, то как-то нехорошо заставлять пользователя подключать к своему проекту еще одну библиотеку ради unique_subscribers mbox. Поэтому начиная с версии 5.8.0 unique_subscribers mbox стал частью ядра SObjectizer.

Конечно же, это не все

Выше описаны самые значимые изменения/нововведения версии 5.8.0, но это не полный их список. Более полную информацию можно найти на соответствующей странице Wiki проекта. Да и вообще стоит посмотреть в Wiki, там теперь документации стало немного больше. Надеюсь, что серия статьей ByExample, с разбором примеров от простого ко все более сложным, окажется полезной для изучающих SObjectizer.

Насколько же ломающий это релиз?

Версия 5.8.0 открывает новую ветку развития SObjectizer и ломает совместимость с предыдущей веткой 5.7. Что, к сожалению, означает, что перевод проекта с SO-5.7 на SO-5.8 может потребовать модификации исходников. Но насколько это серьезно?

Если в проекте нет реализации собственных mbox-ов и/или диспетчеров, то столкнуться придется вот с чем:

  • из 5.8.0 были удалены вещи, которые в предыдущих версиях SObjectizer-а были помечены как deprecated. Например, вот такой close_drop_content;

  • метод so_5::agent_t::so_exception_reaction() теперь должен быть помечен как noexcept.

По большому счету, здесь нет ничего страшного и сложного. Несколько своих разработок мы перевели на SO-5.8.0 вообще ничего не изменив в исходниках, просто перекомпилировали и все.

А вот если в проекте используются собственные реализации mbox-ов и/или диспетчеров, то адаптация под ветку 5.8 может потребовать серьезных усилий:

  • mbox-ы, как уже было сказано выше, теперь работают с message_sink-ами и не поддерживают message_limits. Поэтому изменились методы подписки/отписки в so_5::abstract_message_box_t;

  • в старой схеме доставки сообщений (которая использовалась очень и очень давно) обнаружился дефект, который мог приводить к блокировке работы timer thread, а это очень плохо. Для устранения этого дефекта пришлось ввести понятие delivery_mode и изменить формат метода do_deliver_message в so_5::abstract_message_box_t;

  • диспетчеры теперь должны реализовывать обновленный интерфейс so_5::event_queue_t.

Вот эти изменения просто так обойти не получится, здесь потребуется переделка и (возможно) перепроектирование ваших собственных mbox-ов и диспетчеров. Насколько дорого это обойдется сложно сказать, нам при адаптации so5extra под SO-5.8.0 обошлось недорого, а обновленные реализации оказались проще изначальных.

Что дальше?

Боюсь, здесь остается сказать только банальность: поживем — увидим. Пока что находилось и время, и желание, и ресурсы чтобы развивать SObjectizer. Что не может не радовать. Надеюсь, так будет и дальше.

И раз уж начались банальности, то скажу еще одну: если вам чего-то не хватает в SObjectizer, то дайте нам знать. Здесь в комментариях, или в виде issue на GitHub-е, или в Google-группе. Логика простая: если мы не знаем, что вам нужно, то оно точно в SObjectizer не появится :)

Комментарии (0)