Возникла сегодня идея написать асинхронную версию ManualResetEvent, которая в задаче будет «ожидать» через await и при этом не занимать никакой поток.

В теории все просто, для стейтмашины нужен объект, который имеет метод GetAwaiter, который вернет awaiter, в котором реализован INotifyCompletion с OnCompleted, свойство IsCompleted и метод GetResult.

Awaiter:

    public class ManualResetEventAwaiter : INotifyCompletion
    {
        Action _continuation;
        bool _isCompleted = false;
        public void OnCompleted(Action continuation)
        {
            Volatile.Write(ref _continuation, continuation);
        }

        public bool IsCompleted => _isCompleted;

        public bool GetResult() => true;

        public ManualResetEventAwaiter GetAwaiter() => this;

        public void Continue()
        {
            Volatile.Write(ref _isCompleted, true);

            var continuation = Interlocked.Exchange(ref _continuation, null);

            if (continuation != null)
                Task.Run(continuation);
        }
    }

В OnCompleted стейтмашина передаст действие которое продолжит нашу логику выполнения. Для нас это воспринимается как логика после await.

В Continue будет вызываться это продолжение работы. Я обернул его в Task для того что бы «ожидающие таски» так же продолжились асинхронно.

ManualResetEventAsync:

    public class ManualResetEventAsync
    {
        ConcurrentQueue<ManualResetEventAwaiter> awaitQueue = null;

        public ManualResetEventAsync(bool initialState)
        {
            if (!initialState)
                Reset();
        }

        public ManualResetEventAwaiter WaitOneAsync()
        {
            var awaitable = new ManualResetEventAwaiter();

            var queue = Volatile.Read(ref awaitQueue);

            if (queue == null)
                awaitable.Continue();
            else
            {
                queue.Enqueue(awaitable);

                //check queue
                var upd_queue = Volatile.Read(ref awaitQueue);               

                if (!ReferenceEquals(queue, upd_queue))
                    awaitable.Continue();
            }

            return awaitable;
        }

        public void Set()
        {
            var queue = Interlocked.Exchange(ref awaitQueue, null);

            if (queue != null)
                while (queue.TryDequeue(out var awaitable))
                    awaitable.Continue();
        }

        public void Reset()
        {
            Interlocked.CompareExchange(
                ref awaitQueue,
                new ConcurrentQueue<ManualResetEventAwaiter>(),
                null);
        }
    }

Метод WaitOneAsync будет использоваться с await.

Set вызовет все продолжения и установит сигнальное состояние.

Reset установит не сигнальное состояние.

UPD: Упростил немного код.

Ссылка на проект

Комментарии (22)


  1. Lelushak
    13.05.2019 01:32

    1. Писать такое нужно как extension к WaitHandle, т.к. благодаря этому поддержка асинхронного ожидания появится сразу у:

    WaitHandle
    AutoResetEvent
    EventWaitHandle
    ManualResetEvent
    Mutex
    Semaphore

    И других его наследников.

    2. Async and cancellation support for wait handles


    1. Ascar Автор
      13.05.2019 02:07

      Цель была написать это именно самому. Мне вообще сейчас кажется не правильно мешать асинхронную модель с методами синхронизации потоков, все таки это разное. Название класса только пока оставлю схожее.


      1. a-tk
        13.05.2019 08:48

        Тогда уже корректнее сказать, что цель — потренироваться писать такие вещи. Авось пригодится когда-нибудь навык.


    1. Ascar Автор
      13.05.2019 07:59

      Посмотрел исходники проекта, там сделано через TaskCompletionSource и ThreadPool.RegisterWaitForSingleObject который использует WaitHandle. Я же вообще не лезу в ThreadPool.


    1. mayorovp
      13.05.2019 10:09

      Вот только WaitHandle — это всегда объект ядра, а их использования зачастую хочется избежать.


      1. Lelushak
        13.05.2019 10:49

        Можете привести примеры? На ум приходит только требование кроссплатформенности. Гораздо чаще хочется избежать оверхеда по памяти (но тут ваша реализация снизу спасает автора) и усложнения кода: придется переписывать все примитивы синхронизации и вообще сущности множатся

        Если же говорить о разработке под Windows, то работа с WaitHandle позволит не писать обертки для работы со сторонними либами и использовать удобные конструкции типа cancellationToken.WaitHandle.WaitOneAsync()


        1. mayorovp
          13.05.2019 11:09
          +1

          Любое взаимодействие с объектом ядра — это системный вызов, а системные вызовы нынче дороги. Конечно же, совсем без них не обойтись, но делать системный вызов для выполнения операции Set многими уже считается перебором.


          Если же говорить о разработке под Windows, то работа с WaitHandle позволит не писать обертки для работы со сторонними либами

          Это отдельная задача. Да, если у нас уже есть WaitHandle — то решение по вашей ссылке будет оптимальным. А вот при написании своего кода пригодится управляемая реализация.


          и использовать удобные конструкции типа cancellationToken.WaitHandle.WaitOneAsync()

          Обманчиво удобная конструкция: она создает лишний объект ядра, делая аж четыре системных вызова в том месте, где их можно вовсе избежать!


          Вот так делать правильнее:


          var tsc = new TaskCompletionSource<bool>();
          using (cancellationToken.Register(() => tsc.SetResult(false))
              await tsc.Task;

          Разумеется, такой код можно вынести в отдельный метод.


          1. Lelushak
            13.05.2019 13:35

            Вот так делать правильнее


            Тогда уж

            var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            


            Так как ваш вариант потенциально может привести к дедлоку из-за преобразования оставшейся части кода в синхронное «продолжение» таски


            1. mayorovp
              13.05.2019 13:56

              Не очень понимаю где конкретно там может быть дедлок, но исправление нормальное. Тогда уж надо и ConfigureAwait добавить (при условии что это все вынесено в отдельный метод).


              1. Lelushak
                13.05.2019 14:01

                Синтетический пример с дедлоком:

                var source = new CancellationTokenSource();
                var cancellationToken = source.Token;
                source.CancelAfter(2 * 1000);
                bool cleaned = false;
                
                cancellationToken.Register(() =>
                {
                	//some work
                	cleaned = true;
                });
                
                var tsc = new TaskCompletionSource<bool>();
                using (cancellationToken.Register(() => tsc.SetResult(false)))
                	await tsc.Task;
                
                while (true)
                {
                	if (!cleaned)
                	{
                		Thread.Sleep(100); //some synchronous work emulation
                	}
                	else break;
                }
                
                Console.WriteLine("Finished"); //never reach this
                


                После await tsc.Task последующий код метода продолжит выполняться в том же потоке, второй делегат с cleaned=true не начнет выполняться до тех пор, пока не закончится синхронный код либо пока не вызовется какая-нибудь другая асинхронная функция (например await Task.Delay(1))


                1. mayorovp
                  13.05.2019 14:13

                  Ну, блок кода вида "синхронная работа в бесконечном цикле" в асинхронной программе — сам по себе ошибка, он много где ещё может выстрелить.


                  С тем же успехом можно было написать вот так и жаловаться на дедлоки:


                  cancellationToken.Register(() =>
                  {
                      while (!cleaned)
                      {
                          Thread.Sleep(100); //some synchronous work emulation
                      }
                  });


                  1. Lelushak
                    13.05.2019 14:25

                    Ну так дедлоки не от хорошего кода появляются :)

                    Ну и это все же другое. Код из моего примера плохой, но в общем-то легальный: в самих делегатах бесконечной работы нет и интуитивно ожидается, что они быстренько отработают друг за другом (так бы и произошло без await). Ломается он только из-за специфичных особенностей разворачивания await'ов компилятором и найти такую ошибку в сколько-нибудь большой системе будет сложнее, чем бесконечный цикл внутри делегата, особенно если не знать где искать. Так что лучше предохраниться где это возможно и возможно кто-нибудь из читателей этой ветки в будущем не выстрелит себе в ногу


  1. mayorovp
    13.05.2019 09:52
    +1

    У вас гонка между методами Set и WaitOneAsync.


    Метод WaitOneAsync может положить awaitable в очередь уже в тот момент, когда метод Set закончил работу и ничего из очереди не читает...


    Нужно, во-первых, сделать State volatile — а во вторых, проверять его уже после выполнения метода Enqueue.


    1. Ascar Автор
      13.05.2019 12:18
      -1

      State свойство. Когда Set закончит работу и State будет false только тогда можно в очередь добавлять, иначе возвращается завершенный awaiter. А если State внезапно станет true до завершения Set, то чтение TryDequeue прекратится.


      1. mayorovp
        13.05.2019 12:25

        Вот только запросто может оказаться, что метод Set в одном потоке уже отработал, а State в другом потоке все ещё видно как false. Ибо кеш, реордеринг инструкций и прочее...


        1. Ascar Автор
          13.05.2019 12:56

          Вы предлагаете в get у State задать поле votatile?


          1. mayorovp
            13.05.2019 13:50

            Не очень понимаю при чем тут get, но да, State при вашем подходе должен быть volatile. Как раз это я и написал после слов "во-первых".


            1. Ascar Автор
              13.05.2019 15:53

              При том что свойство не делается volatile.


              1. mayorovp
                13.05.2019 16:03

                А с каких пор в C# запретили обычную реализацию свойств и оставили только автосвойства?


                И зачем вообще классу ManualResetEventAsync нужно открытое свойство State, которое невозможно безопасно использовать ни для чего кроме диагностики?


                1. Ascar Автор
                  14.05.2019 06:17

                  Вообщем выпилил вообще State.


  1. mayorovp
    13.05.2019 10:01

    Но, вообще говоря, вся логика, которую вы делали в ManualResetEventAwaiter, уже есть в TaskCompletionSource. Можно его и использовать. Заодно получится от очереди избавиться, ведь TaskCompletionSource поддерживает множественное ожидание...


    public class ManualResetEventAsync
    {
        private TaskCompletionSource<bool> tcs = null;
    
        public Task WaitOneAsync() 
        {
            var tcs = Volatile.Read(ref this.tcs);
            if (tcs == null)
                return Task.CompletedTask;
            else
                return tcs.Task;
        }
    
        public void Set()
        {
            var old_tcs = Interlocked.Exchange(ref this.tcs, null);
            old_tcs?.SetResult(false);
        }
    
        public void Reset()
        {
            var new_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            Interlocked.CompareExchange(ref this.tcs, new_tcs, null);
        }
    }


    1. Ascar Автор
      14.05.2019 06:26

      Ага, вариант работает, еще конструктор с сигнальным состоянием добавить.