ValueTask и IValueTaskSource

Обычно ValueTask используют ради оптимизации.

Например, возврат закэшированного результата, или ValueTask.FromCancelled с переданным CancellationToken.

Но нет предела оптимизациям и одним ранним выходом теперь не обойтись. Поэтому был добавлен IValueTaskSource.

В ValueTask можно создать не только передав готовый результат или Task, но
и упомянутый выше IValueTaskSource.

// Конструкторы
public ValueTask(IValueTaskSource source, short token);
public ValueTask(Task task);
public ValueTask<T>(T result);

Что это такое мы с вами сейчас и узнаем.

Устройство и алгоритм работы IValueTaskSource

Интерфейс IValueTaskSource - набор из 3 методов:

public interface IValueTaskSource<out TResult>
{
    // Получить статус выполнения текущей операции  
    ValueTaskSourceStatus GetStatus(short token);

    // Запланировать продолжение на выполнение при завершении работы, 
    // если на момент проверки работа не завершена
    void OnCompleted(
      Action<object?> continuation,
      object? state,
      short token,
      ValueTaskSourceOnCompletedFlags flags);

    // Получить готовый результат операции
    TResult GetResult(short token);
}

GetStatus - получает статус выполнения.

Статус представляется перечислением ValueTaskSourceStatus

public enum ValueTaskSourceStatus
{
    // В процессе
    Pending,
    // Успешно завершилась
    Succeeded,
    // Завершилась ошибкой (исключением)
    Faulted,
    // Отменена
    Canceled,
}

Этот метод вызывается 1 раз, после создания ValueTask.

Если операция уже завершилась, то вызывается GetResult для получения результата.

Если операция в процессе, то вызывается OnCompleted для регистирования продолжения на выполнение.

OnCompleted - регистрирует переданное продолжение на выполнение по окончании операции.

Вызывается после GetStatus.

На вход ему подаются:

  • Action<object> continuation - само продолжение

  • object state - объект состояния, который передается continuation

  • ValueTaskSourceOnCompletedFlags flags - специальные флаги, указывающие поведение при вызове продолжения

Флаги представляются перечислением ValueTaskSourceOnCompletedFlags:

[Flags]
public enum ValueTaskSourceOnCompletedFlags
{
    // Без указаний
    None = 0,
    // Необходимо использовать текущий SynchronizationContext для продолжения
    UseSchedulingContext = 1,
    // В продолжении нужно использовать текущий ExecutionContext
    FlowExecutionContext = 2,
}

Между вызовом GetStatus и OnCompleted может пройти какое-то время и операция завершится.

Поэтому во время выполнения OnCompleted работа может быть уже закончена.
В таких случаях, продолжение обычно выполняется тут же.

GetResult - получает результат операции.

Этот метод вызывается 1 раз при завершении работы для получения результата: возвращаемый объект или исключение.

Он должен быть вызван тогда, когда операция только завершилась.

Моей ошибкой во время первой реализации было то, что я использовать семафор для ожидания выполнения.

Но из-за неправильных вызовов случился дедлок:

Фоновый поток завершил операцию и выставил результат.

В этот момент вызвалось продолжение.

Продолжение зашло в GetResult и остановилось на семафоре.

Фоновый поток не получил обратно управление, т.к. продолжение было вызвано, но семафор еще не выставлен.

Также во всех методах присутствует token. Это специальное значение для обнаружения множественных await. Зачем они нужны поговорим далее.

Алгоритм работы зависит от GetStatus:

  • Pending - операция не завершилась, поэтому нужно запланировать дальнейшее выполнение:

    1. GetStatus

    2. OnCompleted

    3. GetResult

  • В остальных случаях выполнение уже завершилось, поэтому получаем результат сразу же:

    1. GetStatus

    2. GetResult

Здесь можно провести аналогию с тем, как работает магия async/await и ее машины состояний. Грубо говоря, мы создали свой собственный Task с блэкджеком, но пока не такой эффективный.

Реализуем своими руками

Теперь сделаем свою реализацию.

Представим, перед нами задача получения статистики ПК.

У нас есть класс PcMonitor, отдающий эту статистику.

Он вызывается очень часто, поэтому для оптимизации мы решили:

  • Опрашивать ПК не на каждый вызов, а с определенным интервалом и хранить полученные значения в кэше

  • Если при вызове значение из кэша еще актуально, то вернуть его, иначе ждать до следующего сбора

Детали реализации

Статистика представляется структурой PcStatistics.
Пока там только температура процессора, но вы придумайте, что туда еще можно добавить.

public readonly record struct PcStatistics(double CpuTemperature);

В реализации используется класс ValueTaskSourcePcMonitor.

Сбор статистики реализован с помощью System.Threading.Timers.Timer,
который с определенным интервалом кладет в кэш новое значение и обновляет время сбора.

Время сбора представляет TimeSpan, получаемый с помощью Stopwatch (не самая лучшая идея, но сойдет).

Наша реализация IValueTaskSource представляется классом ManualValueTaskSource.

Он хранит в себе необходимые для работы данные.

public class ManualValueTaskSource: IValueTaskSource<PcStatistics>
{
    // Результат работы
    private CancellationToken _cancellationToken;
    private PcStatistics _cachedResult = new();
    private Exception? _exception;

    // Инфраструтура для работы IValueTaskSource
    private object? _state;
    private object? _scheduler;
    private Action<object?>? _continuation;
    private short _version;
    private ExecutionContext? _ec;

    // Инфраструктура бизнес-логики
    private readonly Timer _timer;
    private TimeSpan _lastMeasurementTime = TimeSpan.Zero;
    private CustomPcMonitor? _monitor;
}

GetStatus

Пожалуй, его реализация самая простая

public ValueTaskSourceStatus GetStatus(short token)
{
    CheckVersion(token);
    
    if (_exception is not null)
    {
        return ValueTaskSourceStatus.Faulted;
    }

    if (_cachedResult != default)
    {
        // Предположим, что настоящий результат не должен быть default
        return ValueTaskSourceStatus.Succeeded;
    }

    return ValueTaskSourceStatus.Pending;
}

Дальше OnCompleted

public void OnCompleted(Action<object?> continuation,
                        object? state,
                        short token,
                        ValueTaskSourceOnCompletedFlags flags)
{
    CheckVersion(token);
    
    if (UseExecutionContext())
    {
        _ec = ExecutionContext.Capture();
    }

    if (UseSchedulingContext() && 
        GetScheduler() is {} scheduler)
    {
        _scheduler = scheduler;
    }

    // Здесь может быть состояние гонки, когда 
    // результат выставляется быстрее, чем заканчивается вызов OnCompleted.
    // В нашем случае, такое может случиться, когда время ожидания таймера было очень мало
    _state = state;
    var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
    if (prev is null)
    {
        return;
    }
    
    _state = null;
    
    // Sentinel - маркер, выставляемый, когда операция завершилась, 
    // но колбэк еще не был выставлен
    if (!ReferenceEquals(prev, Sentinel))
    {
        throw new InvalidOperationException("Обнаружено множественное ожидание");
    }
    
    // Вызываем продолжение синхронно, т.к. уже результат уже готов
    InvokeContinuation(continuation, state, synchronously: true);
    
    bool UseExecutionContext() => ( flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext ) is not ValueTaskSourceOnCompletedFlags.None;

    bool UseSchedulingContext() =>
        ( flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext ) is not ValueTaskSourceOnCompletedFlags.None;

    object GetScheduler() => ( object? ) SynchronizationContext.Current ?? TaskScheduler.Current;
}

Теперь доходим до реализации GetResult

public PcStatistics GetResult(short version)
{
    CheckVersion(version);
    
    if (_exception is not null)
    {
        throw _exception;
    }

    if (_cachedResult == default)
    {
        // Результат еще не готов
        throw new InvalidOperationException("Работа еще не завершена");
    }

    return _cachedResult;
}

Все приведенные выше методы довольно просты в реализации, но на прод их не принесешь:

  • Нет поддержки отмены

  • Плохая работа с конкурентностью

  • ExecutionContext не используется

Реализация InvokeContinuation
private void InvokeContinuation(Action<object?>? continuation, object? state, bool synchronously)
{
    if (continuation is null)
    {
        return;
    }

    if (_scheduler is not null)
    {
        if (_scheduler is SynchronizationContext sc)
        {
            sc.Post(s =>
            {
                var t = ( Tuple<Action<object?>, object?> ) s!;
                t.Item1(t.Item2);
            }, Tuple.Create(continuation, state));
        }
        else
        {
            var ts = ( TaskScheduler ) _scheduler;
            Task.Factory.StartNew(continuation, 
                state, CancellationToken.None,
                TaskCreationOptions.DenyChildAttach, ts);
        }
    }
    else if (synchronously)
    {
        continuation(state);
    }
    else
    {
        ThreadPool.QueueUserWorkItem(continuation, state, true);
    }
}

Добавляем ManualResetValueTaskSourceCore

Реализацию написали. Мы молодцы. А теперь все выбрасываем, так как реализация за нас уже сделана - ManualResetValueTaskSourceCore.

Она реализует все выше приведенные методы логики IValueTaskSource.

Теперь перепишем старые методы с его использованием.

public PcStatistics GetResult(short token)
{
    return _source.GetResult(token); 
}

public ValueTaskSourceStatus GetStatus(short token)
{
    return _source.GetStatus(token);
}

public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
    _source.OnCompleted(continuation, state, token, flags);
}

Благодаря этому, можно писать свои ValueTaskSource и задумываться, только о бизнес-логике.

Добавляем пулинг + почему нельзя заново await'ить

Теперь время для оптимизаций.

Заметим, что GetResult вызывается только 1 раз, причем самым последним и наш IValueTaskSource после этого трогать не должны.

Почему бы не переиспользовать создаваемые ValueTaskSource и возвращать их обратно при вызове GetResult.

Так и сделаем.

В PcMonitor добавим пул этих объектов: при вызове GetStatisticsAsync берем из, а в GetResult будем возвращать обратно в.

// PcStatisticsManualResetValueTaskSource
private ObjectPool<PcStatisticsManualResetValueTaskSource>? _pool;

public PcStatistics GetResult(short token)
{
    try
    {
        var result = _source.GetResult(token);
        Reset();
        return result;
    }
    catch (Exception e) when (e is not InvalidOperationException)
    {
        Reset();
        throw;
    }

    void Reset()
    {
        // Возвращаем в пул
        _pool!.Return(this);
        
        // Перетираем состояние, чтобы переиспользовать
        _source.Reset();
    }
}

// PcMonitor
private ObjectPool<PcStatisticsManualResetValueTaskSource> _pool;

public ValueTask<PcStatistics> GetStatisticsAsync(CancellationToken token = default)
{
    var source = _pool.Get();
    return source.Start(this, _pool, token);
}

Теперь мы можем создать ограниченное количество ValueTaskSource и постоянно их переиспользовать без лишних аллокаций памяти!

Но что, если кто-то попытается заawait'ить ValueTask несколько раз?

Тогда весь путь вызовов повторится и в лучшем случае ему вернется старый результат.

Но даже если эти вызовы были крайне близки во времени, никто не гарантирует, что за это время тот же самый IValueTaskSource не будет передан в другой ValueTask и будет хранить новое значение.

Вот тут и нужен short token, передававшийся в любой метод.

Он призван проверять, что вызвавший код, обращается к ValueTask, в которой находится актуальный IValueTaskSource.

В ManualResetValueTaskSourceCore он реализован в виде простого счетчика, поэтому там он называется Version. Но в общем случае это не обязательно - сойдет любое неповторяющееся значение.

Этот токен задается в самом начале и не изменяется в процессе работы ValueTask

public ValueTask<PcStatistics> Start(ValueTaskSourcePcMonitor monitor, ObjectPool<PcStatisticsManualResetValueTaskSource> pool, CancellationToken token = default)
{
    // ...
    
    // ManualResetValueTaskSourceCore.Version - токен, который инкрементируется при вызове Reset()
    return new ValueTask<PcStatistics>(this, _source.Version);
}

Использование в .NET

Когда кто-то представляет IValueTaskSource, почти всегда в пример приводят сокет.
Я не буду исключением.

Читать или писать в сокет можно только одним потоком (только один или читает или пишет).
Растратно каждый раз создавать новые Task'и на каждый чих (особенно учитывая что "Сеть надежна").

Поэтому внутри себя сокет содержит 2 буфера IValueTaskSource - для чтения и записи

public partial class Socket
{
    /// <summary>Cached instance for receive operations that return <see cref="ValueTask{Int32}"/>. Also used for ConnectAsync operations.</summary>
    private AwaitableSocketAsyncEventArgs? _singleBufferReceiveEventArgs;
    /// <summary>Cached instance for send operations that return <see cref="ValueTask{Int32}"/>. Also used for AcceptAsync operations.</summary>
    private AwaitableSocketAsyncEventArgs? _singleBufferSendEventArgs;
    
    // ...
    
    internal sealed class AwaitableSocketAsyncEventArgs 
      : SocketAsyncEventArgs, 
        IValueTaskSource, 
        IValueTaskSource<int>, 
        IValueTaskSource<Socket>, 
        IValueTaskSource<SocketReceiveFromResult>, 
        IValueTaskSource<SocketReceiveMessageFromResult>
    {
        // ...
    }
}

Например, при чтении из сокета буфер используется таким образом:

internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)
{
    // Получаем закэшированный IValueTaskSource или создаем новый (потом положим обратно в кэш)
    AwaitableSocketAsyncEventArgs saea =
        Interlocked.Exchange(ref _singleBufferReceiveEventArgs, null) ??
        new AwaitableSocketAsyncEventArgs(this, isReceiveForCaching: true);
    
    // Обновляем состояние IValueTaskSource для новой работы
    saea.SetBuffer(buffer);
    saea.SocketFlags = socketFlags;
    saea.WrapExceptionsForNetworkStream = fromNetworkStream;

    // Запускаем асинхронную операцию
    return saea.ReceiveAsync(this, cancellationToken);
}

internal sealed class AwaitableSocketAsyncEventArgs 
{
    public ValueTask<int> ReceiveAsync(Socket socket, CancellationToken cancellationToken)
    {
        if (socket.ReceiveAsync(this, cancellationToken))
        {
            // Операция не завершена синхронно - запускаем асинхронную операцию
            _cancellationToken = cancellationToken;
            return new ValueTask<int>(this, _token);
        }
        // ...

        // Операция завершилась синхронно
        return error == SocketError.Success ?
            new ValueTask<int>(bytesTransferred) :
            ValueTask.FromException<int>(CreateException(error));
    }
}

IValueTaskSource используется также в Channel'ах.

Он используется как в Bounded так и в Unbounded, но пример сделаю на Bounded.

В BoundedChannel есть следующие поля

internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
{
    /// <summary>Readers waiting to read from the channel</summary>
    private readonly Deque<AsyncOperation<T>> _blockedReaders = new Deque<AsyncOperation<T>>();
    /// <summary>Writers waiting to write to the channel.</summary>
    private readonly Deque<VoidAsyncOperationWithData<T>> _blockedWriters = new Deque<VoidAsyncOperationWithData<T>>();
    /// <summary>Linked list of WaitToReadAsync waiters.</summary>
    private AsyncOperation<bool>? _waitingReadersTail;
    /// <summary>Linked list of WaitToWriteAsync waiters.</summary>
    private AsyncOperation<bool>? _waitingWritersTail;
    // ...
}

Как можно заметить, здесь есть поля, использующие AsyncOperation - тот самый IValueTaskSource и в нем есть уже знакомые поля:

internal partial class AsyncOperation<TResult> 
    : AsyncOperation, 
      IValueTaskSource, 
      IValueTaskSource<TResult>
{
    // Предназначен ли для пулинга
    private readonly bool _pooled;
    // Асинхронное продолжение
    private readonly bool _runContinuationsAsynchronously;
    // Результат операции
    private TResult? _result;
    // Исключение в процессе работы
    private ExceptionDispatchInfo? _error;
    // continuation из OnCompleted
    private Action<object?>? _continuation;
    // state из OnCompleted
    private object? _continuationState;
    // SynchronizationContext или TaskScheduler
    private object? _schedulingContext;
    private ExecutionContext? _executionContext;
    // token
    private short _currentId;
}

Для чтения из канала используется ValueTask<T> ReadAsync:

public override ValueTask<T> ReadAsync(CancellationToken cancellationToken)
{
    BoundedChannel<T> parent = _parent;
    lock (parent.SyncObj)
    {
        // Если есть свободные элементы - вернуть их
        if (!parent._items.IsEmpty)
        {
            return new ValueTask<T>(DequeueItemAndPostProcess());
        }
        
        // Используем закэшированный IValueTaskSource
        if (!cancellationToken.CanBeCanceled)
        {
            AsyncOperation<T> singleton = _readerSingleton;
            if (singleton.TryOwnAndReset())
            {
                parent._blockedReaders.EnqueueTail(singleton);
                return singleton.ValueTaskOfT;
            }
        }

        // Возвращаем новый IValueTaskSource
        var reader = new AsyncOperation<T>(parent._runContinuationsAsynchronously | cancellationToken.CanBeCanceled, cancellationToken);
        parent._blockedReaders.EnqueueTail(reader);
        return reader.ValueTaskOfT;
    }
}

Для записи - ValueTask WriteAsync:

public override ValueTask WriteAsync(T item, CancellationToken cancellationToken)
{
    // Количество элемент в очереди
    int count = parent._items.Count;

    if (count == 0)
    {
        // Добавляем элемент в свободную очередь или заблокированного читателя
    }
    else if (count < parent._bufferedCapacity)
    {
        // Синхронно добавляем элемент в свободную очередь
    }
    else if (parent._mode == BoundedChannelFullMode.Wait)
    {
        // Очередь полна, создаем асинхронную операцию записи

        // Используем закэшированный IValueTaskSource
        if (!cancellationToken.CanBeCanceled)
        {
            VoidAsyncOperationWithData<T> singleton = _writerSingleton;
            if (singleton.TryOwnAndReset())
            {
                singleton.Item = item;
                parent._blockedWriters.EnqueueTail(singleton);
                return singleton.ValueTask;
            }
        }

        // Создаем новый IValueTaskSource
        var writer = new VoidAsyncOperationWithData<T>(runContinuationsAsynchronously: true, cancellationToken);
        writer.Item = item;
        parent._blockedWriters.EnqueueTail(writer);
        return writer.ValueTask;
    }
    else if (parent._mode == BoundedChannelFullMode.DropWrite)
    {
        // Отбрасываем элемент, т.к. очередь полна
    }
    else
    {
        // Удаляем последний/первый элемент в очереди и записываем новый
    }
    
    return default;
}

Полезные ссылки

Надеюсь, теперь стало понятно, как работают IValueTaskSource и почему переawait'ить ValueTask плохая затея.

Если кому-то стала интересна эта тема, то прилагаю полезные ссылки:

Комментарии (1)


  1. AndreyAf
    10.07.2023 04:50

    Читать или писать в сокет можно только одним потоком (только один или читает или пишет).

    а вот и нет, для сокетов эти операции независимы друг от друга, другое дело что протокол который реализуется на сокетах скорей всего синхронный.