О проекте SObjectizer-5 мы рассказываем на Хабре уже давно и более-менее регулярно. А вот о сопутствующем ему проекте so5extra речь заходит гораздо реже и вскользь. Между тем so5extra развивается уже пять лет (как же быстро летит время) и на днях мы зафиксировали очередную версию. Что представляется хорошим поводом представить вашему вниманию обзор библиотеки so5extra, с акцентом на разнообразие реализованных в ней типов почтовых ящиков (mbox-ов в нашей терминологии).
Откуда есть пошла so5extra
При разработке же SObjectizer-5 мы постарались оформить SObjectizer-5 в виде минималистического ядра. Дополнительная функциональность для которого оформлялась бы отдельными библиотеками. Поэтому изначально предполагалось, что для SObjectizer-5 будут написаны какие-то сопутствующие библиотеки.
Такие библиотеки и были написаны и даже распространялись изначально вместе с SObjectizer-ом. Но где-то в 2014-ом году их развитие прекратилось вместе с отказом от использования монстроузной ACE в качестве базы для SObjectizer-5.
Чуть позже, в 2017-ом году, встал и вопрос о том, как монетизировать вложенные в SObjectizer-5 усилия.
Ответ на этот вопрос мы видели в создании дополнительной библиотеки с различными прибамбасами для SObjectizer-5, которая бы распространялась уже под двойной лицензией: бесплатно (под GNU Affero GPL) для открытых проектов или за деньги для закрытых проектов.
Так и возникла библиотека so5extra (или so_5_extra в старом написании).
Идея была следующей: мы включаем в so5extra функционал, который по тем или иным причинам не очень выгодно было бы затаскивать в сам SObjectizer. Кому этот функционал нужен и кто не хочет реализовывать его самостоятельно, тот может либо купить у нас лицензию, либо же использовать в своем открытом проекте бесплатно (при условии, что GNU Affero GPL его устраивает).
При этом в so5extra добавлялись вещи, которые могли бы быть полезными в тех или иных ситуациях, но отсутствие которых в самом ядре SObjectizer-а было не критичным. В случае чего каждый мог сам сделать для себя все то, что входит в so5extra. Но зачем же делать, если все это уже можно взять в so5extra?
К чему мы с so5extra в итоге пришли?
Пришли мы к тому, что спроса на коммерческие лицензии на so5extra не было и наша идея о монетизации SObjectizer-5 через so5extra не взлетела.
Ну а раз так, то оказалось проще полностью перевести so5extra под 3-х пунктную BSD-лицензию и не накладывать никаких ограничений на ее использование.
Поэтому начиная с версии 1.4 (2019-й год) библиотека so5extra, так же как и SObjectizer-5, бесплатна как для закрытых, так и для открытых проектов.
Что сейчас входит в so5extra
В отличии от SObjectizer-5, в котором многие фичи тесно завязаны друг на друга, so5extra представляет из себя набор практически независимых подмодулей, которые можно разделить на следующие категории (каждая категория живет в своем пространстве имен внутри объемлющего пространства so_5::extra).
Асинхронные операции (so_5::extra::async_op)
Инструменты для реализации сценария, когда один агент начинает какую-то операцию (скажем, агент A отсылает агенту X сообщение get_status), а затем должен получить и обработать результат этой операции (например, от агента X может прилететь ok или error). Причем фокус в том, что агент A может одновременно начать несколько таких операций с разными агентами, посему нужно как-то различать результаты независимых операций.
Посредством so_5::extra::async_op это может выглядеть, например, вот так:
Пример использования async_op
// Сообщение для запроса статуса.
struct get_status {
so_5::mbox_t reply_to_; // Куда нужно отсылать ответ.
};
// Сообщения с ответами.
struct status_ok { ... /* какие-то данные */ };
struct status_error { ... /* какие-то данные */ };
// Агент, который запрашивает статусы.
class status_requester final : public so_5::agent_t {
// Обработчики результатов.
// Обращаем внимание, что вместе с самим результатом передается
// и идентификатор запроса, которого нет ни в исходном get_status,
// ни в ответных status_ok/error.
//
// Обработчик для результата ok.
void on_ok(int req_id, const status_ok & msg) {...}
// Обработчик для результата error.
void on_error(int req_id, const status_error & msg) {...}
// Метод, где запрос инициируется.
void on_some_event(mhood_t<some_event> cmd) {
int req_id = calculate_req_id(*cmd); // Получили ID для операции.
// Теперь еще нужен и уникальный mbox для ответов.
auto reply_to = so_make_new_direct_mbox();
// Начинаем асинхронную операцию.
so_5::extra::async_op::time_unlimited::make(*this)
//
// Описываем события, которые будут считаться завершением операции.
//
// Этот обработчик указывает, что операция будет завершена, когда
// в состоянии по умолчанию агент получит status_ok из reply_to.
.completed_on(reply_to, so_default_state(),
// В лямбде захватывается req_id, тем самым лямбда оказывается
// уникальной именно для этого запроса.
[this, req_id](mhood_t<status_ok> cmd) {
on_ok(req_id, *cmd);
})
// Этот обработчик указывает, что операция будет завершена, когда
// в состоянии по умолчанию агент получит status_error из reply_to.
.completed_on(reply_to, so_default_state(),
// В лямбде захватывается req_id, тем самым лямбда оказывается
// уникальной именно для этого запроса.
[this, req_id](mhood_t<status_error> cmd) {
on_error(req_id, *cmd);
})
// Сразу и начинаем операцию.
.activate([&]{ so_5::send<get_status>(cmd->target(), reply_to); });
}
...
};
Фокус здесь в том, что обработчики событий, которые задаются через completed_on
автоматически в нужный момент подписываются на соответствующие сообщения из соответствующих mbox-ов, а затем также автоматически описываются. Тем самым для каждой асинхронной операции создается собственный набор обработчиков событий. Этот набор автоматически удаляется по завершении соответствующей операции.
Диспетчеры (so_5::extra::disp)
В состав SObjectizer-5 уже входит набор штатных диспетчеров и so5extra добавляет в этот набор еще два диспетчера на базе Asio:
asio_one_thread (только одна рабочая нить);
asio_thread_pool (пул рабочих нитей).
То, что эти диспетчеры построены на базе Asio означает, что диспетчер на своих рабочих нитях крутит обычный Asio-шный цикл обработки событий. И диспетчеризация заявок агентов обеспечивается через Asio-шный post.
Это дает возможность на одном и том же рабочем контексте инициировать асинхронные IO-операции, обрабатывать и результаты IO-операции, и входящие сообщения для агентов не беспокоясь о синхронизации. Что особенно удобно когда SObjectizer-овские агенты должны работать, например, с сокетами.
Пример "боевого" использования asio_one_thread можно найти в arataga (об этом проекте мы рассказывали ранее: раз и два).
Дополнительные версии SObjectizer Environment Infrastructure (so_5::extra::env_infrastructures)
SObjectizer Environment -- это контейнер, в котором живут зарегистрированные пользователем кооперации с агентами, диспетчеры и пр.
SObjectizer Environment-у для работы требуется некоторая внутренняя инфраструктура, которая определяет как именно поддерживается работа таймера, как и когда выполняется уничтожение полностью дерегистрированных коопераций, что выступает в качестве диспетчера по умолчанию. За все это отвечает такая штука, как SObjectizer Environment Infrastructure (подробнее о том, что происходит у SObjectizer-а "под капотом" можно прочитать здесь).
В состав SObjectizer-а уже входит несколько штатных реализаций Environment Infrastructure, а в so5extra есть еще две реализации на базе Asio:
asio::simple_not_mtsafe
, вся работа ведется на единственной рабочей нити, при этом нет никакой защиты от доступа к Environment Infrastructure из параллельно работающих нитей;asio::simple_mtsafe
, вся работа ведется на единственной рабочей нити, но сама Environment Infrastructure защищена от доступа из разных рабочих потоков.
Эти реализации Environment Infrastructure предназначены для написания приложений для работы с сетью посредством Asio. Поскольку Asio и так предоставляет нам поддержку таймеров и возможность диспетчеризации произвольных действий (посредством asio::post
), то имеет смысл их переиспользовать и для нужд SObjectizer-а. Особенно, если основной упор в приложении делается на работу с сетью, а агенты используется лишь для упрощения реализации бизнес-логики.
При этом asio::simple_not_mtsafe
предназначена для написания совсем легковесных приложений, где вообще вся работа может (а зачастую и должна) выполняться на единственной рабочей нити (как правило это главная нить приложения). Тогда как asio::simple_mtsafe
добавлен для использования в более сложных случаях: когда есть отдельная нить для IO-операции (именно там и будет работать asio::simple_mtsafe
), но рядом могут быть и другие рабочие нити, для выполнения каких-то "тяжелых" операций (например, вычислительных).
Средства для работы с конвертами для сообщений (so_5::extra::enveloped_msg)
Некоторое время назад в SObjectizer была добавлена возможность помещать отосланное сообщение в некий "конверт". При этом у конверта есть право решать отдавать ли сообщение получателю или нет. Плюс к этому конверт может выполнить какие-то дополнительные действия после того, как процедура доставки сообщения завершилась.
Подробнее о конвертах для сообщений на Хабре уже была отдельная статья.
Итак, SObjectizer поддерживает оборачивание сообщений в конверты, но не содержит никаких готовых конвертов. Предполагается, что пользователь под свои специфические нужды сделает специализированный конверт.
Но т.к. создание своего собственного конверта может быть не самой простой задачей даже для тех, кто хорошо знает SObjectizer, то в so5extra было добавлено несколько вспомогательных инструментов, которые облегчают написание конвертов для сообщений.
Например, это простой класс just_envelope_t
, который можно использовать в качестве базы для своих собственных конвертов.
Пример наследования от just_envelope_t
class my_simple_envelope final : public so_5::extra::enveloped_msg::just_envelope_t {
using base_type = so_5::extra::enveloped_msg::just_envelope_t;
public:
using base_type::base_type;
void access_hook(access_context_t context, handler_invoker_t & invoker) noexcept override {
... // Нужная нам логика предоставления доступа к сообщению.
}
};
Еще полезными могут быть вспомогательные средства, которые упрощают создание экземпляра сообщения, оборачивание этого экземпляра в нужный конверт и отсылку получившегося конверта.
Примеры отсылки сообщений в конвертах
so_5::mbox_t mb1 = ...;
// Создать сообщение my_message, упаковать его в my_envelope
// и затем отослать его в почтовый ящик mb1.
so_5::extra::enveloped_msg::make<my_message>(...)
.envelope<my_envelope>(...)
.send_to(mb1);
// Создать сообщение my_message, упаковать его в my_envelope
// и затем отослать его в почтовый ящик mb1 в виде
// отложенного сообщения с задержкой в 10s.
so_5::extra::enveloped_msg::make<my_message>(...)
.envelope<my_envelope>(...)
.send_delayed_to(mb1, 10s);
// Создать сообщение my_message, упаковать его в my_envelope
// и затем отослать его в почтовый ящик mb1 в виде
// периодического сообщения с задержкой в 10s и периодом
// повтора в 30s.
auto timer_id = so_5::extra::enveloped_msg::make<my_message>(...)
.envelope<my_envelope>(...)
.send_periodic_to(mb1, 10s, 30s);
// Создать сообщение my_message, упаковать его сперва
// в конверт типа inner_envelope_type, затем в конверт
// типа outer_envelope_type, а уже затем отослать его
// в почтовый ящик mb2.
so_5::extra::enveloped_msg::make<my_message>(...)
.envelope<inner_envelope_type>(...)
.envelope<outer_envelope_type>(...)
.send_to(mbox);
Там же можно найти и готовую реализацию конверта, который разрешает доставку сообщения только внутри заданного интервала времени.
Пример time_limited_delivery
// Создать и отослать my_message. Если сообщение не будет
// доставлено за 20 секунд, то доставка будет отменена.
so_5::extra::enveloped_msg::make<my_message>(...)
.envelope<so_5::extra::enveloped_msg::time_limited_delivery>(20s)
.send_to(mbox);
Дополнительные типы почтовых ящиков (so_5::extra::mboxes)
В so5extra реализовано восемь дополнительных типов mbox-ов, но подробнее мы поговорим о mbox-ах из so5extra ниже.
Дополнительные типы message chains (so_5::extra::mchains)
На данный момент в so5extra входит единственная реализация mchain-а, fixed_size, в которой размер mchain-а задается в compile-time.
Пример создания fixed_size mchain-а
// Ждем всего одно сообщение в ответ, поэтому создаем mchain
// фиксированного размера для единственного сообщения
auto reply_ch = so_5::extra::mchains::fixed_size::create_mchain<1>(env,
so_5::mchain_props::overflow_reaction_t::abort_app);
// Отправляем запрос.
so_5::send<request>(dest_mbox, reply_ch);
... // Делаем что-то еще...
// Теперь ждем ответа из reply_ch и обрабатываем полученный ответ.
so_5::receive(from(reply_ch).handle_n(1), [](const reply & r) {...});
Пока нам этого достаточно, но если со временем возникнет идея еще какого-то типа mchain-а, то реализация этой идеи, скорее всего, сперва появится в so5extra.
Отзывные сообщения (so_5::extra::revocable_msg)
Реализация функции send
для отсылки сообщения, но с возможностью затем "отозвать" сообщение. Под "отзывом" подразумевается запрет на обработку сообщения, если сообщение на момент "отзыва" еще не поступило в обработку (все еще находится в очереди заявок).
Пример использования revocable_msg
class session_handler final : public so_5::agent_t {
...
// ID отзывного сообщения.
// Последний отосланный экземпляр generate_pdf будет автоматически
// отозван при разрушении этого объекта.
so_5::extra::revocable_msg::delivery_id_t current_pdf_request_;
...
void on_generate_document_cmd(mhood_t<generate_doc_cmd> cmd) {
if(doc_type::pdf == cmd->doc_type()) {
// Отсылаем запрос pdf_generator-у.
// Если мы раньше отсылали аналогичный запрос и он еще не был
// обработан, то отосланный ранее запрос будет отозван автоматически.
current_pdf_request_ = so_5::extra::revocable_msg::send<generate_pdf>(...);
...
}
else ...
}
...
void on_cancel_document_generation(mhood_t<cancel_generate_doc_cmd> cmd) {
// Генерация PDF больше не актуальна, отзываем запрос если
// он еще не обработан.
current_pdf_request_.revoke();
...
}
};
Отзывные таймеры (so_5::extra::revocable_timers)
У штатных периодических сообщений (или отложенных сообщений, но которые генерируются с получением timer_id для возможности отмены) есть одна тонкая особенность: если таймер уже успел отослать сообщение получателю и сообщение оказалось в очереди заявок получателя, а затем для timer_id вызвали release (т.е. деактивировали таймер), то сообщение из очереди заявок извлечено не будет и до получателя все-таки дойдет.
Во многих случаях такая особенность роли не играет, но иногда с ней нужно считаться. В этом случае полезными окажутся отзывные таймеры из so5extra: они запретят доставку до получателя даже тех сообщений, что уже оказались в очереди заявок.
При этом отзывные таймеры из so5extra максимально сильно мимикрируют под обычные таймеры из SObjectizer-5, отличается только пространство имен.
Пример использования отзывного таймера
auto id = so_5::extra::revocable_timer::send_delayed<my_message>(mbox, 10s, ...);
...
if(some_condition())
// Отзываем ранее отосланное отложенное сообщение.
id.release();
Механизм shutdowner (so_5::extra::shutdowner)
Механизм shutdowner позволяет отложить процесс завершения работы SObjectizer Environment на некоторое время.
Дело в том, что когда вызывается so_5::environment_t::stop()
, то SObjectizer дерегистрирует все активные кооперации и когда последняя из них будет полностью уничтожена, то SObjectizer завершит свою работу.
Такое поведение позволяет агентам доработать и получить все сообщения, которые уже стояли в очередях заявок на момент вызова so_5::environment_t::stop()
. Но не всегда этого бывает достаточно.
Например, может быть агент-кэш, который при завершении работы SObjectizer-а должен сохранить куда-то свое содержимое. А это сохранение может потребовать обмена сообщениями с другими агентами. Но этот самый обмен новыми сообщениями после вызова so_5::environment_t::stop()
окажется невозможен, т.к. внутри stop()
кооперации будут помечены как дерегистрируемые и агенты из дерегистрируемых коопераций потеряют возможность получать новые сообщения (можно только дообработать то, что уже стоит в очередях).
Механизм shutdowner позволяет достаточно легко выйти из этой ситуации.
Пример применения shutdowner-а
class data_cache final : public so_5::agent_t {
public :
data_cache(context_t ctx) : so_5::agent_t(std::move(ctx)) {
// Нам нужен специальный mbox от shutdowner-а.
const auto notify_mbox =
so_5::extra::shutdowner::layer(so_environment()).notify_mbox();
// Создаем подписку на этот mbox.
// Пока подписка будет существовать, shutdowner не позволит
// SObjectizer-у завершить свою работу.
so_subscribe(notify_mbox).event( &data_cache::on_shutdown );
...
}
...
private :
void on_shutdown(mhood_t<so_5::extra::shutdowner::shutdown_initiated_t>) {
// Получили уведомление о том, что работу пора бы завершить.
// Начинаем запись, которая займет какое-то время и потребует
// общения с другими агентами.
store_data_to_disk();
}
void on_save_completed(mhood_t<data_stored>) {
// Получили уведомление о том, что запись завершена.
// Больше нас ничего не держит, можно дерегистрировать
// собственную кооперацию.
so_deregister_agent_coop_normally();
// Когда кооперация уничтожится автоматически произойдет
// отмена подписки на mbox от shutdowner-а и shutdowner
// разрешит SObjectizer-у завершить работу.
}
};
Средства для синхронного взаимодействия между агентами (so_5::extra::sync)
В SObjectizer-5.5 средства для синхронного общения между агентами были частью самого ядра SObjectizer-а. Но эти средства были не очень удобными и имели не самую удачную реализацию, которая со временем стала серьезно усложнять процесс сопровождения и развития ветки 5.5.
Поэтому при разработке ветки 5.6 синхронное взаимодействие между агентами из SObjectizer-а было вообще выброшено. А его новый вариант, более мощный и гибкий, стал частью so5extra. Так что начиная с SObjectizer-5.6.0 для общения агентов в синхронном режиме нужно использовать so5extra. А выглядит это, например, вот так:
Пример синхронного взаимодействия между агентами
// Компактный псевдоним для типов, участвующих во взаимодействии "запрос-ответ".
using my_request_reply = so_5::extra::sync::request_reply_t<my_request, my_reply>;
...
// Агент, который отвечает на запросы.
class request_handler final : public so_5::agent_t {
...
// Запрос прилетает в виде обычного сообщения и обрабатывается
// обычным обработчиком событий.
void on_request(typename my_request_reply::request_mhood_t cmd) {
... // Обработка запроса.
// Содержимое запроса доступно через метод `cmd->request()`.
// Отсылка ответа на запрос.
cmd->make_reply(...); // Все аргументы пойдут в конструктор my_reply.
}
...
void so_define_agent() override {
// Подписка на запросы обычным образом.
so_subscribe_self().event(&request_handler::on_request);
}
};
...
// Отсылаем запрос на mbox нужного нам request_handler-а.
so_5::mbox_t handler_mbox = ...;
// Отсылаем запрос и ждем ответа не более 15s.
// Если ответа по каким-то причинам не будет, то выскочит исключение.
my_reply reply = my_request_reply::ask_value(handler_mbox, 15s,
...); // Все остальные аргументы пойдут в конструктор my_request.
Подробнее о входящих в so5extra почтовых ящиках
Так уж получилось, что в so5extra больше всего различных реализаций mbox-ов. Да и самая свежая версия, 1.5.2, добавляет еще три новых реализации. Поэтому имеет смысл посвятить mbox-ам оставшуюся часть статьи.
Тот факт, что у mbox-ов в so5extra столько разных реализаций не должен удивлять. Ведь mbox изначально в SObjectizer-5 задумывался как некий "черный ящик", который может (более того, даже должен) иметь разные реализации под разные задачи и специфические условия. Вот в so5extra и собирается некоторая коллекция заточенных под разную специфику mbox-ов.
broadcast::fixed_mbox
Если нам нужно отослать сообщение сразу в несколько mbox-ов, то обычно это делается так: целевые mbox-ы собираются в каком-то контейнере, затем выполняется итерация по этому контейнеру, для каждого элемента выполняется send:
std::vector<so_5::mbox_t> targets = ...;
for(const auto & m : targets)
so_5::send<start>(m, ...);
Но не всегда у нас есть возможность использовать контейнер с целевыми mbox-ами...
Например, у нас есть готовый агент manager, который получает в конструкторе единственный целевой mbox.
Пример агента manager
class manager final : public so_5::agent_t
{
const so_5::mbox_t worker_;
...
void evt_start_processing(mhood_t<start_signal>) {
... // Какие-то действия самого manager-а.
// Управляем worker-ом.
so_5::send<start>(worker_, ...);
}
void evt_stop_processing(mhood_t<stop_signal>) {
... // Какие-то действия самого manager-а.
// Управляем worker-ом.
so_5::send<stop>(worker_, ...);
}
public:
manager(context_t ctx, so_5::mbox_t worker)
: so_5::agent_t{std::move(ctx)}
, worker_{std::move(worker)}
{...}
};
И вот в какой-то прекрасный момент мы захотели, чтобы за этим единственным mbox-ом worker-а скрывался не один рабочий агент, а сразу несколько. И все они должны единовременно получать start и stop от manager-а.
Можно, конечно, переделать manager-а и отдавать ему в конструктор контейнер с mbox-ами. А можно ничего не менять, а воспользоваться broadcast::fixed_mbox из so5extra.
Пример применения fixed_mbox
// Шаг 1: регистрируем рабочих агентов и собираем их mbox-ы.
std::vector<so_5::mbox_t> worker_mboxes;
for(int i = 0; i != worker_count; ++i)
{
auto worker = env.make_agent<worker_t>(generate_worker_name(i));
worker_mboxes.push_back(worker->so_direct_mbox());
env.register_agent_as_coop(std::move(worker));
}
// Шаг 2: создаем промежуточный mbox.
auto broadcasting_mbox = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>::make(
env, std::move(worker_mboxes));
// Шаг 3: создаем менеджера, который получает промежуточный mbox.
env.register_agent_as_coop(env.make_agent<manager_t>(std::move(broadcasting_mbox)));
Свое название fixed_mbox получил потому, что набор целевых mbox-ов он принимает в момент создания и измениться этот набор затем уже не может, целевые mbox-ы фиксируются раз и навсегда.
collecting_mbox
Данный тип mbox-а предназначен для того, чтобы собрать N сообщений одинакового типа и затем доставить их до получателя в виде одного большого сообщения-агрегата.
Этот тип mbox-а применим в следующем сценарии:
мы отсылаем сообщение-команду N подчиненным агентам (например, команду start рабочим агентам);
каждый подчиненный агент должен в ответ прислать сообщение-подтверждение;
нам нужно дождаться подтверждений от всех подчиненных агентов.
Без collecting_mbox нам бы пришлось завести счетчик ответных сообщений, получать каждое ответное сообщение отдельно и ждать пока счетчик достигнет нужного значения. А вот collecting_mbox позволяет без всего этого обойтись, он сам будет вести такие подсчеты.
Пример применения collecting_mbox
// Команда, которую мы отдаем подчиненным агентам.
struct do_something final : public so_5::message_t {...};
// Подтверждение, которое должны присылать подчиненные агенты.
struct do_something_ack final : public so_5::message_t {...};
// Тип mbox-а для получения ответных сообщений.
using ack_mbox_type =
so_5::extra::mboxes::collecting_mbox::mbox_template_t<do_something_ack>;
// Агент, который раздает команды и получает подтверждения.
class manager final : public so_5::agent_t
{
so_5::mbox_t ack_mbox_;
std::vector<so_5::mbox_t> workers_;
...
void so_evt_start() override {
// При старте создаем mbox для получения ответов.
ack_mbox_ = ack_mbox_type::make(so_direct_mbox(), worker_count);
// Подписываемся на него.
so_subscribe(ack_mbox_).event(&manager::evt_ack_received);
// Создаем рабочих агентов.
workers_ = make_workers(worker_count);
}
...
void evt_ack_received(mhood_t<ack_mbox_type::messages_collected_t> cmd) {
// Получили все подтверждения.
// Их теперь можно обработать индивидуально.
cmd->for_each([this](mhood_t<do_something_ack> msg) {...});
}
};
inflight_limit
Данный тип mbox-а ограничивает количество сообщений типа M, которые находятся в процессе доставки (т.е. были отосланы, сохранены в очереди заявок, но еще не были полностью обработаны). Может использоваться как еще один механизм защиты агентов от перегрузок. Например, inflight_limit может быть полезен когда агент A отсылает агенту B сообщения со слишком большим темпом, но мы не можем изменить агент B и задействовать для B штатные message_limits.
В отличии от механизма message_limits, и в отличии от mchain-ов с фиксированным размером, у inflight_limit есть всего лишь один способ реакции на "лишние" сообщения: отбрасывание всего, что отсылается сверх заданного лимита.
Т.е, если создали inflight_limit mbox для сообщения M с лимитом 3, и пытаемся отослать 4-е сообщение когда три предыдущих еще не обработались, то 4-е сообщение просто выбрасывается, как будто его и не было.
Это совсем новый тип mbox-а, который был добавлен в самой свежей на данный момент версии 1.5.2. Посмотрим как он будет использоваться. Возможно, его функциональность будет расширена в последующих версиях so5extra.
first_last_subscriber_notification
Этот тип mbox-а также был добавлен в so5extra только что. Под влиянием вот этой статьи с Хабра: "C++ магистрали потоков и древо алгоритма". Мне показалось, что ситуация, когда желательно знать о появлении самого первого подписчика и об исчезновении самого последнего подписчика, может возникнуть и при разработке приложений на SObjectizer. Хотя пока мы с такой надобностью вроде бы и не сталкивались. Но фича выглядит полезной, поэтому почему бы и не?
Суть данного mbox-а в том, что он отсылает уведомление msg_first_subscriber
когда для сообщения типа M появляется первый подписчик. И отсылает уведомление msg_last_subscriber
когда для сообщения типа M исчезает последний подписчик.
Пример использования информации о подписчиках
namespace notifications_ns = so_5::extra::mboxes::first_last_subscriber_notification;
// Сообщение для отсылки периодически генерируемых данных.
struct msg_data final : public so_5::message_t {...};
// Агент, который генерирует данные.
// Но генерирует их только тогда, когда в данных кто-то заинтересован.
class data_producer final : public so_5::agent_t
{
// Состояние в котором ждем появление первого подписчика.
state_t st_wait_consumers{this};
// Состояние в котором работаем в нормальном режиме.
state_t st_consumers_connected{this};
// mbox для широковещательной рассылки генерируемых данных.
const so_5::mbox_t data_mbox_;
public:
data_producer(context_t & ctx)
: so_5::agent_t{std::move(ctx)}
, data_mbox_{ // Нужно создать новый mbox для отсылки данных.
// Указываем, что в mbox будут отсылаться сообщения типа msg_data.
notifications_ns::make_mbox<msg_data>(
so_environment(),
// Куда нужно доставлять нотификации о подписчиках.
so_direct_mbox(),
// Новый mbox будет multi-consumer mbox-ом.
so_5::mbox_type_t::multi_producer_multi_consumer )
}
{}
// Геттер для mbox-а в который отсылаются генерируемые данные.
[[nodiscard]]
const so_5::mbox_t &
data_mbox() const noexcept { return data_mbox_; }
void so_define_agent() override
{
st_wait_consumers
// Здесь нам нужна подписка на нотификацию о первом подписчике.
.event([this](mhood_t<notifications_ns::msg_first_subscriber>) {
st_consumers_connected.activate(); // Переходим в нормальное состояние.
});
st_consumers_connected
.on_enter([]{ ... /* инициируем генерацию */ })
.on_exit([]{ ... /* прекращаем генерацию */ })
// Здесь нам нужна подписка на нотификацию о последнем подписчике.
.event([this](mhood_t<notifications_ns::msg_last_subscriber>) {
st_no_consumers.activate(); // Возвращаемся в пассивное состояние.
});
}
...
private:
...
void on_data_ready()
{
// Отсылаем очередную порцию сгенерированных данных.
so_5::send<msg_data>(data_mbox_, ...);
}
};
retained_msg
Этот mbox сохраняет у себя последнее отосланное сообщение. И когда кто-то создает подписку, то retained_msg mbox сразу же отсылает подписчику последнее сохраненное сообщение.
Такой тип mbox-а удобен, например, в следующем сценарии:
есть агент configurator, который следит за конфигурацией приложения и принимает обновления конфигурации из каких-то источников (например, отслеживает изменения в конфигурационных файлах, переодически сканирует какую-то БД или же ждет новую конфигурацию на HTTP-входе);
когда конфигурация меняется configurator рассылает новую информацию через широковещательный (т.е. MPMC) mbox с именем "current_config";
все рабочие агенты, которые заинтересованы в этой информации "ловят" ее через обычную подписку на mbox с именем "current_config".
А теперь представим, что новые рабочие агенты могут появляться и исчезать в динамике. И когда новый агент появляется, то ему следует тут же получить текущую конфигурацию.
Но как это сделать?
Можно, конечно же, отправлять индивидуальные запросы агенту configurator.
А можно в качестве mbox-а с именем "current_config" использовать retained_msg mbox.
Тогда configurator просто отсылает в "current_config" сообщения лишь по необходимости. А новые рабочие агенты получают самое последнее сообщение из "current_config" автоматически, стоит им только сделать подписку.
round_robin
Этот mbox отвечает за то, чтобы пересылать отсылаемые в него сообщения получателям по очереди.
Представьте себе, что у нас есть агент-продюсер, который генерирует данные. И эти данные должны обрабатываться агентом-консьюмером. Но один консьюмер не успевает обработать полученные данные до того, как агент-продюсер сгенерирует новые.
Поэтому нам нужно иметь N агентов-консьюмеров. Сперва новые данные отсылаются первому консьюмеру. Затем второму. Затем третьему и т.д. А затем возвращаемся к первому консьюмеру.
Такая себе простенькая балансировка нагрузки по принципу round-robin.
Вот round_robin mbox как раз именно это и делает.
Простой пример использования round_robin mbox-а
// Агент-консьюмер, который обрабатывает данные.
class data_consumer final : public so_5::agent_t
{
public :
data_consumer(context_t ctx, const so_5::mbox_t & data_mbox )
: so_5::agent_t{std::move(ctx)}
{
// Сразу же подписываемся на входящие данные.
so_subscribe(data_mbox).event(&data_consumer::on_data);
}
private :
void on_data(mhood_t<data> cmd)
{
... // Обработка данных.
}
};
// Агент-продюсер, который генерирует данные.
class data_producer final : public so_5::agent_t
{
const so_5::mbox_t data_mbox_;
...
public:
data_producer(context_t ctx, so_5::mbox_t & data_mbox)
: so_5::agent_t{ctx}
, data_mbox_{std::move(data_mbox)}
{...}
...
private:
void evt_data_ready(mhood_t<data_ready>) {
... // Формирование данных для отправки.
so_5::send<data>(data_mbox_, ...); // Просто отсылаем очередную порцию.
}
};
// Создаем round-robin mbox.
const auto rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>(env);
// Создаем консьюмеров.
for(int i = 0; i < 3; ++i)
evt.register_agent_as_coop(
evt.make_agent<data_consumer>(rrmbox));
// Создаем продюсера.
evt.register_agent_as_coop(
evt.make_agent<data_producer>(rrmbox));
unique_subscribers
Этот тип mbox-а предназначен для ситуаций, когда за единственным mbox-ом нужно спрятать несколько разных агентов-подписчиков, по аналогии с тем, как это происходит с обычным multi-producer/multi-consumer mbox-ом. Но при этом нам нужно отсылать в этот mbox мутабельные сообщения. Т.е. нужно, чтобы mbox выглядел как MPMC mbox, но работал бы как MPSC mbox.
Как раз unique_subscribers таким mbox-ом и является. Он разрешает подписаться на конкретный тип сообщения только одному подписчику. Например, если агент a1 подписался на сообщение M1, то пока a1 держит подписку, на M1 не может подписаться ни один другой агент. При этом агент a2 может подписаться на сообщение M2 (если для M2 еще нет подписчиков), а агент a3 -- на M3. И когда в mbox будет отправлено сообщение M1, то оно пойдет только a1, сообщение M2 пойдет только a2, а M3 -- только a3.
Использование unique_subscribers mbox удобно в случаях, когда у нас есть агент-продюсер, вынужденный рассылать новые данные в виде мутабельных сообщений (например, это сообщения с большими бинарными данными внутри или же сообщения с move-only типами), а обрабатывать эти данные выгодно разными агентами. Unique_subscribers mbox позволяет "спрятать" от агента-продюсера детали обработки отсылаемых сообщений: продюсеру становится не важно сколько именно и каких агентов выполняют обработку.
composite
Еще один новый тип mbox-а, появившийся в версии 1.5.2. Описываю его последним потому, что его роль становится лучше понятна с учетом многообразия представленных в so5extra mbox-ов.
Некоторые из описанных выше mbox-ов, вроде inflight_limit и first_last_subscriber_notification, допускают отсылку и подписку только на один тип сообщения, на тот тип, для которого mbox был создан.
Не всегда это удобно. Часто хочется иметь всего один mbox, в который нам нужно отсылать сообщения разных типов (например, M1, M2, M3 и т.д.), но при этом желательно иметь разные политики обработки для этих сообщений. Скажем, иметь ограничения по количеству доставляемых сообщений для M1, получать нотификации о подписчиках для M2, использовать round-robin балансировку для M3.
Всего этого можно достичь посредством composite mbox, который позволяет спрятать несколько разных mbox-ов за одним "фасадом".
Пример создания compoiste mbox-а
// Создаем актуальные mbox-ы.
auto m1_sink = so_5::extra::mboxes::inflight_limit::make_mbox<M1>(...);
auto m2_sink = so_5::extra::mboxes::first_last_subscriber_notification::make_mbox<M2>(...);
auto m3_sink = so_5::extra::mboxes::round_robin::make_mbox<>(...);
// Создаем общий для них агрегирующий mbox.
auto sink = so_5::extra::mboxes::composite::multi_consumer_builder(
so_5::extra::mboxes::composite::throw_if_not_found())
.add<M1>(m1_sink)
.add<M2>(m2_sink)
.add<M3>(m3_sink)
.make(env);
Созданный таким образом mbox позволит отсылать в него сообщения типов M1, M2 и M3. При этом сообщение каждого из этих типов будет доставляться в разные актуальные mbox-ы. При попытке отослать сообщение какого-то другого типа (или при попытке подписаться на сообщение какого-то другого типа) будет выброшено исключение.
Заключение
Надеюсь, что мне удалось познакомить читателей с библиотекой so5extra, которую мы развиваем в дополнение к SObjectizer-5.
Эта библиотека содержит в себе ряд полезных мелочей, значительная часть которых никогда в SObjectizer-5 не попадет, поскольку мы хотим сохранить ядро SObjectizer-5 минималистичным. Посему, если вам в SObjectizer не хватает какой-то мелочи (или не мелочи, вроде синхронного взаимодействия между агентами), то имеет смысл заглянуть в so5extra.
Было бы, конечно, интересно узнать кому и чего не хватало (не хватает сейчас) в SObjectizer. Но, что-то мне подсказывает, что спрашивать об этом бесполезно :)
Поэтому просто спасибо за то, что дочитали :)