Сегодняшняя статья вдохновила меня поделиться своим способом вынесения баз данных в отдельный тред. Способ подходит не только для БД, но и для любых взаимодействий, описываемых паттерном «в отдельном потоке живёт какой-то объект, надо у него что-то спрашивать и что-то с ним делать». Кроме того, способ хорош тем, что он пытается быть типобезопасным и расширяемым: никаких stringly-typed
Disclaimer: приведенный здесь код является куском одного моего большого проекта, поэтому в нём будут использоваться некоторые вспомогательные библиотечные функции из этого проекта. Я, однако, постараюсь не упустить случаи такого использования и опишу их семантику.
Итак, начнём с самого главного: как бы нам хотелось работать с объектом в отдельном потоке? В идеале мы просто дёргаем какие-то методы у какого-то объекта, методы возвращают
Вспомним, что декомпозиция — наш друг, поэтому возьмём нашу исходную задачу «надо дёргать что-то в отдельном треде» и рассмотрим её кусочек «надо держать отдельный поток и обеспечивать потокобезопасный вызов чего-то в нём с возвратом
Решим эту задачу следующим образом: объект-наследник
Получится что-то такое:
То есть,
После этого излучается сигнал
Посмотрим теперь на реализацию методов
С этими функциями всё просто: подсоединяем сигнал
Отметим, что именно Qt'шный механизм сигналов-слотов ответственен за то, чтобы
Собственно, на этом и всё. В качестве примера использования можно привести, например, кусочек системы хранения аватарок на диске в IM-клиенте:
А вот реализация
Ну и напоследок, отметим недостатки предложенного решения:
QMetaObject::invokeMethod()
, никаких передач результатов дёрганья объекта в потоке через сигналы. Только прямой вызов функций, только QFuture
!Disclaimer: приведенный здесь код является куском одного моего большого проекта, поэтому в нём будут использоваться некоторые вспомогательные библиотечные функции из этого проекта. Я, однако, постараюсь не упустить случаи такого использования и опишу их семантику.
Итак, начнём с самого главного: как бы нам хотелось работать с объектом в отдельном потоке? В идеале мы просто дёргаем какие-то методы у какого-то объекта, методы возвращают
QFuture<T>
, готовность которых будет означать, что соответствующий асинхронный метод закончил выполняться, и у него появились результаты типа T
.Вспомним, что декомпозиция — наш друг, поэтому возьмём нашу исходную задачу «надо дёргать что-то в отдельном треде» и рассмотрим её кусочек «надо держать отдельный поток и обеспечивать потокобезопасный вызов чего-то в нём с возвратом
QFuture
».Решим эту задачу следующим образом: объект-наследник
QThread
, отвечающий за управление потоком, имеет метод ScheduleImpl()
, вызываемый из основного потока (да и из других тоже), принимающий некоторый функтор, оборачивающий этот функтор в QFuture
и сохраняющий всё, что нужно, в специальную очередь, которая затем обработается внутри QThread::run()
.Получится что-то такое:
class WorkerThreadBase : public QThread
{
Q_OBJECT
QMutex FunctionsMutex_;
QList<std::function<void ()>> Functions_;
public:
using QThread::QThread;
protected:
void run () override;
virtual void Initialize () = 0;
virtual void Cleanup () = 0;
template<typename F>
QFuture<ResultOf_t<F ()>> ScheduleImpl (const F& func)
{
QFutureInterface<ResultOf_t<F ()>> iface;
iface.reportStarted ();
auto reporting = [func, iface] () mutable
{
ReportFutureResult (iface, func);
};
{
QMutexLocker locker { &FunctionsMutex_ };
Functions_ << reporting;
}
emit rotateFuncs ();
return iface.future ();
}
private:
void RotateFuncs ();
signals:
void rotateFuncs ();
};
Пояснения по всяким ReportFutureResult и ResultOf_t
ResultOf_t
— прямой аналог std::result_of_t
из C++14. Мой проект, к сожалению, пока ещё должен поддерживать C++11-компиляторы.template<typename T>
using ResultOf_t = typename std::result_of<T>::type;
ReportFutureResult
берёт функтор, его аргументы, выполняет функтор и помечает соответствующий QFutureInterface
как готовый, заодно передавая ему результат выполнения функтора, или заворачивает в QFutureInterface
исключение, если выполнение функтора завершилось этим самым исключением. К сожалению, дело несколько осложняется возвращающими void функторами: для них приходится писать отдельную функцию, потому что в C++ нельзя объявить переменную типа void
. Такая уж у нас система типов, эх, тип есть, значение в нём есть, а объявить нельзя.template<typename R, typename F, typename... Args>
EnableIf_t<!std::is_same<R, void>::value>
ReportFutureResult (QFutureInterface<R>& iface, F&& f, Args... args)
{
try
{
const auto result = f (args...);
iface.reportFinished (&result);
}
catch (const QtException_t& e)
{
iface.reportException (e);
iface.reportFinished ();
}
catch (const std::exception& e)
{
iface.reportException (ConcurrentStdException { e });
iface.reportFinished ();
}
}
template<typename F, typename... Args>
void ReportFutureResult (QFutureInterface<void>& iface, F&& f, Args... args)
{
try
{
f (args...);
}
catch (const QtException_t& e)
{
iface.reportException (e);
}
catch (const std::exception& e)
{
iface.reportException (ConcurrentStdException { e });
}
iface.reportFinished ();
}
QtException_t
нужен для поддержки сборки с Qt4:#if QT_VERSION < 0x050000
using QtException_t = QtConcurrent::Exception;
#else
using QtException_t = QException;
#endif
ConcurrentStdException
заворачивает стандартное исключение в такое, которое понимает Qt'шный механизм QFuture'ов, но его реализация чуть сложнее и здесь не столь важна.То есть,
ScheduleImpl()
принимает некий функтор с сигнатурой типа T ()
, возвращает QFuture<T>
, оборачивает функтор в специальную функцию, теперь уже с сигнатурой void ()
, связанную с возвращаемым QFuture<T>
, и которая по выполнению функтора этот QFuture<T>
пометит готовым, и добавляет эту обёртку в очередь После этого излучается сигнал
rotateFuncs()
, который внутри run()
соединяется с методом RotateFuncs()
, как раз ответственным за обработку очереди сохраненных обёрток функторов.Посмотрим теперь на реализацию методов
run()
и RotateFuncs()
:void WorkerThreadBase::run ()
{
SlotClosure<NoDeletePolicy> rotator
{
[this] { RotateFuncs (); },
this,
SIGNAL (rotateFuncs ()),
nullptr
};
Initialize ();
QThread::run ();
Cleanup ();
}
void WorkerThreadBase::RotateFuncs ()
{
decltype (Functions_) funcs;
{
QMutexLocker locker { &FunctionsMutex_ };
using std::swap;
swap (funcs, Functions_);
}
for (const auto& func : funcs)
func ();
}
Немного про SlotClosure
Шаблонный аргумент
SlotClosure
— вспомогательный класс, помогающий присоединять сигналы к лямбдам, а не слотам. В Qt5 для этого есть более адекватный синтаксис, но мне, к сожалению, также пока ещё надо поддерживать и Qt4-сборку.SlotClosure
устроен просто, он вызывает свой первый аргумент каждый раз, когда объект, являющийся вторым аргументом, излучит сигнал-третий аргумент. Четвёртый аргумент — родительский объект. Здесь у нас SlotClosure
задаётся на стеке, поэтому родители не нужны.Шаблонный аргумент
NoDeletePolicy
означает, что объект не должен заканчивать жизнь самоубийством после первого сигнала. Среди других политик удаления есть ещё, например, DeleteLaterPolicy
, удаляющий объект-соединение после первого срабатывания сигнала, что удобно для различных однократно выполняемых задач.С этими функциями всё просто: подсоединяем сигнал
rotateFuncs()
к функции RotateFuncs()
(хм, интересно, сколько комментариев будет на тему стиля именования?), вызываем функцию инициализации объектов потока, определённую где-то в наследнике, и начинаем крутить поток. Когда владелец потока сделает потоку quit()
, QThread::run()
вернёт управление, и наследник сможет подчистить за собой в Cleanup()
.Отметим, что именно Qt'шный механизм сигналов-слотов ответственен за то, чтобы
rotateFuncs()
, излучаемый из основного потока, вызывал RotateFuncs()
в потоке нашего WorkerThreadBase
.RotateFuncs()
же ненадолго блокирует основную очередь, перемещая её к себе, после чего начинает её последовательно выполнять.Собственно, на этом и всё. В качестве примера использования можно привести, например, кусочек системы хранения аватарок на диске в IM-клиенте:
avatarsstoragethread.h
class AvatarsStorageThread final : public Util::WorkerThreadBase
{
std::unique_ptr<AvatarsStorageOnDisk> Storage_;
public:
using Util::WorkerThreadBase::WorkerThreadBase;
QFuture<void> SetAvatar (const QString& entryId, IHaveAvatars::Size size, const QByteArray& imageData);
QFuture<boost::optional<QByteArray>> GetAvatar (const QString& entryId, IHaveAvatars::Size size);
QFuture<void> DeleteAvatars (const QString& entryId);
protected:
void Initialize () override;
void Cleanup () override;
};
avatarsstoragethread.cpp
QFuture<void> AvatarsStorageThread::SetAvatar (const QString& entryId,
IHaveAvatars::Size size, const QByteArray& imageData)
{
return ScheduleImpl ([=] { Storage_->SetAvatar (entryId, size, imageData); });
}
QFuture<boost::optional<QByteArray>> AvatarsStorageThread::GetAvatar (const QString& entryId, IHaveAvatars::Size size)
{
return ScheduleImpl ([=] { return Storage_->GetAvatar (entryId, size); });
}
QFuture<void> AvatarsStorageThread::DeleteAvatars (const QString& entryId)
{
return ScheduleImpl ([=] { Storage_->DeleteAvatars (entryId); });
}
void AvatarsStorageThread::Initialize ()
{
Storage_.reset (new AvatarsStorageOnDisk);
}
void AvatarsStorageThread::Cleanup ()
{
Storage_.reset ();
}
А вот реализация
AvatarsStorageOnDisk
— отдельная интересная тема, связанная с моим доморощенным недо-ORM-фреймворком, позволяющим генерировать таблички, SQL-запросы и соответствующие функции для вставки/удаления/обновления по описанию структуры с данными через Boost.Fusion. Впрочем, конкретно к вопросу многопоточности эта реализация никак не относится, что, в общем-то, и хорошо, особенно с точки зрения декомпозиции исходной задачи.Ну и напоследок, отметим недостатки предложенного решения:
- Нужно дублировать в публичном API класса-наследника
WorkerThreadBase
все методы, которые захочется вызвать у выносимого в отдельный поток объекта. Как достаточно эффективно решить эту проблему, я сходу не придумал. Initialize()
иCleanup()
прямо просятся обернуться в какое-нибудь RAII. Стоит придумать что-нибудь на эту тему.
iCpu
Забавно, особенно про QFutureInterface. Не знал, что он есть. Что ж, пойду делать работу над ошибками.