actor system


rotorненавязчивый С++ акторный микрофремворк с возможностью создания иерархий супервайзеров, похожий на своих старших братьев — caf и sobjectizer. В последних релизах с момента последнего анонса накопились существенные улучшения, которые хотелось бы осветить.


Общий интерфейс для таймеров (v0.10)


Таймеры сами по себе вездесущи во всех акторных фрейморках, т. к. они делают программы более надёжными. До v0.10 не было API для того, чтобы взвести таймер; это можно было сделать только посредством доступа к низлежащему движку событий (event loop) и использованию соответствующего API, разного для разных движков. Это было не очень удобно и ломало абстракции: кроме доступа к API движка, нужно было в обработчике таймера использовать низкоуровневый API rotor'а, чтобы работала доставка сообщений. Кроме того, отмена таймера также имеет свои особенности в каждом цикле событий, что захламляло ненужными деталями логику работы актора.


Начиная с v0.10 в rotor'е, можно делать что-то вроде:


namespace r = rotor;

struct some_actor_t: r::actor_base_t {
    void on_start() noexcept {
        timer_request = start_timer(timeout, *this, &some_actor_t::on_timer);
    }

    void on_timer(r::request_id_t, bool cancelled) noexcept {
        ...;
    }

    void some_method() noexcept {
        ...
        cancel_timer(timer_id);
    }

    r::request_id_t timer_id;
};

Надо отметить, что к моменту завершения работы актора (shutdown_finish), все взведённые таймеры должны сработать или быть отменены, иначе это ведёт к неопределённому поведению (undefined behavior).


Поддержка отмены запросов (v0.10)


По-моему мнению, у всех сообщений в caf семантика "запрос-ответ", в то время как в sobjectizer'е все сообщения имеют обычную "отправил-и-забыл" ("fire-and-forget") семантику. rotor поддерживает оба типа, причём по-умолчанию сообщения являются "отправил-и-забыл", а "запрос-ответ" можно сделать поверх обычных сообщений.


В обоих фреймворках, caf и sobjectizer, у каждого актора есть управляемая очередь сообщений, что обозначает, что фреймворк не доставляет новое сообщение, пока предыдущее не было обработано. В противоположность этим фреймворкам, в rotor'е нет управляемой очереди сообщений, что обозначает, что актор сам должен создать свою собственную очередь и заботиться о перегрузках при необходимости. Для мгновенно обрабатываемых сообщений типа "пинг-понг" это не имеет особого значений, однако для "тяжёлых" запросов, которые в процессе своей обработки делают ввод-вывод (I/O), разница может быть существенна. Например, если актор опрашивает удалённую сторону через HTTP-запросы, нежелательно начинать новый запрос, пока предыдущий ещё не закончился. Ещё раз: сообщение не доставлено, пока предыдущее не обработано, и не важно, что за типа сообщения.


Это также обозначает, что для управляемых очередей сообщений отмена запросов в общем случае невозможна, т.к. сообщение-отмена находится в очереди, и у него нет шансов быть обработанным до тех пор, пока предыдущее сообщение-запрос не будет обработано.


В rotor'е, при необходимости, надо создавать собственную очередь запросов, и, если сообщение-отмена поступает, то актор должен либо поискать в очереди и отменить его, либо, если сообщение уже обрабатывается, отменить соответствующие операции ввода-вывода, и, в конечном итоге, ответить на исходный запрос со статусом "отменено". Нужно заметить, что только сообщения-запросы могут быт отменены, т. к. у них есть внутренний идентификатор.


namespace r = rotor;

namespace payload {
struct pong_t {};
struct ping_t {
    using response_t = pong_t;
};
} // namespace payload

namespace message {
using ping_request_t = r::request_traits_t<payload::ping_t>::request::message_t;
using ping_response_t = r::request_traits_t<payload::ping_t>::response::message_t;
using ping_cancel_t = r::request_traits_t<payload::ping_t>::cancel::message_t;
} // namespace message

struct some_actor_t: r::actor_base_t {
    using ping_ptr_t = r::intrusive_ptr_t<message::ping_request_t>;

    void on_ping(ping_request_t& req) noexcept {
        // just store request for further processing
        ping_req.reset(&req);
    }

    void on_cancel(ping_cancel_t&) noexcept {
        if (req) {
            // make_error is v0.14 feature
            make_response(*req, make_error(r::make_error_code(r::error_code_t::cancelled)));
            req.reset();
        }
    }

    // простейшая "очередь" из одного сообщения.
    ping_ptr_t ping_req;
};

Вообще говоря, отмена запросов может быть сделана и в sobjectizer'е, однако, там, во-первых, нужно сделать собственный механизм "запрос-ответ", и, во-вторых, свою собственную очередь поверх той, что уже предоставляется sobjectizer'ом, т. е. ценой дополнительных накладных расходов. Впрочем, по-моему мнению, парадигма "запрос-ответ" несколько инородна для sobjectizer'а, поскольку он скорее ориентирован на блокирующие операции в обработчиках сообщений, которые, конечно, не могут быть отменены, вне зависимости от используемого фреймворка.


std::thread backend/supervisor (v0.12)


Это давно ожидаемая возможность, которая делает rotor похожим на sobjectizer: в случае, когда актору нужно осуществить блокирующие операции и ему не нужен никакой цикл событий. Например, обработчик сообщений должен выполнить интенсивные вычисления на центральном процессоре.


Очевидно, что во время блокирующих операций нет возможности сработать таймерам или другим сообщениям быть доставленными. Другими словами, во время блокирующих операций актор теряет свою реактивность, так как он не может реагировать на входящие сообщения. Чтобы преодолеть это, блокирующие операции должны быть разбиты на более мелкие итеративные куски; а когда актор закончит обрабатывать текущий кусок работы, он посылает себе сообщение с номером следующего куска для обработки, и так далее, пока все части не будут обработаны. Это даст текущему потоку выполнения немного воздуха, чтобы доставить другие сообщения, выполнить код, связанный с истекшими таймерами и т.п. Например, вмето того, чтобы вычислять свёртку sha512 для всего файла размером 1TB, задание может быть разделено на вычисление свёрток помегабайтово, что сделает поток вычисления достаточно реактивным. Данный подход универсален и применим для любого актороного фреймворка.


Само собой разумеется, что целая иерархия акторов может быть запущена на std::thread бэкенде, не только один актор. Следующий нюанс, на который следует обратить внимание, это то, что rotor'у надо подсказать, какие обработчики сообщений "тяжёлые" (блокирующие), чтобы обновить таймеры после этих обработчиков. Это делается во время подписки, т.е.:


struct sha_actor_t : public r::actor_base_t {
    ...
    void configure(r::plugin::plugin_base_t &plugin) noexcept override {
        r::actor_base_t::configure(plugin);
        plugin.with_casted<r::plugin::starter_plugin_t>([&](auto &p) {
            p.subscribe_actor(&sha_actor_t::on_process)->tag_io(); // важно
        });
    }

Полный исходный код реактивного актора, который вычисляет свёртку sha512, который реагирует на CTRL+c, доступен по ссылке.


Идентификация Акторов (v0.14)


Канонический способ идентификации актора — по основной адресу (в rotor'е у актора может быть несколько адресов, аналогично как в sobjectizer'е агент может быть подписан на несколько mbox'ов). Однако, иногда желательно вывести в лог имя актора, который завершил работу, в соответствующем колбеке в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {
    void on_child_shutdown(actor_base_t *actor) noexcept override {
        std::cout << "actor " << (void*) actor->get_address().get() << " died \n";
    }
}

Адрес актора динамичен и меняется при каждом запуске программы, так что эта информация почти бесполезна. Чтобы она имела смысл, актор должен напечатать свой основной адрес, где, например, в перекрытом методе on_start(). Однако, это решение не очень удобно, поэтому было решено ввести свойство std::string identity непосредственно в базовый класс actor_base_t. Таким образом, идентичность актора может быть выставлена в конструкторе или во время конфигурации плагина address_maker_plugin_t:


struct some_actor_t : public t::actor_baset_t {
    void configure(r::plugin::plugin_base_t &plugin) noexcept override {
        plugin.with_casted<r::plugin::address_maker_plugin_t>([&](auto &p) {
            p.set_identity("my-actor-name", false);
        });
        ...
    }
};

Теперь можно выводить идентичность актора в его супервайзере:


struct my_supervisor_t : public r::supervisor_t {
    void on_child_shutdown(actor_base_t *actor) noexcept override {
        std::cout << actor->get_identity() << " died \n";
    }
}

Иногда акторы уникальны в одной программе, а иногда сосуществуют несколько экземпляров одного и того же класса акторов. Чтобы различать между их, адрес каждого может быть добавлен к имени актора. В этом и смысл второго параметра bool в методе set_identity(name, append_addr) выше.


По умолчанию идентичность актора равна чему-то вроде actor 0x7fd918016d70 или supervisor 0x7fd218016d70. Эта возможность появилась в rotor'е начиная с версии v0.14.


Extended Error вместо std::error_code, shutdown reason (v0.14)


Когда случается что-то непредвиденное в процессе обработки запроса, до версии v0.14, ответ содержал код ошибки в виде std::error_code. Такой подход хорошо служил своей цели, но, ввиду, иерархичной природы rotor'а, этого оказалось не достаточно. Представим себе случай: супервыйзер запускает двух акторов, и у одного из них произошёл сбой в инициализации. Супервайзер экскалирует проблему, т. е. выключается сам и шлёт запрос на выключение второму актору. Однако, на момент, когда второй актор выключился, исходный контекст уже был потерян, и совершенно неясно, почему он выключился. А причина кроется в том, что std::error_code не содержит в себе достаточно информации.


Поэтому было решено ввести собственный класс расширенной ошибки rotor::extended_error_t, который содержит std::error_code, std::string в качестве контекста (обычно это идентичность актора), а также умный указатель на следующую расширенную ошибку, что вызвала текущую. Теперь схлопывание иерархии акторов может быть представлено цепочкой расширенных ошибок, где причина выключения каждого актора может быть отслежена:


struct my_supervisor_t : public r::supervisor_t {
    void on_child_shutdown(actor_base_t *actor) noexcept override {
        std::cout << actor->get_identity() << " died, reason :: " << actor->get_shutdown_reason()->message();
    }
}

выведет что-то вроде:


actor-2 due to supervisor shutdown has been requested by supervisor <- actor-1 due initialization failed

Вместе с идентификацией акторов данная возможность предоставляет больше диагностических инструментов для разработчиков, которые используют rotor.