Раньше мы рассказывали про SObjectizer как про акторный фреймворк для C++, хотя в действительности это не совсем так. Например, уже давно в SObjectizer есть такая классная штука, как mchain-ы (они же каналы из модели CSP). Mchain-ы позволяют легко и непринужденно организовать обмен данными между рабочими потоками. Не создавая агентов, которые нужны далеко не всегда. Как раз на днях довелось в очередной раз этой фичей воспользоваться и упростить себе жизнь за счет передачи данных между потоками посредством каналов (т.е. SObjectizer-овских mchain-ов). Так что не только в Go можно получать удовольствие от использования CSP. В C++ это так же возможно. Кому интересно, что и как, прошу под кат.
Задача была следующей: есть некая сторонняя система, к которой нужно делать синхронные запросы. Нужно было посмотреть, как ведет себя эта система, если запросы к ней идут не в один поток, а в несколько. Для этого следовало сделать из имевшегося однопоточного многопоточного клиента, рабочие нити которого выдавали бы свой поток запросов к сторонней системе.
Полный список запросов, которые подлежали выполнению, находился в отдельном файле. Так что нужно было последовательно читать этот файл, доставать очередной запрос и отдавать его одной из свободных рабочих нитей. Каждая нить подсчитывала количество выполненных запросов. Нужно было определить, сколько времени потребуется на вычитку и обработку всех запросов, а так же подсчитать, сколько всего запросов было выполнено.
Очевидным образом напрашивалось простое решение. Есть главная рабочая нить, которая читает файл с запросами. Каждый запрос ставиться в общую очередь запросов. Откуда запросы разбираются рабочими нитями. Т.е. рабочая нить берет из очереди первый запрос, выполняет его, затем берет новый первый запрос из очереди и т.д. Если очередь пуста, то рабочая нить должна приостанавливаться до тех пор, пока что-то в очереди появится. Если очень заполнена, то должна приостанавливаться главная нить пока в очереди не появится свободное место.
Mchain-ы из SObjectizer-а как раз позволяют обойтись без написания своих thread-safe очередей.
Для решения этой задачи потребовалось два mchain-а. Первый mchain используется для передачи прочитанных запросов рабочим нитям. Главная нить пишет в него запросы,
рабочие нити читают запросы оттуда. Когда файл с запросами вычитан полностью, главная нить просто закрывает этот mchain. Соответственно, как только рабочие потоки увидят, что в mchain-е ничего нет и он закрыт, они завершат свою работу.
Второй mchain потребовался для того, чтобы рабочие нити смогли передать в главную нить информацию о том, что они заверили свою работу и сколько запросов они при этом обработали. В этот mchain рабочие нити пишут всего по одному сообщению. А главная нить только читает из этого mchain-а.
Ну и теперь можно посмотреть, как все это выглядит в коде. Код без комментариев, т.к. это была одноразовая программулина «на выброс». Посему необходимые пояснения будут приводится после соответствующего куска кода.
Начнем с функции run_app, которая вызывается из main() сразу после того, как программа разберет параметры командной строки:
void
run_app( const app_args_t & args )
{
so_5::wrapped_env_t sobj(
[]( so_5::environment_t & ) {},
[]( so_5::environment_params_t & params ) {
params.infrastructure_factory(
so_5::env_infrastructures::simple_mtsafe::factory() );
} );
auto tasks_ch = create_mchain( sobj,
std::chrono::seconds(5),
50,
so_5::mchain_props::memory_usage_t::preallocated,
so_5::mchain_props::overflow_reaction_t::abort_app );
auto finish_ack_ch = create_mchain( sobj );
std::vector< std::thread > workers;
const auto cleanup = cpp_util_3::at_scope_exit( [&] {
so_5::close_drop_content( finish_ack_ch );
so_5::close_drop_content( tasks_ch );
for( auto & t : workers )
t.join();
} );
cpp_util_3::n_times( args.m_threads_count, [&] {
workers.emplace_back( [&] {
worker_thread( args, tasks_ch, finish_ack_ch );
} );
} );
do_main_work( args, tasks_ch, finish_ack_ch );
}
Здесь сперва создается экземпляр SObjectizer Environment, которому будут принадлежать mchain-ы. Не имея SOEnvironment нельзя создать mchain, так что SOEnvironment приходится создавать.
Но нам не нужен полноценный SOEnvironment, который предназначен для создания тучи агентов в приложении, для эффективного управления которыми SOEnvironment вынужден создавать несколько собственных вспомогательных нитей. Поэтому в параметрах SOEnvironment мы задаем использовать специальный, однопоточный вариант SObjectizer-а. В этом случае wrapped_env_t создаст одну вспомогательную нить, на которой произойдет вызов so_5::launch() и все. Больше SObjectizer ничего делать не будет. Да и эта вспомогательная нить будет спать в so_5::launch() до тех пор, пока не произойдет возврат из run_app.
Далее нам нужен mchain для раздачи запросов рабочим нитям. Это tasks_ch. Но это не простой mchain. Во-первых, это mchain ограниченной емкости. Попытка добавить в заполненный mchain еще одно сообщение будет блокировать текущую нить. Но блокировать не навсегда, а всего на 5 секунд. Если же даже спустя 5 секунд в mchain-е свободного места не окажется, то все приложение будет прервано вызовом std::abort(). В данном случае это оправдано, т.к. в нормальных условиях ни одна из рабочих нитей не должна засыпать больше чем на несколько миллисекунд, не говоря уже про 5 секунд. Так что если за 5 секунд в tasks_ch не появилось свободного места, то что-то точно идет не так, поэтому нужно звать std::abort(). Кроме того, поскольку tasks_ch у нас имеет заранее известный размер, то мы предписываем сразу выделить необходимую память под всю очередь сообщений в mchain-е.
Со вторым mchain-ом, в который рабочие нити будут отсылать сообщения finish_ack, все гораздо проще. Поэтому finish_ack_ch создается простым вызовом create_mchain, с параметрами по умолчанию (безразмерный mchain без блокировки на операциях send).
Далее нам нужно запустить N рабочих нитей и сохранить их в векторе workers. Но здесь не так все просто. Мы можем получить исключение при создании очередной рабочей нити. При этом нам было бы полезно нормально завершить те нити, которые уже были созданы.
Чтобы упростить себе жизнь с откатом ранее выполненных операций, используется аналог D-шного scope_exit (ну или аналог BOOST_SCOPE_EXIT или Go-шного defer, тут уж кому что ближе). Переменная cleanup, по сути, представляет из себя объект с лямбдой внутри. Эта лямбда вызывается при вызове деструктора переменной cleanup. Создается cleanup средствами небольшой вспомогательной библиотеки cpp_util. Еще одно пояснение по поводу cleanup-а: первое, что нам нужно сделать при очистке — это закрыть mchain-ы. Если какая-то из рабочих нитей уже успела стартовать и уснуть на вызове receive из tasks_ch, то закрытие tasks_ch в cleanup сразу же разбудит эту нить и позволит ей завершить свою работу.
Ну а далее мы создаем рабочие нити и вызываем do_main_work. Внутри do_main_work выполняется основная работа главной нити приложения: чтение файла с запросами, передача запросов рабочим нитям, сбор результатов. Вот как выглядит упрощенный вариант do_main_work, из которого убрали второстепенные детали:
void
do_main_work(
const app_args_t & args,
so_5::mchain_t tasks_ch,
so_5::mchain_t finish_ack_ch )
{
data_file_handler_t file{
args.m_data_file,
args.m_force_keep_alive
};
const auto started_at = hires_clock::now();
while( !file.is_eof() )
{
auto request = file.get_next_request();
if( !request )
break;
so_5::send< std::string >( tasks_ch, *request );
}
so_5::close_retain_content( tasks_ch );
unsigned long long total_requests{};
so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ),
[&]( const finish_ack_t & what ) {
total_requests += what.m_requests;
} );
const auto total_time = hires_clock::now() - started_at;
if( total_requests )
{
... // Print the results...
}
}
Все самое интересное здесь собрано в двух местах.
Во-первых, внутри while. Там последовательно читаются запросы из файла и передаются рабочим нитям посредством вызова send. Если send вызывается когда tasks_ch полностью заполнен, то главная нить будет приостановлена (но не более, чем на 5 секунд).
Во-вторых, когда весь файл с запросами прочитан, нам нужно дождаться ответов от всех рабочих нитей. Для этого мы сперва закрываем tasks_ch, чтобы рабочие нити поняли, что пора завершать свою работу. Но закрывать нужно так, чтобы не потерялись те запросы, которые уже стоят в очереди, но еще не были обработаны. Поэтому вызывается close_retain_content (а вот для cleanup-действия в run_app была задействована close_drop_content, т.к. там нам не нужно ничего сохранять в закрываемом канале).
После того, как tasks_ch закрыт, нужно дождаться ответа от N рабочих нитей. Вот это ожидание ровно N ответов записывается одной магической строкой:
so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ),
Она говорит буквально следующее: читать из канала finish_ack_ch до тех пор пока не будет прочитано и обработано ровно threads_count сообщений.
Ну и для завершения картины нужно показать как выглядит код рабочей нити. Он совсем простой:
void
worker_thread(
const app_args_t & args,
so_5::mchain_t tasks_ch,
so_5::mchain_t finish_ack_ch )
{
io_performer_t io_performer{ args.m_srv, args.m_port };
unsigned long long total_requests{};
so_5::receive( from(tasks_ch),
[&]( const std::string & request ) {
io_performer.request_response( request );
++total_requests;
} );
so_5::send< finish_ack_t >( finish_ack_ch, total_requests );
}
Нить просто висит внутри receive из канала tasks_ch. Возврат из receive произойдет при закрытии tasks_ch. Если же tasks_ch пуст, то receive будет спать до тех пор,
пока в канал что-то поступит (либо пока канал не закроют). А когда возврат из receive происходит, то рабочая нить просто отсылает finish_ack-сообщение в finish_ack_ch и завершается.
Вот, собственно, и все.
Надо сказать, что с самой многопоточностью и обменом информацией между потоками проблем не было. Буквально с первого раза завелось и заработало. Проблемы возникли внутри реализации io_performer_t::request_response, когда из-за ошибок в реализации взаимодействия между клиентом и сервером подвисала текущая нить. Вот тогда и помогло ограничение в 5 секунд на время ожидания записи в полный tasks_ch: когда нити начинали зависать, срабатывал тайм-аут и многопоточный клиент аварийно завершался. Сразу становилось понятно, что есть баг, причем баг в request_response, т.к. только зависание там и могло остановить нормальное чтение из tasks_ch.
В завершение хочется сказать, что и Модель Акторов, и Модель Взаимодействующих Последовательных Процессов (aka CSP) — отличные штуки. Где-то хорошо работает одна, где-то вторая. SObjectizer позволяет использовать и то, и другое. А то и все сразу, иногда и такое бывает необходимо.
whalebothelmsman
А нельзя было нарезать файл на 4 части и запустить 4 независимых процесса?
eao197 Автор
Неудобно. Так ключик в командной строке поменял с -t 2 на -t 8 и запросы полетели не в 2 потока, а в 8-мь, при этом ничего не нужно делать с исходным файлом запросов, в котором этих запросов может быть под миллион, а то и больше.