Сегодняшняя статья вдохновила меня поделиться своим способом вынесения баз данных в отдельный тред. Способ подходит не только для БД, но и для любых взаимодействий, описываемых паттерном «в отдельном потоке живёт какой-то объект, надо у него что-то спрашивать и что-то с ним делать». Кроме того, способ хорош тем, что он пытается быть типобезопасным и расширяемым: никаких stringly-typed QMetaObject::invokeMethod(), никаких передач результатов дёрганья объекта в потоке через сигналы. Только прямой вызов функций, только QFuture!

Disclaimer: приведенный здесь код является куском одного моего большого проекта, поэтому в нём будут использоваться некоторые вспомогательные библиотечные функции из этого проекта. Я, однако, постараюсь не упустить случаи такого использования и опишу их семантику.

Итак, начнём с самого главного: как бы нам хотелось работать с объектом в отдельном потоке? В идеале мы просто дёргаем какие-то методы у какого-то объекта, методы возвращают QFuture<T>, готовность которых будет означать, что соответствующий асинхронный метод закончил выполняться, и у него появились результаты типа T.

Вспомним, что декомпозиция — наш друг, поэтому возьмём нашу исходную задачу «надо дёргать что-то в отдельном треде» и рассмотрим её кусочек «надо держать отдельный поток и обеспечивать потокобезопасный вызов чего-то в нём с возвратом QFuture».

Решим эту задачу следующим образом: объект-наследник QThread, отвечающий за управление потоком, имеет метод ScheduleImpl(), вызываемый из основного потока (да и из других тоже), принимающий некоторый функтор, оборачивающий этот функтор в QFuture и сохраняющий всё, что нужно, в специальную очередь, которая затем обработается внутри QThread::run().

Получится что-то такое:
class WorkerThreadBase : public QThread
{
	Q_OBJECT

	QMutex FunctionsMutex_;
	QList<std::function<void ()>> Functions_;
public:
	using QThread::QThread;
protected:
	void run () override;

	virtual void Initialize () = 0;
	virtual void Cleanup () = 0;

	template<typename F>
	QFuture<ResultOf_t<F ()>> ScheduleImpl (const F& func)
	{
		QFutureInterface<ResultOf_t<F ()>> iface;
		iface.reportStarted ();

		auto reporting = [func, iface] () mutable
		{
			ReportFutureResult (iface, func);
		};

		{
			QMutexLocker locker { &FunctionsMutex_ };
			Functions_ << reporting;
		}

		emit rotateFuncs ();

		return iface.future ();
	}
private:
	void RotateFuncs ();
signals:
	void rotateFuncs ();
};


Пояснения по всяким ReportFutureResult и ResultOf_t
ResultOf_t — прямой аналог std::result_of_t из C++14. Мой проект, к сожалению, пока ещё должен поддерживать C++11-компиляторы.
template<typename T>
using ResultOf_t = typename std::result_of<T>::type;


ReportFutureResult берёт функтор, его аргументы, выполняет функтор и помечает соответствующий QFutureInterface как готовый, заодно передавая ему результат выполнения функтора, или заворачивает в QFutureInterface исключение, если выполнение функтора завершилось этим самым исключением. К сожалению, дело несколько осложняется возвращающими void функторами: для них приходится писать отдельную функцию, потому что в C++ нельзя объявить переменную типа void. Такая уж у нас система типов, эх, тип есть, значение в нём есть, а объявить нельзя.
template<typename R, typename F, typename... Args>
EnableIf_t<!std::is_same<R, void>::value>
	ReportFutureResult (QFutureInterface<R>& iface, F&& f, Args... args)
{
	try
	{
		const auto result = f (args...);
		iface.reportFinished (&result);
	}
	catch (const QtException_t& e)
	{
		iface.reportException (e);
		iface.reportFinished ();
	}
	catch (const std::exception& e)
	{
		iface.reportException (ConcurrentStdException { e });
		iface.reportFinished ();
	}
}

template<typename F, typename... Args>
void ReportFutureResult (QFutureInterface<void>& iface, F&& f, Args... args)
{
	try
	{
		f (args...);
	}
	catch (const QtException_t& e)
	{
		iface.reportException (e);
	}
	catch (const std::exception& e)
	{
		iface.reportException (ConcurrentStdException { e });
	}

	iface.reportFinished ();
}


QtException_t нужен для поддержки сборки с Qt4:
#if QT_VERSION < 0x050000
	using QtException_t = QtConcurrent::Exception;
#else
	using QtException_t = QException;
#endif


ConcurrentStdException заворачивает стандартное исключение в такое, которое понимает Qt'шный механизм QFuture'ов, но его реализация чуть сложнее и здесь не столь важна.


То есть, ScheduleImpl() принимает некий функтор с сигнатурой типа T (), возвращает QFuture<T>, оборачивает функтор в специальную функцию, теперь уже с сигнатурой void (), связанную с возвращаемым QFuture<T>, и которая по выполнению функтора этот QFuture<T> пометит готовым, и добавляет эту обёртку в очередь

После этого излучается сигнал rotateFuncs(), который внутри run() соединяется с методом RotateFuncs(), как раз ответственным за обработку очереди сохраненных обёрток функторов.

Посмотрим теперь на реализацию методов run() и RotateFuncs():
void WorkerThreadBase::run ()
{
	SlotClosure<NoDeletePolicy> rotator
	{
		[this] { RotateFuncs (); },
		this,
		SIGNAL (rotateFuncs ()),
		nullptr
	};

	Initialize ();

	QThread::run ();

	Cleanup ();
}

void WorkerThreadBase::RotateFuncs ()
{
	decltype (Functions_) funcs;

	{
		QMutexLocker locker { &FunctionsMutex_ };

		using std::swap;
		swap (funcs, Functions_);
	}

	for (const auto& func : funcs)
		func ();
}


Немного про SlotClosure
SlotClosure — вспомогательный класс, помогающий присоединять сигналы к лямбдам, а не слотам. В Qt5 для этого есть более адекватный синтаксис, но мне, к сожалению, также пока ещё надо поддерживать и Qt4-сборку.

SlotClosure устроен просто, он вызывает свой первый аргумент каждый раз, когда объект, являющийся вторым аргументом, излучит сигнал-третий аргумент. Четвёртый аргумент — родительский объект. Здесь у нас SlotClosure задаётся на стеке, поэтому родители не нужны.

Шаблонный аргумент NoDeletePolicy означает, что объект не должен заканчивать жизнь самоубийством после первого сигнала. Среди других политик удаления есть ещё, например, DeleteLaterPolicy, удаляющий объект-соединение после первого срабатывания сигнала, что удобно для различных однократно выполняемых задач.


С этими функциями всё просто: подсоединяем сигнал rotateFuncs() к функции RotateFuncs() (хм, интересно, сколько комментариев будет на тему стиля именования?), вызываем функцию инициализации объектов потока, определённую где-то в наследнике, и начинаем крутить поток. Когда владелец потока сделает потоку quit(), QThread::run() вернёт управление, и наследник сможет подчистить за собой в Cleanup().

Отметим, что именно Qt'шный механизм сигналов-слотов ответственен за то, чтобы rotateFuncs(), излучаемый из основного потока, вызывал RotateFuncs() в потоке нашего WorkerThreadBase.

RotateFuncs() же ненадолго блокирует основную очередь, перемещая её к себе, после чего начинает её последовательно выполнять.

Собственно, на этом и всё. В качестве примера использования можно привести, например, кусочек системы хранения аватарок на диске в IM-клиенте:
avatarsstoragethread.h
class AvatarsStorageThread final : public Util::WorkerThreadBase
{
	std::unique_ptr<AvatarsStorageOnDisk> Storage_;
public:
	using Util::WorkerThreadBase::WorkerThreadBase;

	QFuture<void> SetAvatar (const QString& entryId, IHaveAvatars::Size size, const QByteArray& imageData);
	QFuture<boost::optional<QByteArray>> GetAvatar (const QString& entryId, IHaveAvatars::Size size);
	QFuture<void> DeleteAvatars (const QString& entryId);
protected:
	void Initialize () override;
	void Cleanup () override;
};


avatarsstoragethread.cpp
QFuture<void> AvatarsStorageThread::SetAvatar (const QString& entryId,
		IHaveAvatars::Size size, const QByteArray& imageData)
{
	return ScheduleImpl ([=] { Storage_->SetAvatar (entryId, size, imageData); });
}

QFuture<boost::optional<QByteArray>> AvatarsStorageThread::GetAvatar (const QString& entryId, IHaveAvatars::Size size)
{
	return ScheduleImpl ([=] { return Storage_->GetAvatar (entryId, size); });
}

QFuture<void> AvatarsStorageThread::DeleteAvatars (const QString& entryId)
{
	return ScheduleImpl ([=] { Storage_->DeleteAvatars (entryId); });
}

void AvatarsStorageThread::Initialize ()
{
	Storage_.reset (new AvatarsStorageOnDisk);
}

void AvatarsStorageThread::Cleanup ()
{
	Storage_.reset ();
}



А вот реализация AvatarsStorageOnDisk — отдельная интересная тема, связанная с моим доморощенным недо-ORM-фреймворком, позволяющим генерировать таблички, SQL-запросы и соответствующие функции для вставки/удаления/обновления по описанию структуры с данными через Boost.Fusion. Впрочем, конкретно к вопросу многопоточности эта реализация никак не относится, что, в общем-то, и хорошо, особенно с точки зрения декомпозиции исходной задачи.

Ну и напоследок, отметим недостатки предложенного решения:
  1. Нужно дублировать в публичном API класса-наследника WorkerThreadBase все методы, которые захочется вызвать у выносимого в отдельный поток объекта. Как достаточно эффективно решить эту проблему, я сходу не придумал.
  2. Initialize() и Cleanup() прямо просятся обернуться в какое-нибудь RAII. Стоит придумать что-нибудь на эту тему.

Комментарии (1)


  1. iCpu
    23.12.2015 06:57

    Забавно, особенно про QFutureInterface. Не знал, что он есть. Что ж, пойду делать работу над ошибками.