В прошлой статье, отбросив всё, что нам было известно из мира "кровавого энтерпрайз IT", и опустошив чашу, мы "написали на коленке" простейший оповещатель (notifier). Объект, вызывающий по цепочке заранее зарегистрировавшихся клиентов (подписчиков) при наступлении какого-то события.

Что там с аргументами?

У оповещателя не было поддержки настраиваемых параметров для передачи подписчикам - пора это исправить, сделав его шаблонным. Пусть список типов параметров указывают прямо при объявлении объекта оповещателя. Никаких ухищрений касательно определения оптимального способа передачи аргументов делать не будем. Если программист для передачи одного char зарядил const char&, использовав внутри таким образом целый указатель на 8 байт (для 64-х разрядной архитектуры) вместо одного байта - что ж... Напишите ему в ревью. Опять же, в качестве типа аргумента можно указать и неконстантную ссылку, но осознавать, что теперь любой подписчик в цепочке может поменять значение аргумента и следующий увидит изменённое значение. Будем проще. И, надеюсь, нас не обвинят здесь, что простота хуже воровства.

А ещё бы чуть быстрее...

Упоротые на скорости... разрабатывающие высокопроизводительный код, программисты, могут заметить, что использование одного mutex'а на контейнер даже для итерации по нему - это излишне. Во время итерации контейнер никто не меняет - зачем здесь блокировка? Да, в цикле меняется счётчик ссылок использования объекта подписки, но, может быть, можно сделать этот счётчик атомарным? Тогда mutex можно заменить на r/w lock или, в терминах c++, shared_mutex. И использовать разделяемую блокировку на нём в цикле доставки событий notify(), а вот монопольную блокировку ставить, когда имеется необходимость реально поменять контейнер - в методах subscribe() / unsubscribe(). Когда такая оптимизация может быть заметна? В случае очень большого количества событий для доставки подписчикам и очень быстрых подписчиков, которые, скажем так, не делают внутри почти ничего, а значит, не занимают времени. То есть, если CPU большую часть времени исполняет цикл в notify(), а не код подписчиков.

Ок, во что превратится код? Как видно ниже, он не стал сильно больше:

template <class ... Args> class notifier {
    ... // as before
private:
    struct subscription {
        std::function<void(Args ...)> m_callback;
        const sub_id_t m_id;
        std::atomic<unsigned> m_refs = 0;    // note! <- atomic
        std::condition_variable_any m_waiter;

        subscription(std::function<void(Args ...)> c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}
    };

public:
    sub_id_t subscribe(std::function<void(Args ...)> callback) { ... }

    ... // as before

    void notify(Args ... args) {
        std::shared_lock l{m_list_mtx};

        for (auto &s : m_list) {
            s.m_refs.fetch_add(1, std::memory_order_relaxed);
            l.unlock();

            try {
                s.m_callback(args ...);
            } catch (...) {
            }

            l.lock();
            // shared lock defences 's' object from deletion but not from
            // modifying the 'm_refs' counter from multiple threads
            if (s.m_refs.fetch_sub(1, std::memory_order_relaxed) == 1)
                s.m_waiter.notify_all();
        }
    }

private:
    sub_id_t m_next_id = 0;
    std::shared_mutex m_list_mtx;    // note! <- shared mutex
    std::list<subscription> m_list;
};

Полный код здесь - https://github.com/Corosan/subscribers/blob/main/src/notifs-2-1.cpp

Удобство использования

В прошлой статье уже упоминался недостаток существующей реализации - невозможность вызвать unsubscribe(...) прямо из подписчика. То есть, в мире возможно всё, но этот код просто зависнет. Пора это поправить.

Хорошо бы вместо целочисленного счётчика исполняющих код подписчика потоков хранить список всех идентификаторов соответствующих потоков. Тогда во время отписки можно проверить, есть ли идентификатор текущего потока в этом списке. Если есть - unsubscribe(...) вызван из подписчика. Что делать в этом случае? Во-первых, выставить какой-нибудь флаг, что трогать этот подписчик больше не надо (кстати, это можно сделать и в предыдущей реализации, а то может получиться, что при плотном потоке оповещений отписаться вообще будет невозможно). Во-вторых, подождать, пока список идентификаторов потоков не сократится до одного элемента. То есть, в нём останется только наш поток, в котором и вызван unsubscribe(...). В-третьих, ... можно поступить, как и в предыдущем случае - удалить объект подписки. Правда, если пользователь использовал его, чтобы контролировать время жизни подписчика, мы получим сложно диагностируемую проблему - использование после удаления. То есть, мы удалим объект подписки, он удалит функтор подписчика, тот потянет удаление каких-то ещё объектов, участвующих в обработке события. Но мы же ещё не вышли из самого функтора! Нет, так делать не надо. Лучше делегировать задачу удаления объекта подписки в этом случае циклу рассылки оповещений в notify(...). А самим просто выйти.

Перепишем реализацию под только что обсуждённую логику. Да, с ускорением, описанным чуть выше, придётся попрощаться - какие atomic reference counters, если у нас теперь вместо счётчика - целый контейнер идентификаторов?!

template <class ... Args> class notifier {
public:
    typedef int sub_id_t;

private:
    struct subscription {
        std::function<void(Args ...)> m_callback;
        const sub_id_t m_id;
        // ids of threads which currently execute this subscription's callback
        std::vector<std::thread::id> m_active_cycle_threads;
        std::condition_variable m_waiter;
        bool m_unsubscribe_from_callback = false;

        subscription(std::function<void(Args ...)> c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}
    };   

public:
    ... // as earlier

    bool unsubscribe(sub_id_t id) {
        std::unique_lock l{m_list_mtx};

        auto it = find_if(m_list.begin(), m_list.end(),
            [id](auto& v){ return v.m_id == id; });

        if (it != m_list.end()) {
            auto& threads = it->m_active_cycle_threads;
            // Looking for current thread ID in a list of
            // currently executing the subscriber threads
            auto thread_it = find(threads.begin(), threads.end(),
                std::this_thread::get_id());

            if (thread_it == threads.end()) {
                // Trivial case when the unsubscribe operation is called not
                // from some subscriber's callback - as earlier
                it->m_waiter.wait(l, [&it, &threads]{ return threads.empty(); });
                m_list.erase(it);
                return true;
            } else {
                // This subscription object will be removed by a notification
                // delivery cycle eventually, which has originated a call chain
                // yielded to this unsubscribe call.
                it->m_unsubscribe_from_callback = true;
                it->m_waiter.wait(l, [&it, &threads]{ return threads.size() <= 1; });
                return true;
            }
        }
        return false;
    }

    void notify(Args ... args) {
        // using temporary list of items to be deleted - it allows to
        // defer object destruction to be executed not under lock
        std::list<subscription> garbage;
        std::unique_lock l{m_list_mtx};

        for (auto it = m_list.begin(); it != m_list.end(); ) {
            if (it->m_unsubscribe_from_callback) {
                // somebody tries to remove the subscription - don't touch it
                ++it;
                continue;
            }
            auto& threads = it->m_active_cycle_threads;

            // It's not a good to touch a heap allocator at this fast delivery
            // cycle. But an allocation inside this container is expected at
            // beginning phase only - the active threads list not going to grow
            // in future usually
            threads.push_back(std::this_thread::get_id());
            l.unlock();

            try {
                // Note that no std::forward<> optimization here because
                // arguments can't be forwarded into more than one subscriber -
                // all but the first one will get giglets
                it->m_callback(args ...);
            } catch (...) {
            }

            l.lock();
            threads.erase(
                find(threads.begin(), threads.end(), std::this_thread::get_id()));

            // If all callbacks have gone (no active threads registered inside
            // the subscription), issue a notification on the condition variable
            // for somebody who may wait on it inside an unsubscribe()
            // operation.
            // If the only thread is registered and a flag about pending
            // unsubscription is set, issue a notification for the only live
            // callback so it can return from the unsubscribe operation.
            if (threads.empty() || (threads.size() == 1 && it->m_unsubscribe_from_callback))
                it->m_waiter.notify_all();
            if (threads.empty() && it->m_unsubscribe_from_callback)
                garbage.splice(garbage.begin(), m_list, it++);
            else
                ++it;
        }
        // Note that garbage will be cleared after the m_list_mtx is unlocked
    }
    ... // as earlier
};

Некоторые читатели могут презрительно фыркнуть - фу, прямо в критическом коде работа с линейным поиском для удаления элемента - идентификатора потока. И подобный линейный поиск в unsubscribe(...) для выяснения, вызывается ли он в контексте доставки оповещений notify(...). Этим читателям предлагается в качестве домашнего упражнения оптимизировать хранение и поиск идентификаторов потоков. И написать бенчмарк для сравнения. Что-то подсказывает, что быстрее не будет. Ведь контейнер m_active_cycle_threads хранит список не всех потоков в системе, и не всех потоков, на которых "крутится" этот процесс, а только лишь потоки, которые волею случая обслуживают прямо сейчас один и тот же подписчик. Не думаю, что их наберётся больше десятка. Только если сам подписчик - не тяжеловес по использованию CPU (а в этом случае - о какой оптимизации говорим?). А линейные операции на массивах из дюжины элементов при расположении этих элементов в памяти рядом друг с другом - самые быстрые операции. Локальность данных с точки зрения кэша CPU здесь играет наиважнейшую роль.

За кадром по прежнему оставлен вопрос, что делать в случае сбоя одного или нескольких подписчиков. И выбрасывания исключения. Пока оно тихо глотается. Возможно, это лучше сделать настраиваемым. Но не уверен, что есть хорошее "решение в общем виде" кроме как залоггировать ошибку. Так что "настраиваемость" здесь может опять же привести к разбуханию кода без явной пользы.

Как это использовать?

Как и в прошлой статье - элементарно! Но давайте напишем тестик, который продемонстрирует, что отписка изнутри подписчика тоже работает:

int main() {
    // no any mean in specifying int and char arguments except to check
    // that the code is correct. At least from compilation point of view
    notifier<int, char> s;

    // MT test where the same subscription is called from two threads and one of them tries to
    // unsubscribe while other works for some time
    int id1;
    const auto main_thread_id = std::this_thread::get_id();
    id1 = s.subscribe([&id1, &s, main_thread_id](int, char){
        if (main_thread_id == std::this_thread::get_id()) {
            g_sync_logger() << "subscriber 3 started from thread 1";
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            g_sync_logger() << "subscriber 3 - try to unsubscribe";
            s.unsubscribe(id1);
            g_sync_logger() << "subscriber 3 finished on thread 1";
        } else {
            g_sync_logger() << "subscriber 3 started from thread 2";
            std::this_thread::sleep_for(std::chrono::seconds(1));
            g_sync_logger() << "subscriber 3 finished on thread 2";
        }});

    std::thread t = std::thread{[&s]{ s.notify(4, 'd'); }};
    s.notify(5, 'e');

    t.join();

    verify(! s.unsubscribe(id1));
    verify(s.count() == 0);
}

Вывод утилиты (обратите внимание, отписка инициируется в первом потоке, но ждёт, пока второй закончит работу внутри подписчика):

$ ./notifs-2-2
86393.515 [51812] subscriber 3 started from thread 1
86393.515 [51813] subscriber 3 started from thread 2
86393.715 [51812] subscriber 3 - try to unsubscribe
86394.515 [51813] subscriber 3 finished on thread 2
86394.516 [51812] subscriber 3 finished on thread 1

Полный код этого варианта: https://github.com/Corosan/subscribers/blob/main/src/notifs-2-2.cpp

Что ж, получилась весьма себе сносная реализации без тонны мета-шаблонного программирования - весь класс оповещателя вместе с комментариями - на 130 строчек. Этого должно быть достаточно для большинства типичных задач в промышленном коде. И, важно отметить, этот код сравнительно прост и понятен для большинства специалистов. Недостаток - здесь нет job security :).

В следующей статье мы перешагнём эту черту и сделаем его несколько "современнее" и "академичнее". Попутно подняв планку для тех, кто может захотеть его поменять :). На этом направлении часто бывает сложно остановиться. Надеюсь, саркастический оттенок завершающего слова здесь подскажет, что путь этот скользкий, и лучше по нему не идти! По крайней мере в реальном коде, который пишется в группе с коллегами, которым его тоже читать и поддерживать. Ну а дома для повышения ЧСВ квалификации - очень рекомендую книжки:

  1. Ivan Čukić. Functional Programming in C++. https://www.manning.com/books/functional-programming-in-c-plus-plus

  2. David Abrahams, Aleksey Gurtovoy. C++ Template Metaprogramming: Concepts, Tools, and Techniques from Boost and Beyond. ISBN 0-321-22725-5.

Комментарии (2)


  1. Sazonov
    20.09.2024 09:52
    +2

    Было бы неплохо потом сравнить ваш подход с SObjectizer, про который тут написано много и интересно.

    И с Qt, где вся эта работа сделана на неблокирующих операциях.


  1. bfDeveloper
    20.09.2024 09:52
    +1

    И я снова приду с критикой отписки в самом слушателе. Вы замыкаете id по ссылке, потому что нет никакой физической возможности замкнуть по значению. А если код асинхронный и нельзя замыкать локальные переменные? Будете замыкать смарт-поинтер на специальный объект-хранилище?

    Вы выбрали свой набор требований, весьма скромный, и реализовали строго его. И даже он с одной стороны избыточен и дорог для многих применений (мне например, не нужна потокобезопасность, за это я не люблю и std::shared_ptr), а с другой недостаточен.

    Я люблю велосипеды, сам их пишу. Но я не продаю свой велосипед как единственно правильный, максимально простой или ещё какой-то. Он на то и велосипед, что узко заточен под мои задачи.