В теории все просто, для стейтмашины нужен объект, который имеет метод GetAwaiter, который вернет awaiter, в котором реализован INotifyCompletion с OnCompleted, свойство IsCompleted и метод GetResult.
Awaiter:
public class ManualResetEventAwaiter : INotifyCompletion
{
Action _continuation;
bool _isCompleted = false;
public void OnCompleted(Action continuation)
{
Volatile.Write(ref _continuation, continuation);
}
public bool IsCompleted => _isCompleted;
public bool GetResult() => true;
public ManualResetEventAwaiter GetAwaiter() => this;
public void Continue()
{
Volatile.Write(ref _isCompleted, true);
var continuation = Interlocked.Exchange(ref _continuation, null);
if (continuation != null)
Task.Run(continuation);
}
}
В OnCompleted стейтмашина передаст действие которое продолжит нашу логику выполнения. Для нас это воспринимается как логика после await.
В Continue будет вызываться это продолжение работы. Я обернул его в Task для того что бы «ожидающие таски» так же продолжились асинхронно.
ManualResetEventAsync:
public class ManualResetEventAsync
{
ConcurrentQueue<ManualResetEventAwaiter> awaitQueue = null;
public ManualResetEventAsync(bool initialState)
{
if (!initialState)
Reset();
}
public ManualResetEventAwaiter WaitOneAsync()
{
var awaitable = new ManualResetEventAwaiter();
var queue = Volatile.Read(ref awaitQueue);
if (queue == null)
awaitable.Continue();
else
{
queue.Enqueue(awaitable);
//check queue
var upd_queue = Volatile.Read(ref awaitQueue);
if (!ReferenceEquals(queue, upd_queue))
awaitable.Continue();
}
return awaitable;
}
public void Set()
{
var queue = Interlocked.Exchange(ref awaitQueue, null);
if (queue != null)
while (queue.TryDequeue(out var awaitable))
awaitable.Continue();
}
public void Reset()
{
Interlocked.CompareExchange(
ref awaitQueue,
new ConcurrentQueue<ManualResetEventAwaiter>(),
null);
}
}
Метод WaitOneAsync будет использоваться с await.
Set вызовет все продолжения и установит сигнальное состояние.
Reset установит не сигнальное состояние.
UPD: Упростил немного код.
Ссылка на проект
Комментарии (22)
mayorovp
13.05.2019 09:52+1У вас гонка между методами
Set
иWaitOneAsync
.
Метод
WaitOneAsync
может положитьawaitable
в очередь уже в тот момент, когда методSet
закончил работу и ничего из очереди не читает...
Нужно, во-первых, сделать
State
volatile — а во вторых, проверять его уже после выполнения методаEnqueue
.Ascar Автор
13.05.2019 12:18-1State свойство. Когда Set закончит работу и State будет false только тогда можно в очередь добавлять, иначе возвращается завершенный awaiter. А если State внезапно станет true до завершения Set, то чтение TryDequeue прекратится.
mayorovp
13.05.2019 12:25Вот только запросто может оказаться, что метод Set в одном потоке уже отработал, а State в другом потоке все ещё видно как false. Ибо кеш, реордеринг инструкций и прочее...
Ascar Автор
13.05.2019 12:56Вы предлагаете в get у State задать поле votatile?
mayorovp
13.05.2019 13:50Не очень понимаю при чем тут get, но да, State при вашем подходе должен быть volatile. Как раз это я и написал после слов "во-первых".
Ascar Автор
13.05.2019 15:53При том что свойство не делается volatile.
mayorovp
13.05.2019 16:03А с каких пор в C# запретили обычную реализацию свойств и оставили только автосвойства?
И зачем вообще классу ManualResetEventAsync нужно открытое свойство State, которое невозможно безопасно использовать ни для чего кроме диагностики?
mayorovp
13.05.2019 10:01Но, вообще говоря, вся логика, которую вы делали в
ManualResetEventAwaiter
, уже есть вTaskCompletionSource
. Можно его и использовать. Заодно получится от очереди избавиться, ведьTaskCompletionSource
поддерживает множественное ожидание...
public class ManualResetEventAsync { private TaskCompletionSource<bool> tcs = null; public Task WaitOneAsync() { var tcs = Volatile.Read(ref this.tcs); if (tcs == null) return Task.CompletedTask; else return tcs.Task; } public void Set() { var old_tcs = Interlocked.Exchange(ref this.tcs, null); old_tcs?.SetResult(false); } public void Reset() { var new_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); Interlocked.CompareExchange(ref this.tcs, new_tcs, null); } }
Ascar Автор
14.05.2019 06:26Ага, вариант работает, еще конструктор с сигнальным состоянием добавить.
Lelushak
1. Писать такое нужно как extension к WaitHandle, т.к. благодаря этому поддержка асинхронного ожидания появится сразу у:
WaitHandle
AutoResetEvent
EventWaitHandle
ManualResetEvent
Mutex
Semaphore
И других его наследников.
2. Async and cancellation support for wait handles
Ascar Автор
Цель была написать это именно самому. Мне вообще сейчас кажется не правильно мешать асинхронную модель с методами синхронизации потоков, все таки это разное. Название класса только пока оставлю схожее.
a-tk
Тогда уже корректнее сказать, что цель — потренироваться писать такие вещи. Авось пригодится когда-нибудь навык.
Ascar Автор
Посмотрел исходники проекта, там сделано через TaskCompletionSource и ThreadPool.RegisterWaitForSingleObject который использует WaitHandle. Я же вообще не лезу в ThreadPool.
mayorovp
Вот только WaitHandle — это всегда объект ядра, а их использования зачастую хочется избежать.
Lelushak
Можете привести примеры? На ум приходит только требование кроссплатформенности. Гораздо чаще хочется избежать оверхеда по памяти (но тут ваша реализация снизу спасает автора) и усложнения кода: придется переписывать все примитивы синхронизации и вообще сущности множатся
Если же говорить о разработке под Windows, то работа с WaitHandle позволит не писать обертки для работы со сторонними либами и использовать удобные конструкции типа cancellationToken.WaitHandle.WaitOneAsync()
mayorovp
Любое взаимодействие с объектом ядра — это системный вызов, а системные вызовы нынче дороги. Конечно же, совсем без них не обойтись, но делать системный вызов для выполнения операции Set многими уже считается перебором.
Это отдельная задача. Да, если у нас уже есть WaitHandle — то решение по вашей ссылке будет оптимальным. А вот при написании своего кода пригодится управляемая реализация.
Обманчиво удобная конструкция: она создает лишний объект ядра, делая аж четыре системных вызова в том месте, где их можно вовсе избежать!
Вот так делать правильнее:
Разумеется, такой код можно вынести в отдельный метод.
Lelushak
Тогда уж
Так как ваш вариант потенциально может привести к дедлоку из-за преобразования оставшейся части кода в синхронное «продолжение» таски
mayorovp
Не очень понимаю где конкретно там может быть дедлок, но исправление нормальное. Тогда уж надо и ConfigureAwait добавить (при условии что это все вынесено в отдельный метод).
Lelushak
Синтетический пример с дедлоком:
После await tsc.Task последующий код метода продолжит выполняться в том же потоке, второй делегат с cleaned=true не начнет выполняться до тех пор, пока не закончится синхронный код либо пока не вызовется какая-нибудь другая асинхронная функция (например await Task.Delay(1))
mayorovp
Ну, блок кода вида "синхронная работа в бесконечном цикле" в асинхронной программе — сам по себе ошибка, он много где ещё может выстрелить.
С тем же успехом можно было написать вот так и жаловаться на дедлоки:
Lelushak
Ну так дедлоки не от хорошего кода появляются :)
Ну и это все же другое. Код из моего примера плохой, но в общем-то легальный: в самих делегатах бесконечной работы нет и интуитивно ожидается, что они быстренько отработают друг за другом (так бы и произошло без await). Ломается он только из-за специфичных особенностей разворачивания await'ов компилятором и найти такую ошибку в сколько-нибудь большой системе будет сложнее, чем бесконечный цикл внутри делегата, особенно если не знать где искать. Так что лучше предохраниться где это возможно и возможно кто-нибудь из читателей этой ветки в будущем не выстрелит себе в ногу