За время долгой работы в IT непосредственно с кодом, подмечаю одну особенность, что писать приходится всё меньше (в последнее время практически не писать), а ревьювить всё больше. На последнем месте работы за полтора года я изучил уже примерно столько же кода, сколько на прошлом... лет за 8.

Всё чаще видны нагромождения тонн кода, которые по факту не нужны, не вносят никакой дополнительной пользы. Но создают раз за разом головную боль для следующего читающего этот код программиста, который вынужден что-то поправить или дописать в этом коде. По итогу, программист махает рукой на эту чудную "архитектуру"... и пишет ещё один wrapper / adapter над ним. И, таким образом, передаёт пламенный привет последующим коллегам в будущее ;).

Попробуем взять и переписать с минимумом кода одну из очень часто встречающихся задач - рассылку уведомлений объектам в коде при возникновении какого-то события. На первый взгляд кажется, что в c++ уже есть все инструменты, чтобы написать этот код в несколько строк: функтор std::function<...> - чтобы сохранить отложенный вызов, контейнер std::vector<std::function...> - чтобы сохранить цепочку отложенных вызовов. По которым нужно просто пробежаться при возникновении события и вызвать сохранённые функторы.

Однако в моей практике приходилось встречаться с разного вида монстрами, которые реализовывали подобную задачу с количеством строк, начиная от тысячи. Там, где через эти тысячи удавалось продраться, чтобы понять заложенную автором идею, оказывалось, что либо идеи никакой дополнительной нет, либо автор пытался навесить в реализацию множество дополнительных "фич", сделать код ответственным за всё, включая сохранение аргументов вызова в БД и запуска космического корабля для доставки сообщения на Марс. Одной из самых монструозных библиотек для этой задачи, на мой взгляд, является библиотека boost::signals2, о которой есть даже статья с разбором - https://rsdn.org/article/cpp/signals2.xml.

Проще некуда

С чего начнём? С названия - пусть будет "оповещатель" (в коде - "notifier").

У нашего оповещателя есть клиенты, которые предоставляют что-то, что надо вызвать в нужный момент времени, в виде функтора std::function<...> с фиксированным интерфейсом. Никаких "угадай параметр", "параметр по умолчанию, если не задали", "а тут по ссылке, а тут по значению" и т.п. Это проблема клиента. Если это вообще проблема. Пока что вообще сделаем оповещения без параметров.

При обращении клиента для подписки на событие ему хорошо бы выдать какую-то бумажку, билетик то бишь. Без бумажки ты букашка, а с бумажкой... Это про другое, конечно, но "билетик", а точнее, идентификатор подписки, нужен хотя бы для того, чтобы клиент мог отменить подписку, когда захочет.

Как будут доставляться события? Капитан Очевидность подсказывает следующий код - просто пройтись по списку и вызвать функторы.

Что ж, накатим:

class notifier {
public:
    typedef int sub_id_t;

private:
    struct subscription {
        std::function<void()> m_callback;
        const sub_id_t m_id;

        subscription(std::function<void()> c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}   
    };   

public:
    sub_id_t subscribe(std::function<void()> callback) {
        m_list.emplace_back(std::move(callback), m_next_id);
        return m_next_id++;
    }

    bool unsubscribe(sub_id_t id) {
        auto it = find_if(m_list.begin(), m_list.end(),
            [id](auto& v){ return v.m_id == id; });
        if (it != m_list.end()) {
            m_list.erase(it);
            return true;
        }
        return false;
    }

    void notify() {
        for (auto& s : m_list)
            s.m_callback();
    }

private:
    sub_id_t m_next_id = 1;
    std::list<subscription> m_list;
};

Для однопоточного кода вполне себе рабочее решение. Кроме возможности задать аргументы при оповещении, что можно очень просто решить шаблонами. Но об этом ниже.

Трубим сразу в несколько труб

На практике подобные оповещатели используются всё-таки в многопоточном коде. Это и доставка оповещений, которая может быть вызвана сразу из нескольких потоков. Это и подписка / отписка. Самое тривиальное решение для использования однопоточного кода в многопоточном мире - диктатура глобальных блокировок! Зарядим один mutex на всё:

class notifier {
    ... // as before
public:
    sub_id_t subscribe(std::function<void()> callback) {
        std::lock_guard l{m_list_mtx};
        ... // as before
    }

    bool unsubscribe(sub_id_t id) {
        std::lock_guard l{m_list_mtx};
        ... // as before
    }

    void notify() {
        std::lock_guard l{m_list_mtx};
        ... // as before
    }

private:
    std::mutex m_list_mtx;
    ... // as before
};

Несмотря на топорность этого решения, такое встречается в промышленном коде! Однако весьма несложно значительно улучшить его, если заметить, что во время выполнения кода подписчика блокировать оповещатель вовсе не обязательно. Давайте снимать блокировку перед вызовом очередного подписчика во время доставки сообщения и устанавливать снова для следующей итерации в цикле по подписчикам.

А как на счёт подписки / отписки во время, пока выполняется доставка оповещений в другом потоке (а может даже и не в одном)? Ну, если гарантировать, что модификации контейнера делаются только в одном потоке в один момент времени, то контейнер мы не испортим. Что касается итерации по нему в notify(), то в момент вызова подписчика контейнер может быть модифицирован - это нужно учитывать при продолжении итерации. Если это std::vector - все его итераторы могут стать недействительными (invalid), и тогда итерацию нужно делать по позиции. И учитывать, что удаление более раннего подписчика нарушит порядок обхода - пропустим один подписчик. В текущем решении мы используем std::list, а значит, недействительным станет только итератор элемента, который удаляют.

Посмотрим на процедуры подписки/отписки со стороны клиентского кода. Очевидно, что этот код ожидает, что после завершения вызова subscribe(...) в предоставленный функтор будут доставляться оповещения. А что там с отпиской? Её могут вызвать и во время доставки оповещений. Разумным было бы гарантировать клиентскому коду, что после возврата из unsubscribe(...) соответствующий функтор уже не вызывается и вызываться уже никогда не будет. Клиент вполне может захотеть удалить, к примеру, связанный с этим функтором объект после этого.

Чтобы реализовать соответствующие гарантии, будем хранить в каждой структуре subscription счётчик, сколько потоков прямо сейчас непосредственно вызвали функтор. И положим рядом condition variable, на котором можно будет организовать ожидание, что никто уже не выполняет этот функтор.

Итого:

class notifier {
    ... // as before
private:
    struct subscription {
        std::function<void()> m_callback;
        const sub_id_t m_id;
        // how many notify cycles use this object right now
        unsigned m_refs = 0;
        std::condition_variable m_waiter;

        subscription(std::function<void()> c, sub_id_t id)
            : m_callback(std::move(c)), m_id(id)
        {}
    };

public:
    ... // as before

    bool unsubscribe(sub_id_t id) {
        std::unique_lock l{m_list_mtx};

        auto it = find_if(m_list.begin(), m_list.end(),
            [id](auto& v){ return v.m_id == id; });

        if (it != m_list.end()) {
            // if somebody executes this subscription's callback, m_refs != 0.
            // So this line unlocks the global lock and waits for notification
            it->m_waiter.wait(l, [&it](){ return ! it->m_refs; });
            m_list.erase(it);
            return true;
        }
        return false;
    }

    void notify() {
        std::unique_lock l{m_list_mtx};

        for (auto &s : m_list) {
            // mark this subscription as used so nobody can remove it
            ++s.m_refs;
            l.unlock();

            try {
                s.m_callback();
            } catch (...) {
            }

            l.lock();
            if (! --s.m_refs)
                s.m_waiter.notify_all();
        }
    }
    ... // as before
};

Простенько и со вкусом! Вполне себе неплохое решение, за исключением всё ещё отсутствия аргументов для функторов и маленького факта - отписку нельзя вызывать прямо или косвенно из кода подписчика. В противном случае получим взаимоблокировку (dead lock). Исполнение зависнет на ожидании condition variable в unsubscribe(...), пока не останется потоков выполняющих функтор этого подписчика. Но так как в цепочке вызова этого функтора и вызван unsubscribe(...), ожидаемой ситуации никогда не наступит.

Либо нужно писать строгим шрифтом пользователям этого оповещателя такое ограничение на его использование, либо как-то разрешить эту ситуацию в коде. Так как в серьёзном промышленном коде цепочки вызовов настолько длинны и неисповедимы, что очень сложно гарантировать, откуда и когда будет вызван unsubscribe(...), лучше решить это всё-таки в коде оповещателя. И не требовать от клиентов слишком многого. Но это улучшение будем делать в следующей статье.

Как это использовать?

Элементарно!

int main() {
    notifier s;

    // trivial logger streaming into std::cout with locks
    g_sync_logger() << "---- test1 ----";

    // Trivial test for delivering an event on the same thread
    auto id1 = s.subscribe([]{ g_sync_logger() << "subscriber 1 executed"; });
    s.notify();
    verify(s.unsubscribe(id1));

    s.notify(); // nothing to notify

    g_sync_logger() << "---- test2 ----";

    // MT test for unsubscribing while delivering an event
    id1 = s.subscribe([]{
        g_sync_logger() << "subscriber 2 started";
        std::this_thread::sleep_for(std::chrono::seconds(1));
        g_sync_logger() << "subscriber 2 finihed";
    });

    std::thread t{[&s](){ s.notify(); }};

    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    g_sync_logger() << "trying to unsubscribe the subscriber 2";
    verify(s.unsubscribe(id1));
    g_sync_logger() << "finished unsubscription of the subscriber 2";

    t.join();
}

Вывод приложения (в квадратных скобках - ID потока):

$ ./notifs-1-1
75043.766 [44674] ---- test1 ----
75043.766 [44674] subscriber 1 executed
75043.766 [44674] ---- test2 ----
75043.766 [44675] subscriber 2 started
75043.966 [44674] trying to unsubscribe the subscriber 2
75044.766 [44675] subscriber 2 finihed
75044.766 [44674] finished unsubscription of the subscriber 2

Как-то так. Оповещатель занимает всего 80 строк. Дальнейшие шаги продолжим в следующей статье цикла.

Исходный код текущей версии подписчика на Github

А в заключение небольшой опрос.

Комментарии (18)


  1. Sabirman
    18.09.2024 11:15

    А почему не shared_mutex вместо счетчик потоков, которые занимаются рассылкой ?

    https://learn.microsoft.com/ru-ru/cpp/standard-library/shared-mutex?view=msvc-170


    1. Corosan Автор
      18.09.2024 11:15

      Эти улучшения - в следующих статьях. Это первая статья в цикле статей. Если интересно, по ссылке на Github - в соседних файлах есть shared_mutex.


  1. outlingo
    18.09.2024 11:15
    +2

    А почему просто не с клонировать вектор подписчиков и итерироваться по нему? Тогда лок вывешивается только на время клонирования и проблема решена, все подписчики на момент вызова получат уведомление. Нет ничего страшного в клонировании вектора


    1. Abstraction
      18.09.2024 11:15

      Проблема:

      struct Data { int val; };
      struct User { std::unique_ptr<Data> data; };
      
      User u;
      //...
      //подписка
      auto subscriptionId = s.subscribe([&](){LOG() << u.data->val;});
      //...
      //отписка
      s.unsubscribe(subscriptionId);
      u.data = nullptr;

      Если мы в одном потоке скопировали вектор и пошли по нему, а за это время другой поток успел отписаться и занулить указатель, то произойдёт упс.


      1. outlingo
        18.09.2024 11:15
        +2

        произойдёт упс.

        У вас тут наличествует откровенный мисдизайн.

        Ваш эвент должен содержать только данные которыми владеет только сам эвент и которые он освободит в момент своего удаления - то есть конструктор эвента должен клонировать данные, и деструктор эвента должен высвобождать данные, либо ссылки на ресурсы которые не будут удалены за всё время существования эвента.

        А если вы используете shared resource - то необходимо использовать защиту ресурса (семафором, менеджером ресурсов, умным указателем - любым удобным вам инструментом) от удаления - а может и от модификации - на все то время, пока на ресурс есть ссылки.

        Пихать же указатель в асинхронную обработку и освобождать его, не проверив пока таски не закончились - это безумие и прямой путь к сегфолту.


        1. titovmaxim
          18.09.2024 11:15
          +1

          Как-то делали свою систему реактивности на c++. И подробная задача встала в чуть более обобщённом варианте. Нужна была возможность отдать из объекта callback, который имеет доступ к полям объекта. Не только для рассылок, в объем случае. Сам объект мог начать уничтожаться в произвольный момент из произвольного места. Завернуть весь объект в шаред и удерживать его не всегда было возможно. В итоге сделали под такие callback отдельный класс с блокирующим деструктором. И помещали их в конец определения класса. Пока шла обработка вызова callback, его деструктор просто не давал родительскому объекту уничтожитьия. Бонусом убрали необходимость обязательной рассылки. По типу weak_ptt. Если такой callback умер, тот вызова и не будет. Само потом почиститься. В итоге и ручных отписок в очень большом production приложении практически нет. За исключением какой-то сложной логики


        1. Abstraction
          18.09.2024 11:15

          либо ссылки на ресурсы которые не будут удалены за всё время существования эвента.

          Ну так в этом и проблема: с точки зрения клиента после возвращения из unsubscribe() эвент больше не существует. А в вашем решении получается, что каких-либо гарантий о том, когда именно он прекращает существовать, у клиента нет вообще (при обходе вектора предыдущий обработчик взял и вызвал sleep(20)).

          С практической стороны, если хочется создать обработчик что-то делающий с объектом, то что прикажете делать в деструкторе этого объекта? Никакие умные указатели в полях объекта не помогут, объект должен умереть. Гарантировать что деструктор не будет вызван во время вызова notify()?


          1. outlingo
            18.09.2024 11:15

            Так у вас эта проблема точно так же есть.

            Цепочка l.unlock(); s.m_callback() неатомарна. Я вам больше скажу, сам вызов s.m_callback() описывается в несколько инструкций, после любой из которых тред может быть приостановлен и при этом уже разблокирован, но код внутри m_callback еще не начал выполняться.

            Поэтому unsubscribe() может просунуться между этими двумя вызовами и вы ВНЕЗАПНО! (нет) получите в точности ту же проблему.

            Так что самым правильным будет сделать две реализации - одна синхронная, под глобальным локом, в которой unsubscribe / subscribe не завершатся пока не завершатся все notify() - с которой вы и начали (что и правильно), и вторая более оптимистичная, которая реализует модель "после вызова unsubscribe новые вызовы notify уже не попадут в обработчик, но уже сделанные незавершенные вызовы могут привести к его вызову" (ситуация когда функтор, как вы его назвали, не удаляется).

            А половинчатые решения со скрытым race condition как в вашем примере... Не надо их притаскивать. Их очень трудно отдебажить но на них легко подскользнуться.


            1. Abstraction
              18.09.2024 11:15

              (На всякий случай: я не автор статьи и не автор текста статьи, я комментатор.)

              В случае вызова unsubscribe() во втором варианте он же заблокируется в it->m_waiter.wait(l, [&it](){ return ! it->m_refs; }); . Или я чего-то не вижу?


              1. outlingo
                18.09.2024 11:15

                И это даже хуже. В notify лок взяли, начали цикл, сняли лок внутри цикла, ушли в колобок. И тут кто то вызывает unsubscribe параллельно, unsubscribe весь под локом, лок взят. Unsubscribe ачинает ожидать завершения использования элемента через счётчик ссылок. Под локом, да. В notify завершается обработка этой подписки и пытается взять лок внутри цикла перед уменьшением счётчика ссылок. Но этот лок удерживается unsubscribe. Всё, повисли.


                1. Abstraction
                  18.09.2024 11:15

                  std::condition_variable::wait() не удерживает блокировку непрерывно же?..


                  1. outlingo
                    18.09.2024 11:15
                    +1

                    И во время снятого в ожидании лока вызывается второй unsubscribe с теми же параметрами к нам приходит double-free?


                    1. Abstraction
                      18.09.2024 11:15

                      Выглядит как проблема. @Corosan , мы чего-то не понимаем, или это действительно failure mode, и предполагается что двойной вызов unsubscribe() есть ошибка?


  1. domix32
    18.09.2024 11:15

    Выглядит как будто структуре не нужен конструктор вовсе - ничто не мешает её инициализировать обычным {} , а в свежих плюсах можно ещё и как в си красиво сделать

    {.id=next_id(), .callback=std::move(callback),...}