Ссылки на статьи

Предисловие

На дворе четыре часа ночи. В душе не понимаю, зачем я это пишу, чего я хочу этим добиться, и т.д. Если вкратце, то это будет цикл статей из разряда "хоба, как могу", причём это самое "хоба" зачастую слишком очевидно и элементарно, да и далеко не всегда полезно, особенно, в контексте философии Qt. Так что это будут просто "размышления обо всём".

Что же мы тут будем делать?

Рассмотрим реализацию паттерна Active Object на C++ с различными примочками из Qt. Сначала поковыряю пару хелловордов, потом перейдём к чему-то хоть сколько-то юзабельному в контексте реального кода.

Я буду использовать Qt6.4, потому что в нём есть много прикольных штук, которых не было в Qt5 (как минимум, QPromise и QHttpServer, которого все так долго ждали), но используя какую-то фичу Qt6, я буду стараться давать альтернативу для Qt5.

Немного про пациента

Active object — паттерн многопоточного программирования, основная задача которого — разделение вызова некоторой операции и её выполнения. Если по простому: один поток дёргает функцию, а вот её выполнением займётся уже возможно этот же, возможно, другой поток. При этом, поток-исполнитель выполняет все запросы последовательно, что исключает любую необходимость синхронизации доступа.

В большинстве случаев весь Active object сводится к двум подходам: future + continuations или просто коллбеки, если нормального фьючерса под рукой не нашлось (здравствуй, C++).

Надо отметить, что часть тех вещей, которые будут описаны тут, не очень сильно похожи на каноничную реализацию паттерна. Причина этому проста: асинхронная природа Qt, которая зачастую избавляет от кучи головной боли при синхронизации потоков. Но, как я уже отметил выше, весь цикл статей будет просто набором моих размышлений и экспериментов на эту тему.

Первый блин

С чего следует начать? Правильно: начинать всегда следует с плохого примера. Для начала изобразим самую простейшую схему реализации Active object.

Примитивный Active object
Примитивный Active object

Работа происходит следующим образом: Client идёт к Active object и просит его выполнить некоторую задачу. Active object отдаёт Client-у future на результат выполнения задачи, и кладёт задачу в очередь. После чего поток-исполнитель забирает задачу из очереди, выполняет её, и записывает результат в promise.

Начнём с самого простого, и при этом плохого способа реализации, который только можно выдумать: унаследоваться от класса QThread, и переопределить метод run. Почему этот способ плох, поясню после реализации.

Наш первый Active object не будет делать ничего выдающегося. Пусть для начала он просто выводит сообщения в консоль и уведомляет того, кто это сообщение отправил о том, что оно напечатано.

Вместо QPromise из Qt6 можно взять библиотеку QtPromise. Божественная штука, довольно приятный интерфейс, много полезных примочек.

AsyncQDebugPrinter.h
class AsyncQDebugPrinter : public QThread {
private:
    class PrinterMessage {
    private:
        QPromise<void> m_promise;
        const QString m_message;
    public:
        PrinterMessage(const QString &message);
        const QString& message() const;
        QPromise<void>& promise();
    };
private:
    std::queue<PrinterMessage> m_messages;
    std::condition_variable m_messagesAwaiter;
    std::mutex m_mutex;
public:
    explicit AsyncQDebugPrinter(QObject *parent = nullptr);
    QFuture<void> print(const QString& message);
protected:
    virtual void run() override;
};

Класс PrinterMessage инкапсулирует внутри себя сообщение и промис, установка которого уведомит отправителя сообщения, что оно напечатано. Т.к. он содержит внутри себя экземпляр move-only класса QPromise, то и сам является move-only классом.

Реализация PrinterMessage
AsyncQDebugPrinter::PrinterMessage::PrinterMessage(const QString &message)
    :m_message{ message } {}
const QString& AsyncQDebugPrinter::PrinterMessage::message() const {
    return m_message;
}
QPromise<void> &AsyncQDebugPrinter::PrinterMessage::promise() {
    return m_promise;
}

Реализация AsyncQDebugPrinter достаточно тривиальна:

Метод print принимает сообщение, создаёт задачу на его вывод (task) и берёт фьючерс от этой задачи. Далее блокирует мьютекс, перемещает task в очередь сообщений, уведомляет condition_variable о получении нового сообщения, и возвращает вызывающий стороне фьючерс, после чего происходит разблокировка мьютекса.

Метод print
//AsyncQDebugPrinter.cpp
AsyncQDebugPrinter::AsyncQDebugPrinter(QObject *parent)
    :QThread{ parent } {}

QFuture<void> AsyncQDebugPrinter::print(const QString &message) {
    auto task = PrinterMessage{ message };
    auto future = task.promise().future();
    std::lock_guard locker{ m_mutex };
    m_messages.push(std::move(task));
    m_messagesAwaiter.notify_one();
    return future;
}

В методе run крутится поток-исполнитель. Крутится он до тех пор, пока кто-то извне не дёрнет метод requestInterruption() класс QThread.

Вся работа метода сводится к тому, чтобы condition_variable дождался, пока в очереди не появятся новые сообщения. Далее эта очередь перемещается в буфер, после чего блокировка снимается. Это важный момент: блокировка есть только при работе с очередью, при этом она минимальна, а при самой обработке сообщений никаких блокировок нет в принципе.

Ну и затем происходит перебор всех сообщений в очереди, текст выводится на экран, а промис устанавливается в состояние finished.

Метод run
//AsyncQDebugPrinter.cpp
void AsyncQDebugPrinter::run() {
    while(not isInterruptionRequested()) {
        std::unique_lock<std::mutex> locker{ m_mutex };
        if(m_messagesAwaiter.wait_until(locker, std::chrono::steady_clock::now() + 500ms, [this]() ->bool { return not m_messages.empty(); })) {
            auto buffer = std::move(m_messages);
            locker.unlock();
            
            while(not buffer.empty()) {
                qDebug() << buffer.front().message();
                buffer.front().promise().finish();
                buffer.pop();
            }
        }
    }
}

Пробуем в использовании

Запустить это нечто достаточно просто. Создаём объект, дёргаем метод start, и радуемся возможности делать print("Hello, world") из разных потоков.

qDebug() << "Start application";//А вот тут у нас будет id главного потока
auto printer = new AsyncQDebugPrinter{ qApp };
printer->start();

printer->print("Hello, world!").then([printer] {
    qDebug() << "In continuation";
    printer->print("Previous message was printed");
});

На выводе будет картина маслом:

Циферки 16036 и 16037 — это, собственно, id потоков. Видно, что "Start application" выведен в главном потоке, а вот остальные сообщения — уже в потоке-исполнителе Active object.

Причём тут нужно отметить, что continuation (лямбда, закинутая в .then) тоже выполняется потоком active object. Это значит, что он занимается не своей работой.

Конечно, можно просто делать дешёвые continuations, которые только лишь будут запускать новые задачи, но это не путь джедаев. Мы сделаем нормально. Немного подкорректируем код: добавим в .then первым параметром (перед лямбдой) такую штуку:

    printer->print("Hello, world!").then(QtFuture::Launch::Async, [printer] {
        qDebug() << "In continuation";
        printer->print("Previous message was printed");
    });

QtFuture::Launch::Async заставит лямбду выполниться на потоках QThreadPool::globalInstance(), а Active object сможет заниматься исключительно тем, для чего мы его и создавали.

Вот теперь другое дело:

  • 16249 — главный поток

  • 16250 — поток active object

  • 16251 — поток из QThreadPool::globalInstance()

Не вздумайте создавать какие-либо QObject в continuation, которые выполняются на потоках QThreadPool. Эти потоки не проводят события, а значит, любой QObject будет мёртв (ни сигналы, ни события работать не будут). К слову, создавать QObject в continuation, выполняемых на потоке этого Active object также нельзя, потому что он точно также не провайдит никаких событий.

Так а почему эта реализация плоха?

Начать следует с того, что наследоваться от QThread — плохая идея. Об этом подробно рассказано здесь.

Методы quit и exit работать не будут, т.к. мы не запустили QEventLoop в методе run. Класс с нерабочим интерфейсом - что может быть хуже?

Если не вызвать requestInterruption() и не дождаться конца выполнения потока до удаление executor-а, то при вызове его дестркутора будет вызван terminate и пользователь увидит некрасивое сообщение об ошибке. С другой стороны, можно добавить принудительное ожидание в деструкторе Active Object, что решит эту проблему.

Заключение

Первая статья получилась вводной и пока не раскрыла плюсы Active object. Попробую в дальнейшем это исправить.

Исходный код примера доступе на GitHub.

Комментарии (0)