Привет. В этой публикации речь пойдёт о многопоточке, каналах, подписках, планировании регулярных задач, мониторинге, нейросетях и самоэволюционирующих алгоритмах.
Для начала небольшое вступление, а потом пойдёт C++ код.
На текущем месте работы мы разрабатываем SDK которое используется в нескольких известных крупных приложениях. Это гигабайты исходного кода. Под такой крупный проект были разработаны собственная архитектура и инструментарий так как в какой-то момент существующих инструментов C++ STL и Qt просто перестало хватать. Все вы знаете как работает std::future. Это неудобно и поэтому у нас разработали альтернативный future который позволяет собирать цепочку из нескольких future связывая их между собой логикой с последующей отправкой на целевой поток для выполнения(на экзекьютор). Например собирается такая цепочка: после того как future будет готов затем выполни следующий future передав параметром результат работы предыдущего, параллельно запусти ещё одну future -> если все фьючи готовы то запусти 2 future по подготовке результата по offline данным и online данным -> дальше результат того кто успел выполниться первым кидай на рендеринг через канал публикации.. При этом есть возможность явно указать на каких потоках(одно/много поточных экзекьюторах) должна выполняться каждая из future - это позволяет с рендер потока вынести сложные вычисления и с результатом прийти обратно в рендер поток. Также у нас есть каналы - своя реализация паттерна Издатель&Подписчики. Подписчики на сигналы могут указать на каком экзекьюторе должен вызываться колбэк. Это позволило написать однопоточные библиотеки внутри которых нет ни одного мьютекса, а синхронизация с внешними потребителями API происходит через прокси интерфейсы (прокси при необходимости перекидывает вызовы на внутренний поток объекта). Архитектура приложения строится на создании крупных сущностей (библиотек/сервисов) которые вовне выставляют свои интерфейсы с исходящими каналами на которые можно подписаться, и методы через которые можно запушить данные во входящие каналы. SDK пишут несколько команд разработчиков, у каждой команды свой репозиторий со своими правилами и некоторые команды продолжают разрабатывать традиционно: обычные методы которые могут дёргать из разных потоков, а значит мьютексы на которых периодически кто-то подрывается. Так например иногда доставка сигналов и прямые вызовы методов из разных мест образует некий узор в котором параллельные потоки упираются в один мьютекс. Это может быть тяжело воспроизводимый дедлок или борьба за мьютекс когда два параллельных потока начинают работать строго по очереди. В большом проекте где работают десятки потоков экзекьюторов найти причину остановки/замедления бывает очень непросто, экзекьютор с зависшим потоком можно вообще не заметить (ну перестали какие-то данные обновляться с серверов - может же просто новых данных нет). Очевидным решением кажется отказ от мьютексов и мониторинг фризов. В идеале если бы экзекьюторы могли само восстанавливаться и жаловаться о проблемах (в лог или на сервер или иначе).
В очередные полгода я оглянулся посмотреть что было сделано за этот период и обнаружил что занимался только тем что разбирался с проблемами. Тогда я решил переосмыслить то как должны работать экзекьюторы/каналы/фьючи и начал по выходным реализовывать свои идеи в виде C++ библиотеки (состоит только из заголовочных файлов).
На данный момент эта библиотека предоставляет:
1) Однопоточные/многопоточные само восстанавливающиеся магистрали (так решил назвать экзекьюторы чтобы сломать шаблон). Постановка задач на исполнение происходит без мьютексов(сделал простой стек на атомиках). Многопоточные магистрали умеют динамически наращивать/сокращать потоки. Система мониторинга умеет ремонтировать зависшие магистрали. Все магистрали умеют жаловаться на падающие и зависающие задачи в колбэк лога(если установили). Магистрали можно ставить на паузу (например чтобы соответствовать жизненному циклу приложения на мобильнике - незачем тратить ресурсы мобильника на то что пользователь уже не смотрит). Магистрали сообщают выполняющимся задачам о том что выполнение следует прекратить (в параметры колбэка подаётся id с которым задача запускалась и atomic id который инкрементируется если магистраль останавливается или меняет поток). Магистрали умеют перепланировать выполнение задачи - в колбэк задачи подаётся управляющая структура где задача может указать надо ли её ещё раз запускать и через какой интервал времени (например можно запостить микросервисы которые будут перепланировать себя каждые 5-10-15 секунд кому как надо и так шарить один поток подобно корутинам). Постить на магистраль можно лямбды, функторы, уникальные или шаред указатели на функторы, в колбэках может быть такой интерфейс какой удобно (во время компиляции определяется с какими параметрами допустимо вызывать колбэк - только эти параметры в Ваш колбэк и передаются).
2) Исполняемые задачи защищённые протектором - обычно это std::weak_ptr на объект чей код исполняется или просто любой std::weak_ptr (вообще можно использовать любой объект реализующий метод lock()) который удобно сделать невалидным если условия изменились (например пользователь перелетел на экране в другую область и нет уже смысла трудиться над тем что ушло с экрана). Задачи могут перепланировать себя (например функтор может регулярно исполняться сохраняя своё текущее состояние внутри своего объекта).
3) Каналы (реализация шаблона Publisher&Subscribers) позволяют доставлять сообщения подписчикам, каждому в своём потоке и без единого мьютекса. Подписчик может удалять свои подписки, Издатель может удалять протухших подписчиков. Подписчик протух если включённый в подписку протектор на вызов lock() вернул false(например объект-хозяин лямбды колбэка был удалён и теперь std::weak_ptr на него больше не работает). BufferedRetransmitter позволяет буферизовать последний сигнал, фильтровать повторяющиеся значения, отправлять последний полученный сигнал новым подписчикам в момент подключения (похоже на Sticky intent на Android). Есть каналы извещающие Издателя о первом подключении и последнем отключившемся подписчике - это удобно для включения/выключения сервисов Издателя чтобы те не потребляли ресурсов если продукт сервиса в данный момент никому не нужен.
4) Узлы блок-схемы алгоритма (реализация Future + входящие/исходящие каналы) - из них можно построить логику любой сложности и отправить на исполнение. Обычно из фьюч собирали только линейные цепочки - теперь можно построить блок-схему с циклами, логическим ветвлением, агрегирующими узлами, узкоспециализированными или кастомными самописными узлами. Связь между узлами происходит через каналы подписки - это можно делать динамически. Так можно во время исполнения блок-схемы создавать новые части блок-схемы, подключать/менять/удалять, Runtime менять логику работы программы. Например тренировать под возникающие задачи нейросети и встраивать такие узлы в основной алгоритм. Или в системе распознавания речи в случае неудач выделять новые тембры голоса, тренировать акустическую модель под новые тембры и добавлять в блок схему распознавания новый узел в разветвитель по тембрам. Ещё можно использовать для визуального программирования соединяя одним пальцем узлы кода например в конвейер видеоаналитики.
5) Безмьютексный почтовый ящик - благодаря ему всё вышеописанное работает. Кроме передачи данных между потоками у почтового ящика ещё одна очень важная функция: ограничение потребления оперативной памяти. При передаче данных между потоками есть опасность: если один поток генерирует объекты быстрее чем другой поток их потребляет, то рано или поздно очередь сгенерированных объектов съест всю оперативную память. Возможность ограничивать потребляемую память появилась как побочный бонус от ускорения: чтобы объект поместить в очередь нужен контейнер с указателем для построения связанного списка - холдер. Количество таких холдеров ограничено(==контроль оперативной памяти) и они переиспользуются (==скорость). Кому ограничение не нужно могут установить максимальное количество холдеров в недостижимо большое число - так будет гарантировано что программа упадёт от нехватки памяти раньше чем столкнётся с нехваткой холдеров. Кому же важна стабильность тот может разделить какие задачи обязательно должны быть выполнены, а какие могут быть пропущены - при постановке задач на выполнение и в подписках указывается параметр send_may_fail, если он false то будет ожидание на семафоре появления свободного холдера.
1. Хайвеи и задачи
Простой пример запуска лямбды на хайвее:
// Create executor
const auto highway = hi::make_self_shared<hi::SerialHighWay<>>();
// Some data that task need to work
auto unique = std::make_unique<std::string>("hello to std::function<> ;))");
// Post task
highway->post([&, unique = std::move(unique)]
{
scope.print(std::string{"unique: "}.append(*unique));
});
В этом примере захватывается в лямбду некий внешний объект "scope" по ссылке, и если объект успеет уничтожиться к тому моменту как на другом потоке начнёт обрабатываться лямбда, то будет не очень. Поэтому рекомендованным решением является добавление протектора (std::weak_ptr на объект порождающий лямбду) - тогда если к моменту запуска std::weak_ptr протух, то запускать смысла уже нет. Такой подход удобен в том числе чтобы снимать задачи потерявшие актуальность с очереди на исполнение.
Тут есть нюанс - как в конструкторе объекта сразу получать std::weak_ptr на себя чтобы можно было сразу настраивать подписки&расписания (STL так не умеет)?
В библиотеке эта проблема решена. В следующем примере используется std::weak_ptr<SelfProtectedTask> в качестве защиты от запуска лямбды если целевой объект уже разрушился при выходе из скоупа:
{ // scope
auto service1 = hi::make_self_shared<SelfProtectedTask>("service1 shouldn't start", highway, scope);
}
auto service2 = hi::make_self_shared<SelfProtectedTask>("service2 must start", highway, scope);
Кроме описанной выше защиты от SIGSEGV (при обращении к разрушенному объекту), ещё есть защита от зависаний - каждый поток на старте получает идентификатор с которым он стартовал и ссылку на глобальный atomic идентификатора текущего потока. Если global_run_id и your_run_id перестали совпадать - значит пора останавливаться. Эти же переменные опционально передаются как параметры в исполняемые задачи:
highway->post(
[&](const std::atomic<std::uint32_t> & global_run_id, const std::uint32_t your_run_id)
{
// Long running algorithm
while (global_run_id == your_run_id)
{
std::this_thread::sleep_for(10ms);
}
});
Хайвей запоминает время старта каждой задачи и можно снаружи через метод self_check() попросить хайвей проверить не завис ли он на этой задаче. Если хайвей обнаруживает что поток завис, то он стартует новый поток который продолжает обработку новых задач. Старый поток остановится когда увидит что у него global_run_id и your_run_id перестали совпадать. Для регулярного запуска метода self_check() был создан HighWaysMonitoring: пример использования, юнит тест.
В примерах Вы могли заметить что в хайвеи пробрасывается колбэк для логирования - если от SIGSEGV защититься не удалось и приложение упало, то в "Debug"версиях хайвеев в логе будут координаты кода (файл & номер строки) который запускался последним == лёгкий способ локализовать где случилось падение:
{"milliseconds_since_epoch":1652608167955,"thread":140737348142848,"msg":{"what":"MailBoxMessage:start","highway_name":"SerialHighWayDebug:monitor_SIGSEGV","code_filename":"/home/dbond/workspace/thread_highways_simulator/thread_highways/examples/highways/monitoring/src/monitoring.cpp","code_line":150}}
Конечно можно анализировать дампы, но когда твою библиотеку SDK третьи фирмы встраивают в свой софт, в дампе адреса часто вообще не попадают в код или попадают в код где ломаться нечему так как глючный код третьих разработчиков прошёлся по всей доступной процессу памяти).
Перейдём к примеру запуска сервиса который регулярно совершает некую работу:
highway->add_reschedulable_runnable(
[&, i = 0](hi::ReschedulableRunnable::Schedule & schedule) mutable
{
++i;
scope.print(std::string{"number of launches: "}.append(std::to_string(i)));
if (i < 10)
{
schedule.schedule_launch_in(10ms);
} else {
promise.set_value(true);
}
},
__FILE__,
__LINE__);
В колбэк подаётся управляющая структура Schedule которую смотрит хайвей после завершения работы колбэка. Если дальнейшие перезапуски не запланированы, то хайвей удаляет задачу как завершённую. Иначе в следующее окно времени Schedule будет проанализирован на предмет не пора ли запускать эту задачу. Так некий регулярный сервис может перепланировать свои запуски с требуемой частотой (например проверять данные на сервере раз в 10 секунд).
Обычно нагрузка идёт волнами - то задач много, то ни одной. В ConcurrentHighWay реализован механизм инерционного наращивания и сокращения потоков согласно текущей нагрузке:
if (last_change_time > workers_change_period_)
{
last_time_workers_count_change = sleep_finish_time;
if (average_work_time > average_sleep_time)
{
workers_count = increase_workers(workers);
}
else if (average_work_time < average_sleep_time)
{
workers_count = decrease_workers(workers);
}
}
Как потоки наращиваются под нагрузкой и сокращаются без нагрузки можно понаблюдать на примере. То что изменение количества потоков происходит с инерцией хорошо для продакшена, но не очень удобно для юнит теста так как поведение не детерминировано. В юнит тесте можно увидеть как тюнинговать логику работы хайвея при помощи параметра шаблона FreeTimeLogic - эта фишка появилась когда я обратил внимание что обычно главный цикл(реактор) непонятно чем занять пока задач нет и решил инжектировать при компиляции некую логику времени простоя. Инжектируемая логика позволяет оценить насколько загружен данный поток и нужны ли дополнительные потоки или добавить рычаг постановки потока на паузу или добавить реактор своего кастомного сервиса/корутины. В упомянутом юнит тесте я не стал полагаться на расчёты среднего времени нагрузки и простоя, добавил переключатель HRstrategy для регуляции количества потоков:
struct CustomFreeTimeLogic
{
enum class HRstrategy
{
DecreaseWorkers,
FreezeNumberOfWorkers,
IncreaseWorkers
};
Кто разрабатывал мобильные приложения скорее всего задумывался над тем что делать когда работа приложения приостанавливается - если приложение тяжёлое, то всё выгружать/загружать каждый раз когда пользователь отвлекается == это будет выглядеть как тормоза. С другой стороны тратить ресурсы в фоне тоже не лучшее решение (аккумулятор надо экономить). А вот бы можно было std::thread-ы ставить на паузу, а когда пользователь вернётся к игре - просто снимать с паузы...
Хайвеи можно ставить на паузу - смотри юнит тест.
2. Издатели и подписчики
Существует много различных реализаций каналов передачи публикаций. У тех, с которыми встречался я ранее, были проблемы которые приводили к вынужденному использованию рекурсивных мьютексов, а также дедлоки при обращению к закешированной в канале публикации. Мьютексы и спинлоки - это зло: мьютекс похож на мину, а спинлок греет процессор и некоторые операционки такое могут просто SIGABRTировать. Зло удалось победить разбив кейсы на частные случаи:
PublishManyForOne - кто угодно с любого потока может послать данные подписчику (который раз и навсегда задаётся сразу в конструкторе). Подписчик получает данные в своём потоке.
PublishOneForMany - издатель со своего потока может публиковать для большого количества подписчиков. Издатель отслеживает подписчиков в своём потоке, подписчики каждый получает публикацию в своём потоке. Подписки удаляются когда протухает протектор подписки и это происходит без мьютексов так как издатель в однопоточке.
PublishManyForMany - можно слать данные с любого потока, и подписчики могут подписываться с любого потока. Этот канал создан для случаев когда подписчиков удалять не требуется.
PublishManyForManyCanUnSubscribe - как PublishManyForMany + умеет удалять протухших подписчиков (но под мьютексом).
Вообще в обычных ситуациях должно хватать первых двух:
PublishManyForOne - если много потоков отправляют свои сигналы в общий коллектор
PublishOneForMany - если один поток публикует для динамически меняющихся подписчиков.
Если подписаться PublishManyForOne->PublishOneForMany, то с обоих концов получается Many, и это работает без мьютексов с удалением протухших подписчиков.
BufferedRetransmitter позволяет новым подписчикам сразу получать сигнал с последней закешированной публикацией и/или фильтровать публикации от дублей(например когда интересны только изменения) - это будет работать без мьютекса если тип данных у публикации позволит поместить её в std::atomic.
У паттерна Издатель&Подписчик (Publish&Subscribe) есть изъян: обычно Издатель делает публикации вне зависимости от того есть ли в данный момент подписчики. Представьте что в вашем приложении десяток микросервисов выедает последний аккумуляторный запас на мобильнике в то время как пользователь в этот момент просто вводит буквы в форме на весь экран и весь этот бесценный контент, который публикуют микросервисы, просто не видит. На работе эту проблему победили тем что на существующие каналы я сделал враппер который отслеживает количество подключений и дёргает колбэки когда подключается первый подписчик или отключается последний - появился инструмент для запуска сервиса когда появляются подписчики, и останавливать когда все подписчики отключились. В библиотеке thread_highways сделал иначе: PublishManyForManyWithConnectionsNotifier и PublishOneForManyWithConnectionsNotifier - юнит тест и пример использования:
const auto publisher =
hi::make_self_shared<hi::PublishOneForManyWithConnectionsNotifier<std::string>>
([&]{scope.print("First subscriber connected");},
[&]{scope.print("Last subscriber disconnected");});
Подписаться и отписаться на канал напрямую можно так
auto simple_protector = std::make_shared<bool>();
auto weak_from_protector = std::weak_ptr(simple_protector);
publisher->subscribe(
[&, protector = std::move(simple_protector)]
(std::string publication) mutable
{
scope.print(std::string{"single shot subscriber received: "}.append(std::move(publication)));
// Break subscription
protector.reset();
}, std::move(weak_from_protector), highway->mailbox());
В примере лямбда при первом же вызове сломает протектор и тем самым удалит подписку. Ещё в примере идёт обращение напрямую к Издателю, это не обязательно - из каждого издателя можно получить объект hi::ISubscribeHerePtr<> и разослать будущим подписчикам. Этот объект крут тем что не удерживает объект Издателя от удаления:
ISubscribeHerePtr<Publication> subscribe_channel()
{
struct SubscribeHereImpl : public ISubscribeHere<Publication>
{
SubscribeHereImpl(std::weak_ptr<PublishOneForMany<Publication>> self_weak)
: self_weak_{std::move(self_weak)}
{
}
void subscribe(Subscription<Publication> && subscription) override
{
if (auto self = self_weak_.lock())
{
self->subscribe(std::move(subscription));
}
}
const std::weak_ptr<PublishOneForMany<Publication>> self_weak_;
};
return std::make_shared<SubscribeHereImpl>(SubscribeHereImpl{self_weak_});
}
Вообще библиотека построена так чтобы не накручивать понапрасну счётчик внутри std::shared_ptr (это только что была пасхалка тем кто C++ сложные объекты в Java/Kotlin пробовал пробрасывать, и потом ждал когда наконец garbage collector всех отпустит).
В следующем примере имитируется сервис который сам запускается и останавливается в зависимости от наличия подписчиков. В результате на экране будет видно что сервис выполняет работу только когда появляется подписчик на сервис. В этом примере подписчики самоликвидируются совсем. Иногда же хочется просто отписаться от одного сервиса и подписаться к другому - для этого необходимо разрушать протектор который прикладывается к подписке, подписка будет удалена издателем при следующей рассылке. Пример такого поведения есть в юнит тестах.
3. Узлы обработки и блок-схема
На предыдущем месте работы я занимался видеоаналитикой и тогда родилась идея что было бы удобно в конвейер обработки видеоизображения пальцем(мышкой) добавлять узлы с различной логикой. Например добавить контрастность, добавить получение разности со средним от предыдущих изображений или наоборот всё размыть пятном определённого размера чтобы убрать снежинки - такое визуальное программирование где видишь всю цепочку обработки изображения абстрагируясь от кода. В боевых алгоритмах в блок схемах нужны были бы циклы (например для тренировки эталона для детекции трещин на трубах), параллельные вычисления(в одном потоке новые грани ищешь, в другом цветовые отклонения от эталона), логические ветвления при принятии решения о включении тревоги. На вход такой блок-схемы бы регулярно подавались кадры, а на выходе сигналы с результатами видеоаналитики.
Инструментарий библиотеки thread_highways я создал с оглядкой на видеоаналитические задачи: например ограничение количества холдеров в которых может быть передан сигнал становится понятным если вспомнить что одно изображение с камеры занимает от 8Мб и больше, а камера делает 50 изображений в секунду - конвейер переваривает изображения медленнее и если не пропускать кадры, то память быстро закончится. Например в дефектоскопии неважно обнаружишь ты дефект на ленте на 35м кадре или 45м - лист металла просто сдвинется и дефект будет отловлен просто выше или ниже по движению. Поэтому в подписках указывается параметр "bool send_may_fail " - это удобно: ограничиваешь количество изображений в работе исходя из объёма оперативной памяти на вычислителе и дальше уверен что система будет работать стабильно. Есть задачи когда пропуск сигнала неприемлем, например при построении 3D карты поверхности по отражению лазерного луча - такие задачи можно решать семафором который по сигналу будет морозить сканирование если память заканчивается (чтобы гарантировать стабильность работы можно подписать Издателя кадров на сигналы с конца конвейера чтобы те инкрементировали семафор.. Хотя зачем? Ограничение холдеров хайвея так и работает.. Можно просто поставить send_may_fail = false).
На текущем месте работы вместо std::future используется самописный future который позволяет подписаться на результат других future с указанием экзекьютора на каком потоке хочешь каждую future запускать. Механизм реализован через врапперы времени компиляции, поэтому способов склеить несколько future друг с другом не так много. Но это уже намного удобнее чем std::future так как всю цепочку расчётов можно спланировать заранее и манипулировать как единым объектом - например удалить не дожидаясь результата в силу изменившихся обстоятельств. В последнем future узле цепочки алгоритма обычно необходимо доставить результат работы - чаще всего это отправка сигнала с данными в один из каналов. Подписываться на результат намного удобнее чем механизмы std::future где в каком-то потоке (каком?) надо следить готова ли std::future. Коллеги пришли к тому что future лучше использовать вместе с каналом издателя&подписчика и архитектуру строить как бы в будущем которое может и не наступить если ситуация изменилась. С одной стороны такие future алгоритмы позволяют экономить ресурсы за счёт отмены того что вдруг стало ненужным, но с другой переиспользовать такую цепочку невозможно - она одноразовая. Я подумал что, наверно, каждый раз под каждый кадр видеоаналитики пересоздавать весь future конвейер с OpenCV обработкой и нейросетями - это не самое удачное решение.
Поэтому попробовал сделать следующий эволюционный шаг:
future объекты сразу со встроенными каналами для получения входящих сигналов и каналами для публикации результатов
future объекты можно переиспользовать и собирать в блок-схемы подписываясь на сигналы друг друга. Блок-схемы можно менять динамически (тут родилась идея программы которая будет сама себя развивать)
future выполняется на заданном хайвее - это позволяет построить безмьютексную архитектуру
подписки и логика future объектов защищены протекторами - помимо защиты от использования разрушенных объектов, побочно это позволяет перестраивать блок-схему разрушая подписки и создавая новые связи
в качестве выполняемой логики можно использовать лямбды, функторы, а также указатели (std::share_ptr/std::unique_ptr/вообще всё то что реализует оператор *дереференсинга) на функторы
- future объекты разные и их номенклатура растёт под возникающие задачи.
На сегодня уже созданы такие типы future узлов блок-схемы:
VoidFutureNode - узел появился для простых случаев когда нет сигнала с входящими данными или когда хочется подписаться на каналы с данными любого типа которые не будут передаваться на вход колбэка. Стартует по факту любого сигнала на вход и содержимое сигнала не смотрит (void на вход).
FutureNode - отличие от VoidFutureNode в том что узел можно подписать на получение входящих сигналов которые подаются первым параметром в логику кода
IfElseFutureNode - отличие от FutureNode в том что добавляется второй исходящий канал публикации результатов: первый исходящий для ветки "if" логики, второй исходящий канал для подключения "else" логики.
OperationWithTwoOperandsFutureNode - отличие от FutureNode в том что добавляется второй входящий канал параметров. Узел ждёт когда придут сигналы во входящие 2 канала, причём сигналы могут приходить с разной частотой. В момент времени когда появились данные для 2-х входящих параметров запускается логика узла, после чего значения параметров сбрасываются и узел опять встаёт в ожидание когда заполнятся данные с двух входящих каналов. Этот узел можно использовать для реализации бинарных операций или в качестве некого семафора (сигнал во второй канал пропускает текущие на данный момент данные из первого канала .. или наоборот).
AggregatingFutureNode - отличие от FutureNode в том что входящих каналов может быть любое количество. Входящие сигналы доставляются сразу, но указывается номер канала с которого были получены данные и общее количество каналов. Для хранения агрегированного состояния инжектируется структура, класс которой задаётся в шаблоне узла - эта структура также подаётся как параметр в логику узла чтобы можно было сохранять поступающие данные и принимать агрегированное решение. Узел задумывался в качестве замены future логики when_all/when_any, но тут правила можно придумывать любые - например слать результат при 50% заполнении, или на каждый сигнал отправлять текущее среднее, или можно разделить входящие на обязательные и необязательные параметры начиная обработку в момент когда соберётся обязательный кворум..
Между собой future узлы блок схемы можно соединять подписками как угодно, организовывать циклы, реверсивные течения и обратное воздействие. Узлы и подписки - это аналог подключения нейронов аксонами, поэтому позволяет повторить архитектуру головного мозга. Вот только в каждом узле можно разместить что-то покруче чем уравнение с весами классических нейронок.
Как вышеописанное работает лучше смотреть в коде примеров и юнит тестах.
Чтобы не раздувать публикацию опишу лишь немного фишек:
Простой пример как сделать фьючу из функтора в котором много всего включая мьютексы и поэтому его нельзя мувать на чём традиционный std::future скорее всего бы просто не скомпилировался:
auto future_node = hi::VoidFutureNode<std::string>::create(
std::make_unique<ScienceOfNotMovable>(scope),
highway->protector_for_tests_only(), highway);
В коде выше задействован минимум параметров:
"std::string" = в канал с результатом будет кидаться сигнал такого типа, поэтому подписчики на результат должны будут уметь принимать в свой колбэк параметром этот std::string.. хотя и не обязательно
"std::make_unique<ScienceOfNotMovable>(scope)" = инжектирование логики фьючи, функтор обернул в указатель но это не обязательно. На работе обычно всегда постим лямбду - это удобнее.
"highway->protector_for_tests_only()" = это протектор, по хорошему это должен быть std::weak_ptr на объект с которым работает лямбда с логикой которую могли запостить выше вместо функтора. Нужен чтобы когда лямбда доедет до исполнения на потоке не оказалось что объект с которым она работает уже давно удалили. Для тестов можно взять любой weak_ptr.
"highway" = на каких мощностях будет исполняться созданная фьюча.
Интерфейс колбека (лямбды или функтора) может принимать различное количество параметров из списка предоставляемых. Решение о том какие параметры подавать принимается во время компиляции начиная с максимального списка, например если задействовать все фишки hi::VoidFutureNode :
auto future_node = hi::VoidFutureNode<std::string>::create(
[&](hi::IPublisher<std::string> & result_publisher,
hi::INode & node,
const std::atomic<std::uint32_t> & global_run_id,
const std::uint32_t your_run_id)
{
if (your_run_id != global_run_id) return;
scope.print(std::string{"Executed future_node №"}.append(std::to_string(node.node_id())));
node.publish_progress_state(true, 99);
result_publisher.publish("Scientific discovery: you can publish which ExecutionTree node is currently executing and what % of progress is made");
},
highway->protector_for_tests_only(),
highway,
__FILE__, __LINE__,
execution_tree.current_executed_node_publisher(), 555);
Этот пример делает то же что и предыдущий - публикует строку с полученным результатом. Но в добавок:
следит за тем не начала ли останавливаться магистраль через "your_run_id != global_run_id" (вдруг получение строки занимает много времени, разумно следить не нажал ли юзер на Стоп)
может использовать свой id узла(555) и публиковать прогресс (можно использовать например для визуализации работы блок-схемы: активные узлы подсвечивать зелёным и рисовать линию прогресса)
указал координаты (__FILE__, __LINE__) своего кода и теперь система мониторинга знает на кого надо нажаловаться если магистраль зависла
У обычного hi::FutureNode в полном интерфейсе колбэка добавляется значение входящего сигнала:
void operator()(
[[maybe_unused]] Parameter publication,
[[maybe_unused]] IPublisher<Result> & result_publisher,
[[maybe_unused]] INode & node,
[[maybe_unused]] const std::atomic<std::uint32_t> & global_run_id,
[[maybe_unused]] const std::uint32_t your_run_id) override
{
if constexpr (std::is_invocable_v<R,Parameter,IPublisher<Result> &, INode &
,const std::atomic<std::uint32_t> &, const std::uint32_t>)
{
safe_invoke_void(
callback_,
protector_,
std::move(publication),
result_publisher,
node,
global_run_id,
your_run_id);
} else if constexpr (
...
Вы наверно обратили внимание что все параметры промечены [[maybe_unused]] - во время компиляции будет проанализирован тот колбэк что вы передали на предмет какие параметры в него возможно передать, по максимуму. Можно вообще не принимать никаких параметров в колбэк - тогда Ваш код отработает как реакция на факт получения сигнала по подписке. VoidFutureNode появился когда оказалось что полезен узел который можно подписать вообще на что угодно с разнородных каналов и колбэк будет отрабатывать при получении данных любого типа - данные в параметр "Parameter publication" вообще не передаются, обрабатывается просто факт сигнала.
У логического ветвления IfElseFutureNode в полном интерфейсе колбэка уже два канала для публикаций в ответвления if и else:
void operator()(
Parameter publication,
IPublisher<IfResult> & if_result_publisher,
[[maybe_unused]] IPublisher<ElseResult> & else_result_publisher,
[[maybe_unused]] INode & node,
[[maybe_unused]] const std::atomic<std::uint32_t> & global_run_id,
[[maybe_unused]] const std::uint32_t your_run_id) override
То что только 2 исходящих канала if_result_publisher и else_result_publisher не означает что в блок схеме это простое разветвление: на каждый канал могут подписаться несколько подписчиков. И входящий канал можно подписать сразу на результаты работы нескольких других узлов. А для AggregatingFutureNode входящих каналов может быть любое количество... Узлы можно как нейроны головного мозга соединять аксонами подписок в большой серый клубок и такой ExecutionTree уже проблематично будет нарисовать на листочке в виде блок-схемы..
AggregatingFutureNode можно использовать когда данные поступают с различных источников с различной скоростью, но для дальнейшей обработки нужен набор нескольких. Например система прицеливания использует данные с геопозицией, дальномер до цели, триангуляцию, фото с инфракрасной камеры, фото с ультрафиолетовой камеры, фото с обычной камеры и т.д. - на агрегирующем узле данные могли бы обновляться с той скоростью с какой они поступают и при накоплении достаточного кворума отправляться наборами в следующие по блок-схеме узлы для дальнейшего анализа. Канал геопозиции может быть подписан на GPS, Глонасс, Beidou, Galileo и чего ещё там бывает - поступающие с разных источников данные агрегировать и передавать дальше по блок-схеме уже более точную геопозицию.
Каждый узел создаётся как потомок класса INode сразу в виде std::shared_ptr<INode> и получает в конструкторе std::weak_ptr self_weak на себя. self_weak используется как дополнительный протектор в подписках, гарантирующий что колбэки будут вызваны только если узел ещё живой. Указатели на узлы надо было где-то хранить чтобы блок-схема продолжала существовать после создания - для этого был создан необязательный класс ExecutionTree и чтобы повысить его полезность я в него же поместил канал публикации текущего прогресса. На этот канал можно подписаться и получать сигналы с узлов о том кто начал/закончил работу и какого прогресса добился. Сам ExecutionTree тоже где-то надо было хранить и показалось логичным это делать в узле с результатом - для этого сделал узел ResultWaitFutureNode откуда всегда можно получить объект результата уже не через подписку к каналу, а через блокирующий вызов get_result(). Подписаться можно вообще к любому узлу блок-схемы и получать промежуточные результаты. Вместо ExecutionTree указатели на узлы можно хранить в соседних узлах как это делает двусвязанный список - тогда сегменты некой гипернейросети будут самоликвидироваться если на них больше никто не ссылается.
Все возможности библиотеки thread_highways охватить в рамках этой короткой публикации наверно невозможно хотя бы потому что возможности я добавлял пока писал эту самую публикацию. И планирую инструментарий дальше развивать когда буду натыкаться на интересные задачи. Для себя я переоткрыл программирование блок-схемами (про которые уже было позабыл давно) на новом уровне. Программирую теперь однопоточно без мьютексов и вообще без оглядки на другие потоки: каждая крупная сущность базируется на своём однопоточном хайвее (какие-то сложные расчёты можно размещать на многопоточном хайвее, но результаты стекаются обратно на основной поток), а этот однопоточный хайвей можно шарить между несколькими объектами. Из реальных замеров по передаче подгружаемых из кэша тайлов в рендер для отрисовки: с мьютексами на это уходило 300ms-600ms, без мьютексов стало уходить 0ms так как цикл перестал морозиться на мьютексе (цикл не мог отработать мгновенно так как на каждой итерации ждал мьютекс который успевал захватить рендер поток == 2 потока работали строго по очереди).
В этом тесте цепочка узлов блок-схемы строит сама себя. По одним и тем же каналам&подпискам передаются данные разных типов и даже сами узлы которые дальше инжектятся в ExecutionTree(напоминает как по ДНК в клетке собираются аминокислоты, куда-то транспортируются и там прикручиваются отстраивая таким образом организм). Представьте себе программу которая бы могла создавать сама себя: добавлять себе логику, менять себе логику, адаптироваться. Все мы знаем про нейросети - их возможности растут.. но:
нейросеть это чёрный ящик с неточными алгоритмами
тренировка и работа нейросети требует значительных вычислительных ресурсов
нет возможности залогировать&отдебажить логику работы нейросети, нет детальной блок-схемы алгоритмов.
Композит из саморазвивающегося алгоритма который бы смог сам тренировать нейросети для решения встречающихся задач и затем эти нейро-решения встраивать узлами в основной алгоритм - это было бы очень похоже на то как развиваемся сами мы. И такой композит будет намного эффективнее чем чистая нейросеть или чистый C++ код: незачем решать задачи калькулятора нейросетью, и наоборот для нечёткой логики.. Робот с таким ExecutionTree в голове мог бы нейросетью искать объект, а затем по точному C++ алгоритму объект обрабатывать == композит из гибкости нейросети и точности калькулятора. Появилась бы возможность дебажить автопилот автомобиля после аварии так как была бы блок схема виртуального водителя с логом исполнения этой блок схемы.
Что думаете?
Комментарии (3)
eao197
17.05.2022 10:38Спасибо за интересный рассказ, даже удивительно, что пока здесь был всего лишь один комментарий.
В библиотеке эта проблема решена. В следующем примере используется std::weak_ptr<SelfProtectedTask> в качестве защиты от запуска лямбды если целевой объект уже разрушился при выходе из скоупа
Позволю себе позанудствовать и высказать свои опасения в легальности примененного вами решения. А именно:
struct SharedHolder : public std::enable_shared_from_this<SharedHolder> { alignas(T) unsigned char buffer_[sizeof(T)]; T * ptr_{nullptr}; std::shared_ptr<T> construct(Args &&... args) { auto ptr = std::launder(reinterpret_cast<T *>(buffer_)); // (1) auto alias = std::shared_ptr<T>(this->shared_from_this(), ptr); // (2) ::new (ptr) T(alias, std::forward<Args>(args)...); // (3) ptr_ = ptr; return alias; }
Здесь в точке (1) вы "отмываете" указатель и получаете якобы легальный указатель на объект типа T. Оставим вопрос о том, насколько легально делать это, если по факту в buffer_ на момент вызова
launder
не было корректно сформированного экземпляра типа T. Допустим, чтоptr
валиден и передача его вshared_ptr
в точке (2) допустима.Но в точке (3) вы переписываете содержимое
buffer_
, тем самым делая невалидными все предыдущие указатели, которые указывали на T внутриbuffer_
. Поэтому и самptr
после вызова placement new перестает быть валидным. Чтобы он остался таковым вам нужно было бы написать, например, так:ptr = ::new(buffer_) T(alias, std::forward<Args>(args)...);
Или же сделать еще один
std::launder
после завершения placement new:::new (ptr) T(alias, std::forward<Args>(args)...); // (3) ptr_ = std::launder(reinterpret_cast<T*>(buffer_));
Без этих действий в
ptr_
окажется указатель, который является невалидным. По крайней мере в C++17.Но даже если вы корректно "отмываете" значение для
ptr_
, то остается вопрос с тем, что вalias
вы передали указатель, который перестал быть валидным после вызова placement new.А это UB.
По крайней мере на уровне моего понимания того, как вся эта механика работает в C++17 (см. пояснения здесь). В C++20 вроде бы стало попроще, но в C++20 у меня познаний нет.
PS. Есть ощущение, что SharedHolder можно было бы сделать проще за счет использования
std::optional<T>
вместоbuffer_
иptr_
.
eao197
17.05.2022 11:13Позволю себе еще позанудствовать и озвучить пару не то, чтобы замечаний, но моментов, которые вызвали что-то типа когнитивного диссонанса.
Момент первый. Сочетание терминов "канал" и "издатель/подписчик". Я, пожалуй, в первый раз сталкиваюсь с ситуацией, когда эти термины используются совместно. Обычно "канал" -- это всего лишь один писатель и всего лишь один читатель. Никаких подписок, просто в канал кто-то пишет, кто-то читает (причем читает когда ему это удобно). Тогда как с "издатель/подписчик" обычно используется термин "топик". Издатель публикует данные в топик, подписчики подписываются на топик. Понятно, что вы, как автор библиотеки, можете выбирать любые термины, которые вам кажутся наиболее уместными. Но для стороннего читателя создается дополнительная когнитивная нагрузка для того, чтобы привыкнуть, что "канал" здесь, это не то же самое, к чему читатель привык.
Момент второй. Вы в описании достаточно часто начинаете говорить о мутексах или weak_ptr. Для вас, как для автора, достаточно просто понимать что к чему и почему упоминания мутексов/weak_ptr важны. Но для того, кто впервые знакомиться с библиотекой, такие погружения в дебри картину происходящего только усложняют. Ведь сначала хочется понять что это и зачем, что может, как это выглядит. А уже когда общее понимание достигнуто, тогда можно и про какие-то потроха поговорить. Однако, у вас такого последовательного погружения (на мой взгляд) не получилось и вы начинаете нагружать читателя техническими подробностями слишком рано. Из-за этого после первого прочтения я вообще не понял зачем о мутексах и weak_ptr здесь нужно было говорить. Вроде как библиотека нужна чтобы освободить пользователя от всех этих низкоуровневых деталей, но нет, эти самые детали чуть ли не подчеркиваются специально. ИМХО, вполне можно было бы обойтись и без этого.
mrbald
Классная штука! Какая у неё производительность на поток и overhead? Я обычно не пишу на плюсах когда можно тратить больше микросекунды на message/event/data-row или когда надо память много динамически аллокировать, так как AKKA actors/streams могут пару сотен тысяч сообщений в секунду обрабатывать, CompletableFuture - того же порядка и GC очень хорошо работает в новых JVM (JRE11+).