В мире существует множество библиотек, реализующих сигналы в C++. К сожалению, у всех реализаций, с которыми я сталкивался, есть несколько проблем, которые не позволяют писать простой многопоточный код с использованием этих библиотек. Здесь я расскажу об этих проблемах, и о том, как их можно решить.
Думаю, многие уже знакомы с этой концепцией, но на всякий случай всё же напишу.
Сигнал — это способ отправить уведомление о произвольном событии получателям, которые могут регистрироваться независимо друг от друга. Если угодно, callback с множеством получателей. Или, для тех, кто работал с .NET, multicast delegate.
В однопоточном коде всё выглядит отлично, и работает неплохо, но что насчёт многопоточного?
Тут, к сожалению, есть три общих для разных реализаций проблемы:
Рассмотрим каждую из них подробно. Для этого напишем часть прошивки воображаемой медиа-приставки, а именно три класса:
Сразу скажу, что код, который вы тут увидите, предельно упрощён, и не содержит ничего лишнего, чтобы мы могли сконцентрироваться на этих проблемах. Также вы встретите типы вида TypePtr. Это всего лишь std::shared_ptr<Type>, не пугайтесь.
Итак, StorageManager. Нужен геттер для тех носителей, которые уже вставлены в приставку, и сигнал для уведомления о появлении новых.
Увы, таким интерфейсом невозможно воспользоваться без того, чтобы получить race condition.
Не работает в таком порядке…
… и не работает в таком порядке.
Очевидно, раз мы получили race condition, нам нужен мьютекс.
Этот код будет работать, но у него есть несколько недостатков:
Давайте перенесём всё то, что мы делаем вокруг вызова connect (захват мьютекса и обход коллекции) внутрь.
Тут важно понимать, что алгоритм получения текущего состояния зависит от природы этого самого состояния. Если это коллекция, нужно вызвать обработчик для каждого элемента, если же это, например, enum, то нужно вызвать обработчик ровно один раз. Соответственно, нам нужна некая абстракция.
Добавим в сигнал популятор — функцию, принимающую обработчик, который сейчас подключается, и пусть владелец сигнала (StorageManager, в нашем случае) определяет, каким образом текущее состояние будет отправлено в этот обработчик.
Класс signal_connection пока принимает лямбда-функцию, которая удалит обработчик из списка в сигнале. Чуть дополненный код я приведу позже.
Перепишем StorageManager с использованием этой новой концепции:
Если вы используете C++14, популятор может быть совсем коротким:
Обратите внимание, что при вызове популятора мьютекс захватывается в методе signal::connect, поэтому в теле самого популятора делать этого не нужно.
Клиентский код становится совсем коротким:
Одной строчкой мы одновременно подключаемся к сигналу и получаем текущее состояние объекта. Отлично!
Теперь пора писать MediaScanner. В конструкторе подключимся к сигналу StorageManager::OnStorageAdded, а в деструкторе отключимся.
Увы, этот код время от времени будет падать. Причина в том, как работает метод disconnect во всех известных мне реализациях. Он гарантирует, что когда сигнал будет вызван в следующий раз, соответствующий обработчик не сработает. При этом, если обработчик в это время исполняется в другом потоке, то он не будет прерван, и продолжит работать с разрушенным объектом MediaScanner.
В Qt каждый объект принадлежит определённому потоку, и его обработчики вызываются исключительно в этом потоке. Для безопасного отключения от сигнала, вам следует вызывать метод QObject::deleteLater, который гарантирует, что реальное удаление будет произведено из нужного потока, и что ни один обработчик не будет вызван после удаления.
Это неплохой вариант, если вы готовы полноценно интегрироваться с Qt (отказаться от std::thread в ядре вашей программы в пользу QObject, QThread и прочего).
Буст для решения этой проблемы предлагает использовать методы track/track_foreign в слоте (т. е. обработчике). Эти методы принимают weak_ptr на произвольный объект, и соединение обработчика с сигналом существует, пока жив каждый из объектов, за которым «следит» слот.
Работает это довольно просто: в каждом слоте есть коллекция weak_ptr'ов на отслеживаемые объекты, которые «лочатся» (простите) на время выполнения обработчика. Таким образом, эти объекты гарантированно не разрушаются, пока код обработчика имеет к ним доступ. Если же какой-либо из объектов уже был уничтожен, соединение разрывается.
Проблема в том, что нам для этого требуется иметь weak_ptr на подписываемый объект. На мой взгляд, самый адекватный способ этого достичь — сделать фабричный метод в классе MediaScanner, где подписать создаваемый объект на все интересные ему сигналы:
Итак, недостатки:
Давайте сделаем метод disconnect блокирующим, чтобы он гарантировал нам, что после того, как он вернёт управление, можно уничтожать всё, к чему имел доступ обработчик сигнала. Что-то вроде метода std::thread::join.
Забегая вперёд, скажу, что нам для этого понадобятся три класса:
Код класса signal_connection:
Тут нужно сказать, что я сторонник RAII-шного объекта соединения. Останавливаться подробно на этом не буду, скажу только, что это в данном контексте несущественно.
Класс signal у нас тоже немного поменяется:
Теперь у нас рядом с каждым обработчиком лежит объект life_token::checker, который ссылается на life_token, лежащий в signal_connection. Его мы захватываем на время выполнения обработчика при помощи объекта life_token::checker::execution_guard
Как теперь выглядит код MediaScanner? Ровно так, как нам и хотелось его написать в самом начале:
Пишем MediaUiModel, которая будет реагировать на найденные медиа-файлы и добавлять в себя строки для их отображения.
Для этого добавим в MediaScanner следующий сигнал:
Тут есть две важные вещи:
Помимо предыдущей проблемы, тут есть ещё одна. Каждый раз, когда срабатывает сигнал, мы перекладываем обработчик в поток UI. Если в какой-то момент мы удаляем модель (например, вышли из приложения «Галерея»), все эти обработчики приходят позже в мёртвый объект. И опять падение.
Всё тот же deleteLater, с теми же особенностями.
Если вам повезло, и ваш UI-фреймворк позволяет сказать модели deleteLater, вы спасены. Вам достаточно сделать публичный метод, который будет сначала отключать модель от сигналов, а потом вызывать deleteLater, и вы получите примерно такое же поведение, как в Qt. Правда, предыдущую проблему вам всё же придётся решить. Для этого вы, скорее всего, сделаете внутри модели shared_ptr на некий класс, который и будете подписывать на сигналы. Кода не очень мало, но это дело техники.
Если же вам не повезло, и ваш UI-фреймворк требует удаления модели ровно тогда, когда ему захотелось, вы будете изобретать свой life_token.
Очень просто. Во-первых, сделать интерфейс для потока, как очереди задач:
Во-вторых, сделать в сигнале перегруженный метод connect, который принимает поток:
В этом методе в коллекцию _handlers положить обёртку над обработчиком, которая при вызове перекладывает в нужный поток пару из обработчика и соответствующего life_token::checker. Для вызова реального обработчика в конечном потоке мы будем использовать execution_guard точно так же, как и раньше.
Таким образом, метод disconnect нам будет гарантировать в том числе и то, что асинхронные обработчики тоже не будут вызваны после того, как мы отключились от сигнала.
Код обёртки и перегруженного метода connect я здесь приводить не буду. Думаю, идея ясна и так.
Код модели же становится совсем простым:
Здесь метод AppendRow будет вызываться строго в потоке UI, и лишь до тех пор, пока мы не отключимся.
Итак, есть три ключевые вещи, которые позволяют писать намного более простой код с использованием сигналов:
Конечно, код сигналов, который я тут привёл, очень прост и примитивен, и работает не очень быстро. Моей целью было рассказать об альтернативном подходе, который мне кажется более привлекательным, чем доминирующие сегодня. В реальности все эти вещи можно написать гораздо эффективнее.
Этот подход мы используем в нашем проекте около пяти лет, и очень счастливы.
Я переписал с использованием C++11 с нуля те сигналы, что у нас были, улучшил те части реализации, которые давно стоило улучшить.
Пользуйтесь на здоровье: https://github.com/koplyarov/wigwag.
Судя по реакции людей на реддите и в твиттере, в основном всех волнуют три вопроса:
Q: Тут же нужно блокировать life_token на вызов каждого обработчика. Не будет ли это медленно?
A: Как ни странно, нет. Можно вместо мьютекса использовать атомарные переменные, а если мы таки попали вызовом disconnect в тот момент, когда обработчик исполнялся, ждать на std::condition_variable. Тогда результат абсолютно противоположен: из-за отсутствующего оверхеда в виде track/track_foreign (которые требуют работы с коллекциями weak_ptr), эта реализация и по памяти и по скорости оставляет далеко позади boost::signals2, и даже опережает Qt.
Бенчмарки можно посмотреть тут.
Q: Не будет ли deadlock'ов из-за блокирующего метода disconnect?
A: Да, тут deadlock'и получить действительно чуть проще, чем в бусте и Qt. На мой взгляд, это окупается более простым кодом использования сигналов и более высокой скоростью их работы. К тому же, если аккуратно следить за тем, кто на кого подписан, то такие ситуации — скорее исключение.
Ну и, естественно deadlock'и нужно ловить и чинить. В Linux для этого рекомендую Helgrind. Для Windows двухминутный поиск в гугле даёт Intel Inspector и CHESS.
Если же по какой-то причине вы не можете себе позволить ничего из вышеперечисленного (например, на вашей платформе недостаточно памяти для запуска helgrind или вообще какая-нибудь маргинальная операционная система), есть костылерешение в виде вот такого (опять же, упрощённо) класса мьютекса:
И в Visual Studio и в GCC есть средства для получения бэктрейса в коде. Кроме того, есть неплохой libunwind.
С этим подходом большую часть ваших deadlock'ов поймают QA, а вы при одном взгляде на логи поймёте, где всё заблокировалось. Останется только починить.
Q: Можно ли использовать один мьютекс на несколько сигналов? Можно ли обрабатывать исключения так, как я хочу? Можно ли не использовать синхронизацию, и получить быстрые однопоточные сигналы?
A: Можно, можно, можно. Для этого всего есть шаблонные стратегии. Подробнее — в документации.
Что такое сигналы?
Думаю, многие уже знакомы с этой концепцией, но на всякий случай всё же напишу.
Сигнал — это способ отправить уведомление о произвольном событии получателям, которые могут регистрироваться независимо друг от друга. Если угодно, callback с множеством получателей. Или, для тех, кто работал с .NET, multicast delegate.
Пара примеров с boost::signals2
Объявление сигнала:
Подключение к сигналу и отключение от него:
Вызов сигнала:
struct Button
{
boost::signals2::signal<void()> OnClick;
};
Подключение к сигналу и отключение от него:
void ClickHandler()
{ cout << “Button clicked” << endl; }
// ...
boost::signals2::connection c = button->OnClick.connect(&ClickHandler);
// ...
c.disconnect();
Вызов сигнала:
struct Button
{
boost::signals2::signal<void()> OnClick;
private:
void MouseDownHandler()
{
OnClick();
}
};
Теперь о проблемах
В однопоточном коде всё выглядит отлично, и работает неплохо, но что насчёт многопоточного?
Тут, к сожалению, есть три общих для разных реализаций проблемы:
- Нет способа атомарно подключиться к сигналу и получить связанное состояние
- Неблокирующее отключение от сигнала
- Отключение асинхронного обработчика не отменяет вызовы, которые уже попали в очередь его потока
Рассмотрим каждую из них подробно. Для этого напишем часть прошивки воображаемой медиа-приставки, а именно три класса:
- StorageManager — класс, который реагирует на флэшки, DVD-диски и прочие носители, которые пользователь вставил в приставку
- MediaScanner — класс, который ищет медиа-файлы на каждом из таких устройств
- MediaUiModel — модель для отображения этих медиа-файлов в воображаемом Model-View-что-нибудь фреймворке
Сразу скажу, что код, который вы тут увидите, предельно упрощён, и не содержит ничего лишнего, чтобы мы могли сконцентрироваться на этих проблемах. Также вы встретите типы вида TypePtr. Это всего лишь std::shared_ptr<Type>, не пугайтесь.
Нет способа атомарно подключиться к сигналу и получить связанное состояние
Итак, StorageManager. Нужен геттер для тех носителей, которые уже вставлены в приставку, и сигнал для уведомления о появлении новых.
class StorageManager
{
public:
std::vector<StoragePtr> GetStorages() const;
boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
// ...
};
Увы, таким интерфейсом невозможно воспользоваться без того, чтобы получить race condition.
Не работает в таком порядке…
storageManager->OnStorageAdded.connect(&StorageHandler);
// Если пользователь вставляет флэшку до цикла, она будет обработана дважды
for (auto&& storage : storageManager->GetStorages())
StorageHandler(storage);
… и не работает в таком порядке.
for (auto&& storage : storageManager->GetStorages())
StorageHandler(storage);
// Если пользователь вставляет флэшку до подключения к сигналу, она не будет обработана совсем
storageManager->OnStorageAdded.connect(&StorageHandler);
Распространённое решение
Очевидно, раз мы получили race condition, нам нужен мьютекс.
class StorageManager
{
mutable std::recursive_mutex _mutex;
std::vector<StoragePtr> _storages;
public:
StorageManager()
{ /* ... */ }
boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded;
std::recursive_mutex& GetMutex() const
{ return _mutex; }
std::vector<StoragePtr> GetStorages() const
{
std::lock_guard<std::recursive_mutex> l(_mutex);
return _storages;
}
private:
void ReportNewStorage(const StoragePtr& storage)
{
std::lock_guard<std::recursive_mutex> l(_mutex);
_storages.push_back(storage);
OnStorageAdded(storage);
}
};
// ...
{
std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex());
storageManager->OnStorageAdded.connect(&StorageHandler);
for (auto&& storage : storageManager->GetStorages())
StorageHandler(storage);
}
Этот код будет работать, но у него есть несколько недостатков:
- Если вы хотите использовать std::mutex вместо std::recursive_mutex, вы теряете возможность захватывать его внутри метода GetStorages, что делает класс StorageManager непотокобезопасным
- Вы не можете избавиться от копирования коллекции внутри GetStorages, не потеряв потокобезопасность StorageManager'а
- Вам приходится показывать наружу тип std::vector<StoragePtr>, хотя на деле это всего лишь детали реализации
- Довольно объёмный код подключения к сигналу и получения текущего состояния, который при этом почти никак не отличается для разных сигналов
Как сделать лучше?
Давайте перенесём всё то, что мы делаем вокруг вызова connect (захват мьютекса и обход коллекции) внутрь.
Тут важно понимать, что алгоритм получения текущего состояния зависит от природы этого самого состояния. Если это коллекция, нужно вызвать обработчик для каждого элемента, если же это, например, enum, то нужно вызвать обработчик ровно один раз. Соответственно, нам нужна некая абстракция.
Добавим в сигнал популятор — функцию, принимающую обработчик, который сейчас подключается, и пусть владелец сигнала (StorageManager, в нашем случае) определяет, каким образом текущее состояние будет отправлено в этот обработчик.
template < typename Signature >
class signal
{
using populator_type = std::function<void(const std::function<Signature>&)>;
mutable std::mutex _mutex;
std::list<std::function<Signature> > _handlers;
populator_type _populator;
public:
signal(populator_type populator)
: _populator(std::move(populator))
{ }
std::mutex& get_mutex() const { return _mutex; }
signal_connection connect(std::function<Signature> handler)
{
std::lock_guard<std::mutex> l(_mutex);
_populator(handler); // Владелец сигнала определяет конкретный алгоритм получения состояния
_handlers.push_back(std::move(handler));
return signal_connection([&]() { /* удаляем обработчик из _handlers */ } );
}
// ...
};
Класс signal_connection пока принимает лямбда-функцию, которая удалит обработчик из списка в сигнале. Чуть дополненный код я приведу позже.
Перепишем StorageManager с использованием этой новой концепции:
class StorageManager
{
std::vector<StoragePtr> _storages;
public:
StorageManager()
: _storages([&](const std::function<void(const StoragePtr&)>& h) { for (auto&& s : _storages) h(s); })
{ /* ... */ }
signal<void(const StoragePtr&)> OnStorageAdded;
private:
void ReportNewStorage(const StoragePtr& storage)
{
// Мы должны захватить мьютекс именно тут, а не внутри вызова сигнала,
// потому что он защищает в том числе и коллекцию _storages
std::lock_guard<std::mutex> l(OnStorageAdded.get_mutex());
_storages.push_back(storage);
OnStorageAdded(storage);
}
};
Если вы используете C++14, популятор может быть совсем коротким:
StorageManager()
: _storages([&](auto&& h) { for (auto&& s : _storages) h(s); })
{ }
Обратите внимание, что при вызове популятора мьютекс захватывается в методе signal::connect, поэтому в теле самого популятора делать этого не нужно.
Клиентский код становится совсем коротким:
storageManager->OnStorageAdded.connect(&StorageHandler);
Одной строчкой мы одновременно подключаемся к сигналу и получаем текущее состояние объекта. Отлично!
Неблокирующее отключение от сигнала
Теперь пора писать MediaScanner. В конструкторе подключимся к сигналу StorageManager::OnStorageAdded, а в деструкторе отключимся.
class MediaScanner
{
private:
boost::signals2::connection _connection;
public:
MediaScanner(const StorageManagerPtr& storageManager)
{ _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }
~MediaScanner()
{
_connection.disconnect();
// Обработчик сигнала может всё ещё исполняться в потоке, вызвавшем сигнал.
// В этом случае, далее он будет обращаться к разрушенному объекту MediaScanner.
}
private:
void StorageHandler(const StoragePtr& storage)
{ /* Здесь что-то долгое */ }
};
Увы, этот код время от времени будет падать. Причина в том, как работает метод disconnect во всех известных мне реализациях. Он гарантирует, что когда сигнал будет вызван в следующий раз, соответствующий обработчик не сработает. При этом, если обработчик в это время исполняется в другом потоке, то он не будет прерван, и продолжит работать с разрушенным объектом MediaScanner.
Решение в Qt
В Qt каждый объект принадлежит определённому потоку, и его обработчики вызываются исключительно в этом потоке. Для безопасного отключения от сигнала, вам следует вызывать метод QObject::deleteLater, который гарантирует, что реальное удаление будет произведено из нужного потока, и что ни один обработчик не будет вызван после удаления.
mediaScanner->deleteLater();
Это неплохой вариант, если вы готовы полноценно интегрироваться с Qt (отказаться от std::thread в ядре вашей программы в пользу QObject, QThread и прочего).
Решение в boost::signals2
Буст для решения этой проблемы предлагает использовать методы track/track_foreign в слоте (т. е. обработчике). Эти методы принимают weak_ptr на произвольный объект, и соединение обработчика с сигналом существует, пока жив каждый из объектов, за которым «следит» слот.
Работает это довольно просто: в каждом слоте есть коллекция weak_ptr'ов на отслеживаемые объекты, которые «лочатся» (простите) на время выполнения обработчика. Таким образом, эти объекты гарантированно не разрушаются, пока код обработчика имеет к ним доступ. Если же какой-либо из объектов уже был уничтожен, соединение разрывается.
Проблема в том, что нам для этого требуется иметь weak_ptr на подписываемый объект. На мой взгляд, самый адекватный способ этого достичь — сделать фабричный метод в классе MediaScanner, где подписать создаваемый объект на все интересные ему сигналы:
class MediaScanner
{
public:
static std::shared_ptr<MediaScanner> Create(const StorageManagerPtr& storageManager)
{
std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex());
MediaScannerPtr result(new MediaScanner);
boost::signals2::signal<void(const StoragePtr&)>::slot_type
slot(bind(&MediaScanner::StorageHandler, result.get(), _1));
slot.track_foreign(result);
storageManager->OnStorageAdded.connect(slot);
for (auto&& storage : storageManager->GetStorages())
result->StorageHandler(storage);
return result;
}
private:
MediaScanner() // приватный конструктор!
{ /* Проинициализировать всё, кроме обработчиков сигналов */ }
void StorageHandler(const StoragePtr& storage);
{ /* Здесь что-то долгое */ }
};
Итак, недостатки:
- О-очень много кода, который вы каждый раз будете копипастить
- Инициализация MediaScanner'а распалась на две части: подписывание на сигналы в методе Create, и всё остальное в конструкторе
- Вы обязаны использовать shared_ptr для хранения MediaScanner
- Вы не уверены, что MediaScanner удалён, когда вы отпустили последнюю внешнюю ссылку на него. Это может быть проблемой, если он использует какой-либо ограниченный ресурс, который вы хотите переиспользовать после освобождения MediaScanner
Как сделать лучше?
Давайте сделаем метод disconnect блокирующим, чтобы он гарантировал нам, что после того, как он вернёт управление, можно уничтожать всё, к чему имел доступ обработчик сигнала. Что-то вроде метода std::thread::join.
Забегая вперёд, скажу, что нам для этого понадобятся три класса:
- life_token — контролирует время жизни обработчика, позволяет пометить его, как «умирающий», и дождаться окончания исполнения, если необходимо
- life_token::checker — хранится внутри сигнала рядом с обработчиком, ссылается на свой life_token
- life_token::checker::execution_guard — создаётся на стеке на время исполнения обработчика, блокирует соответствующий life_token и позволяет проверить, не «умер» ли обработчик ранее
Код класса signal_connection:
class signal_connection
{
life_token _token;
std::function<void()> _eraseHandlerFunc;
public:
signal_connection(life_token token, std::function<void()> eraseHandlerFunc)
: _token(token), _eraseHandlerFunc(eraseHandlerFunc)
{ }
~signal_connection();
{ disconnect(); }
void disconnect()
{
if (_token.released())
return;
_token.release(); // Тут мы ждём, если обработчик сейчас заблокирован (т. е. исполняется)
_eraseHandler(); // Та самая лямбда-функция, которая удалит обработчик из списка
}
};
Тут нужно сказать, что я сторонник RAII-шного объекта соединения. Останавливаться подробно на этом не буду, скажу только, что это в данном контексте несущественно.
Класс signal у нас тоже немного поменяется:
template < typename Signature >
class signal
{
using populator_type = std::function<void(const std::function<Signature>&)>;
struct handler
{
std::function<Signature> handler_func;
life_token::checker life_checker;
};
mutable std::mutex _mutex;
std::list<handler> _handlers;
populator_type _populator;
public:
// ...
signal_connection connect(std::function<Signature> handler)
{
std::lock_guard<std::mutex> l(_mutex);
life_token token;
_populator(handler);
_handlers.push_back(Handler{std::move(handler), life_token::checker(token)});
return signal_connection(token, [&]() { /* удаляем обработчик из _handlers */ } );
}
template < typename... Args >
void operator() (Args&&... args) const
{
for (auto&& handler : _handlers)
{
life_token::checker::execution_guard g(handler.life_checker);
if (g.is_alive())
handler.handler_func(forward<Args>(args)...);
}
}
};
Теперь у нас рядом с каждым обработчиком лежит объект life_token::checker, который ссылается на life_token, лежащий в signal_connection. Его мы захватываем на время выполнения обработчика при помощи объекта life_token::checker::execution_guard
Реализацию этих объектов спрячу под спойлер. Если устали, можете пропустить.
Внутри life_token нам понадобятся следующие вещи:
Мьютекс захватывается на время жизни execution_guard. Соответственно, если в другом потоке в это время будет вызван метод life_token::release, он заблокируется на захвате того же мьютекса и дождётся окончания выполнения обработчика сигнала. После этого он сбросит флаг alive, и все последующие вызовы сигнала не приведут к вызову обработчика.
- Какой-то примитив операционной системы для ожидания в life_token::release (здесь для простоты возьмём мьютекс)
- Флаг «жив/мёртв»
- Счётчик блокировки через execution_guard (здесь для простоты опустим)
class life_token
{
struct impl
{
std::mutex mutex;
bool alive = true;
};
std::shared_ptr<impl> _impl;
public:
life_token() : _impl(std::make_shared<impl>()) { }
~life_token() { release(); }
bool released() const { return !_impl; }
void release()
{
if (released())
return;
std::lock_guard<std::mutex> l(_impl->mutex);
_impl->alive = false;
_impl.reset();
}
class checker
{
shared_ptr<impl> _impl;
public:
checker(const life_token& t) : _impl(t._impl) { }
class execution_guard
{
shared_ptr<Impl> _impl;
public:
execution_guard(const checker& c) : _impl(c._impl) { _impl->mutex.lock(); }
~execution_guard() { _impl->mutex.unlock(); }
bool is_alive() const { return _impl->alive; }
};
};
};
Мьютекс захватывается на время жизни execution_guard. Соответственно, если в другом потоке в это время будет вызван метод life_token::release, он заблокируется на захвате того же мьютекса и дождётся окончания выполнения обработчика сигнала. После этого он сбросит флаг alive, и все последующие вызовы сигнала не приведут к вызову обработчика.
Как теперь выглядит код MediaScanner? Ровно так, как нам и хотелось его написать в самом начале:
class MediaScanner
{
private:
signals_connection _connection;
public:
MediaScanner(const StorageManagerPtr& storageManager)
{ _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); }
~MediaScanner()
{ _connection.disconnect(); }
private:
void StorageHandler(const StoragePtr& storage)
{ /* Здесь что-то долгое */ }
};
Отключение асинхронного обработчика не отменяет вызовы, которые уже попали в очередь его потока
Пишем MediaUiModel, которая будет реагировать на найденные медиа-файлы и добавлять в себя строки для их отображения.
Для этого добавим в MediaScanner следующий сигнал:
signal<void(const MediaPtr&)> OnMediaFound;
Тут есть две важные вещи:
- Модель — это объект UI-библиотеки, поэтому все действия с ней должны производиться из потока UI.
- Часто в UI-библиотеках используется своя иерархия владения, поэтому мы не можем использовать shared_ptr для хранения модели. Соответственно, фокус с track/track_foreign тут не пройдёт, но это сейчас не главное, так что притворимся, что всё хорошо
class MediaUiModel : public UiModel<MediaUiModelRow>
{
private:
boost::io_service& _uiThread;
boost::signals2::connection _connection;
public:
MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner)
: _uiThread(uiThread)
{
std::lock_guard<std::recursive_mutex> l(scanner.GetMutex());
scanner.OnMediaFound.connect([&](const MediaPtr& m) { this->MediaHandler(m); });
for (auto&& m : scanner.GetMedia())
AppendRow(MediaUiModelRow(m))
}
~MediaUiModel()
{ _connection.disconnect(); }
private:
// Этот метод выполняется в потоке MediaScanner'а, и всю реальную работу перебрасывает в поток UI.
void MediaHandler(const MediaPtr& m)
{ _uiThread.post([&]() { this->AppendRow(MediaUiModelRow(m)); }); }
};
Помимо предыдущей проблемы, тут есть ещё одна. Каждый раз, когда срабатывает сигнал, мы перекладываем обработчик в поток UI. Если в какой-то момент мы удаляем модель (например, вышли из приложения «Галерея»), все эти обработчики приходят позже в мёртвый объект. И опять падение.
Решение в Qt
Всё тот же deleteLater, с теми же особенностями.
Решение в boost::signals2
Если вам повезло, и ваш UI-фреймворк позволяет сказать модели deleteLater, вы спасены. Вам достаточно сделать публичный метод, который будет сначала отключать модель от сигналов, а потом вызывать deleteLater, и вы получите примерно такое же поведение, как в Qt. Правда, предыдущую проблему вам всё же придётся решить. Для этого вы, скорее всего, сделаете внутри модели shared_ptr на некий класс, который и будете подписывать на сигналы. Кода не очень мало, но это дело техники.
Если же вам не повезло, и ваш UI-фреймворк требует удаления модели ровно тогда, когда ему захотелось, вы будете изобретать свой life_token.
Например, как-то так (тоже лучше не читайте, если устали).
Я даже не стану комментировать этот код, давайте просто немного погрустим.
template < typename Signature_ >
class AsyncToUiHandlerWrapper
{
private:
boost::io_service& _uiThread;
std::function<Signature_> _realHandler;
bool _released;
mutable std::mutex _mutex;
public:
AsyncToUiHandlerWrapper(boost::io_service& uiThread, std::function<Signature_> realHandler)
: _uiThread(uiThread), _realHandler(realHandler), _released(false)
{ }
void Release()
{
std::lock_guard<std::mutex> l(_mutex);
_released = true;
}
template < typename... Args_ >
static void AsyncHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
{
auto self = selfWeak.lock();
std::lock_guard<std::mutex> l(self->_mutex);
if (!self->_released) // AsyncToUiHandlerWrapper не был освобождён, значит _uiThread всё ещё ссылается на живой объект
self->_uiThread.post(std::bind(&AsyncToUiHandlerWrapper::UiThreadHandler<Args_&...>, selfWeak, std::forward<Args_>(args)...)));
}
private:
template < typename... Args_ >
static void UiThreadHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args)
{
auto self = selfWeak.lock();
if (!self)
return;
if (!self->_released) // AsyncToUiHandlerWrapper не был освобождён, значит, объекты, доступные _realHandler, ещё живы
self->_realHandler(std::forward<Args_>(args)...);
}
};
class MediaUiModel : public UiModel<MediaUiModelRow>
{
private:
using AsyncMediaHandler = AsyncToUiHandlerWrapper<void(const MediaPtr&)>;
private:
std::shared_ptr<AsyncMediaHandler> _asyncHandler;
public:
MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner)
{
try
{
_asyncHandler = std::make_shared<AsyncMediaHandler>(std::ref(uiThread), [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); });
std::lock_guard<std::recursive_mutex> l(scanner.GetMutex());
boost::signals2::signal<void(const MediaPtr&)>::slot_type
slot(std::bind(&AsyncMediaHandler::AsyncHandler<const MediaPtr&>, std::weak_ptr<AsyncMediaHandler>(_asyncHandler), std::placeholders::_1));
slot.track_foreign(_asyncHandler);
scanner.OnMediaFound.connect(slot);
for (auto&& m : scanner.GetMedia())
AppendRow(MediaUiModelRow(m));
}
catch (...)
{
Destroy();
throw;
}
}
~MediaUiModel()
{ Destroy(); }
private:
void Destroy()
{
if (_asyncHandler)
_asyncHandler->Release(); // Асинхронный код не обращается к MediaUiModel после этой строки, так что можно окончательно разрушать объект
_asyncHandler.reset();
}
};
Я даже не стану комментировать этот код, давайте просто немного погрустим.
Как сделать лучше?
Очень просто. Во-первых, сделать интерфейс для потока, как очереди задач:
struct task_executor
{
virtual ~task_executor() { }
virtual void add_task(const std::function<void()>& task) = 0;
};
Во-вторых, сделать в сигнале перегруженный метод connect, который принимает поток:
signal_connection connect(const std::shared_ptr<task_executor>& worker, std::function<Signature> handler);
В этом методе в коллекцию _handlers положить обёртку над обработчиком, которая при вызове перекладывает в нужный поток пару из обработчика и соответствующего life_token::checker. Для вызова реального обработчика в конечном потоке мы будем использовать execution_guard точно так же, как и раньше.
Таким образом, метод disconnect нам будет гарантировать в том числе и то, что асинхронные обработчики тоже не будут вызваны после того, как мы отключились от сигнала.
Код обёртки и перегруженного метода connect я здесь приводить не буду. Думаю, идея ясна и так.
Код модели же становится совсем простым:
class MediaUiModel : public UiModel<MediaUiModelRow>
{
private:
signal_connection _connection;
public:
MediaUiModel(const std::shared_ptr<task_executor>& uiThread, const MediaScanner& scanner)
{ _connection = scanner.OnMediaFound.connect(uiThread, [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); }); }
~MediaUiModel()
{ _connection.reset(); }
};
Здесь метод AppendRow будет вызываться строго в потоке UI, и лишь до тех пор, пока мы не отключимся.
Подводя итоги
Итак, есть три ключевые вещи, которые позволяют писать намного более простой код с использованием сигналов:
- Популяторы позволяют удобно получать текущее состояние во время подключения к сигналу
- Блокирующий метод disconnect позволяет отписывать объект в его же деструкторе
- Чтобы предыдущий пункт был верен и для асинхронных обработчиков, disconnect должен также помечать те вызовы, которые уже лежат в очереди потока, как «неактуальные»
Конечно, код сигналов, который я тут привёл, очень прост и примитивен, и работает не очень быстро. Моей целью было рассказать об альтернативном подходе, который мне кажется более привлекательным, чем доминирующие сегодня. В реальности все эти вещи можно написать гораздо эффективнее.
Этот подход мы используем в нашем проекте около пяти лет, и очень счастливы.
Готовая реализация
Я переписал с использованием C++11 с нуля те сигналы, что у нас были, улучшил те части реализации, которые давно стоило улучшить.
Пользуйтесь на здоровье: https://github.com/koplyarov/wigwag.
Мини-FAQ
Судя по реакции людей на реддите и в твиттере, в основном всех волнуют три вопроса:
Q: Тут же нужно блокировать life_token на вызов каждого обработчика. Не будет ли это медленно?
A: Как ни странно, нет. Можно вместо мьютекса использовать атомарные переменные, а если мы таки попали вызовом disconnect в тот момент, когда обработчик исполнялся, ждать на std::condition_variable. Тогда результат абсолютно противоположен: из-за отсутствующего оверхеда в виде track/track_foreign (которые требуют работы с коллекциями weak_ptr), эта реализация и по памяти и по скорости оставляет далеко позади boost::signals2, и даже опережает Qt.
Бенчмарки можно посмотреть тут.
Q: Не будет ли deadlock'ов из-за блокирующего метода disconnect?
A: Да, тут deadlock'и получить действительно чуть проще, чем в бусте и Qt. На мой взгляд, это окупается более простым кодом использования сигналов и более высокой скоростью их работы. К тому же, если аккуратно следить за тем, кто на кого подписан, то такие ситуации — скорее исключение.
Ну и, естественно deadlock'и нужно ловить и чинить. В Linux для этого рекомендую Helgrind. Для Windows двухминутный поиск в гугле даёт Intel Inspector и CHESS.
Если же по какой-то причине вы не можете себе позволить ничего из вышеперечисленного (например, на вашей платформе недостаточно памяти для запуска helgrind или вообще какая-нибудь маргинальная операционная система), есть костылерешение в виде вот такого (опять же, упрощённо) класса мьютекса:
class mutex
{
private:
std::timed_mutex _m;
public:
void lock()
{
if (_m.try_lock())
return;
while (!_m.try_lock_for(std::chrono::seconds(10)))
Logger::Warning() << "Could not lock mutex " << (void*)this << " for a long time:\n" << get_backtrace_string();
}
// ...
};
И в Visual Studio и в GCC есть средства для получения бэктрейса в коде. Кроме того, есть неплохой libunwind.
С этим подходом большую часть ваших deadlock'ов поймают QA, а вы при одном взгляде на логи поймёте, где всё заблокировалось. Останется только починить.
Q: Можно ли использовать один мьютекс на несколько сигналов? Можно ли обрабатывать исключения так, как я хочу? Можно ли не использовать синхронизацию, и получить быстрые однопоточные сигналы?
A: Можно, можно, можно. Для этого всего есть шаблонные стратегии. Подробнее — в документации.
darkxanter
В Qt при удалении объекта унаследованого от QObject автоматически отключаются сигналы связанные с ним.
koplyarov_da
Правильно, но это работает только тогда когда и владелец сигнала, и владелец слота принадлежат одному потоку. Это почти всегда верно в UI, и совсем не всегда верно в остальных частях проекта.
"Deleting a QObject while pending events are waiting to be delivered can cause a crash. You must not delete the QObject directly if it exists in a different thread than the one currently executing. Use deleteLater() instead, which will cause the event loop to delete the object after all pending events have been delivered to it."
http://doc.qt.io/qt-5/qobject.html#dtor.QObject
darkxanter
Прямое удаление через delete может привести к падению программы. Если удалять через deleteLater(), то безопасно. Вы ведь привели как раз эту цитату из документации.