И снова здравствуй. Какое-то время назад я писал о другом малоизвестном инструменте для любителей высокой производительности — System.IO.Pipelines. По своей сути, рассматриваемый System.Threading.Channels (в дальнейшем «каналы») построен по похожим принципам, что и Пайплайны, решает ту же задачу — Производитель-Потребитель. Однако имеет в разы более простое апи, которое изящно вольется в любого рода enterprise-код. При этом использует асинхронность без аллокаций и без stack-dive даже в асинхронном случае! (Не всегда, но часто).
Задача Производитель/Потребитель встречается на пути программистов довольно часто и уже не первый десяток лет. Сам Эдсгер Дейкстра приложил руку к решению данной задачи — ему принадлежит идея использования семафоров для синхронизации потоков при организации работы по принципу производитель/потребитель. И хотя ее решение в простейшем виде известно и довольно тривиально, в реальном мире данный шаблон (Производитель/Потребитель) может встречаться в гораздо более усложненном виде. Также современные стандарты программирования накладывают свои отпечатки, код пишется более упрощенно и разбивается для дальнейшего переиспользования. Все делается для понижения порога написания качественного кода и упрощения данного процесса. И рассматриваемое пространство имен — System.Threading.Channels — очередной шаг на пути к этой цели.
Какое-то время назад я рассматривал System.IO.Pipelines. Там требовалось более внимательная работа и глубокое осознание дела, в ход шли Span и Memory, а для эффективной работы требовалось не вызывать очевидных методов (чтобы избежать лишних выделений памяти) и постоянно думать в байтах. Из-за этого программный интерфейс Пайплайнов был нетривиален и не интуитивно понятен.
В System.Threading.Channels пользователю представляется гораздо более простое api для работы. Стоит упомянуть, что несмотря на простоту api, данный инструмент является весьма оптимизированным и на протяжении своей работы вполне вероятно не выделит память. Возможно это благодаря тому, что под капотом повсеместно используется ValueTask, а даже в случае реальной асинхронности используется IValueTaskSource, который переиспользуется для дальнейших операций. Именно в этом заключается весь интерес реализации Каналов.
Каналы являются обобщенными, тип обобщения, как несложно догадаться — тип, экземпляры которого будут производиться и потребляться. Интересно то, что реализация класса Channel, которая помещается в 1 строку (источник github):
Таким образом основной класс каналов параметризован 2 типами — отдельно под канал производитель и канал потребитель. Но для реализованых каналов это не используется.
Для тех, кто знаком с Пайплайнами, общий подход для начала работы покажется знакомым. А именно. Мы создаем 1 центральный класс, из которого вытаскиваем отдельно производителей(ChannelWriter) и потребителей(ChannelReader). Несмотря на названия, стоит помнить, что это именно производитель/потребитель, а не читатель/писатель из еще одной классической одноименной задачи на многопоточность. ChannelReader изменяет состояние общего channel (вытаскивает значение), которое более становится недоступно. А значит он скорее не читает, а потребляет. Но с реализацией мы ознакомимся позже.
Начало работы с каналами начинается с абстрактного класса Channel<T> и статического класса Channel, который создает наиболее подходящую реализацию. Далее из этого общего Channel можно получать ChannelWriter для записи в канал и ChannelReader для потребления из канала. Канал является хранилищем общей информации для ChannelWriter и ChannelReader, так, именно в нем хранятся все данные. А уже логика их записи или потребления рассредоточена в ChannelWriter и ChannelReader, Условно каналы можно разделить на 2 группы — безграничные и ограниченные. Первые более простые по реализации, в них можно писать безгранично (пока память позволяет). Вторые же ограничены неким максимальным значением количества записей.
Здесь вытекает немного разная природа асинхронности. В неограниченных каналах операция записи всегда будет завершаться синхронно, нет ничего, что могло бы остановить от записи в канал. Для ограниченных каналов ситуация иная. При стандартном поведении (которое можно заменить) операция записи будет завершаться синхронно до тех пор пока в канале есть место для новых экземпляров. Как только канал заполнен, операция записи не завершится, пока не освободится место (после того, как потребитель потребил потребляемое). Поэтому здесь операция будет реально асинхронной со сменой потоков и сопутствующими изменениями (или без смены, что будет описано чуть позже).
Поведения читателей по большей части одинаково — если в канале есть что-то, то читатель просто читает это и завершается синхронно. Если ничего нет, то ожидает пока кто-то что-то запишет.
Статический класс Channel содержит 4 метода для создания вышеперечисленных каналов:
При желании можно указать более точные опции для создания канала, которые помогут оптимизировать его под указанные нужды.
UnboundedChannelOptions содержит 3 свойства, значение которых по умолчанию false:
BoundedChannelOptions содержит те же 3 свойства и еще 2 сверху
В зависимости от переданных параметров и вызванного метода будет создана одна из 3 реализаций: SingleConsumerUnboundedChannel, UnboundedChannel, BoundedChannel. Но это не столь важно, ведь пользоваться каналом мы будем через базовый класс Channel<TWrite, TRead>.
У него есть 2 свойства:
А также, 2 оператора неявного приведения типа к ChannelReader<TRead> и ChannelWriter<TWrite>.
Пример начала работы с каналами:
Данные хранятся в очереди. Для 3 типов используются 3 разные очереди — ConcurrentQueue<T>, Deque<T> и SingleProducerSingleConsumerQueue<T>. На этом моменте мне показалось, что я устарел и пропустил кучу новых простейших коллекций. Но спешу огорчить — они не для всех. Помечены internal, так что использовать их не получится. Но если вдруг они понадобятся на проде — их можно найти здесь (SingleProducerConsumerQueue) и здесь (Deque). Реализация последней весьма проста. Советую ознакомится, ее очень быстро можно изучить.
Итак, приступим к изучению непосредственно ChannelReader и ChannelWriter, а также интересных деталей реализации. Они все сводятся к асинхронности без выделений памяти с помощью IValueTaskSource.
При запросе объекта потребителя возвращается одна из реализаций абстрактного класса ChannelReader<T>. Опять же в отличие от Пайплайнов АПИ несложное и методов немного. Достаточно просто знать список методов, чтобы понять, как использовать это на практике.
Методы:
Все аналогично потребителю, так что сразу смотрим методы:
Небольшой пример вышеописанного (для легкого начала ваших собственных экспериментов):
А теперь перейдем к самой интересной части.
В процессе написания и изучения кода, я осознал, что почти ничего интересного в реализации всех этих операций нет. В общем можно описать так — избегание лишних блокировок с помощью конкурентных коллекций и обильное использование ValueTask, который является структурой, что экономит память. Однако спешу напомнить, что не стоит быстрой заменой проходиться по всем файлам на вашей ПЭВМ и заменять все Task на ValueTask. Он имеет смысл только в случаях, когда операция в большинстве случаев завершается синхронно. Ведь, как мы помним, при асинхронности вполне вероятна смена потока, а значит и стек уже будет не тот, что прежде. Да и вообще, истинный профессионал в области производительности знает — не оптимизируй до возникновения проблем.
Радует одно, в профессионалы я себя записывать не буду, а поэтому самое время разобраться, в чем же секрет написания асинхронного кода без выделений памяти, что на первый взгляд звучит слишком хорошо для правды. Но бывает и такое.
Начнем наш путь с истоков — структуры ValueTask, которая была добавлена в .net core 2.0 и дополнена в 2.1. Внутри этой структуры скрывается хитрое поле object _obj. Несложно догадаться, опираясь на говорящее название, что в этом поле может скрываться одна из 3 вещей — null, Task/Task<T> или IValueTaskSource. На самом деле, это вытекает из способов создания ValueTask.
Как заверяет производитель, данную структуру следует использовать лишь очевидно — с ключевым словом await. То есть не следует применять await много раз к одному и тому же ValueTask, использовать комбинаторы, добавлять несколько продолжений и тп. Также не следует получать результат из ValueTask более одного раза. А связано это как раз с тем, что мы пытаемся понять — переиспользованием всего этого добра без выделения памяти.
Я уже упомянул интерфейс IValueTaskSource. Именно он помогает сэкономить память. Делается это с помощью переиспользования самого IValueTaskSource несколько раз для множества задач. Но именно из-за этого переиспользования и нет возможности баловаться с ValueTask.
Итак, IValueTaskSource. Данный интерфейс имеет 3 метода, реализовав которые вы будете успешно экономить память и время на выделении тех заветных байт.
Но несмотря на простой интерфейс, реализация потребует определенной сноровки. И тут можно вспомнить про то, с чего мы начали — Channels. В данной реализации используется класс AsyncOperation, который является реализацией IValueTaskSource. Данный класс скрыт за модификатором доступа internal. Но это не мешает разобраться, в основных механизмах. Напрашивается вопрос, почему не дать реализацию IValueTaskSource в массы? Первая причина (хохмы ради) — когда в руках молоток, повсюду гвозди, когда в руках реализация IValueTaskSource, повсюду неграмотная работа с памятью. Вторая причина (более правдоподобная) — в то время, как интерфейс прост и универсален, реальная реализация оптимальна при использований определенных нюансов применения. И вероятно именно по этой причине можно найти реализации в самых разных частях великого и могучего .net, как то AsyncOperation под капотом каналов, AsyncIOOperation внутри нового API сокетов и тд.
Однако, справедливости ради, есть все же одна общая реализация — ManualResetValueTaskSourceCore. Но это уже слишком далеко от темы статьи.
Довольно популярный метод популярного класса, позволяющий избежать накладных расходов на классические примитивы синхронизации. Думаю, большинство знакомы с ним, но все же стоит описать в 3 словах, ведь данная конструкция используется довольно часто в AsyncOperation.
В массовой литературе данную функцию называют compare and swap (CAS). В .net она доступна в классе Interlocked.
Сигнатура следующая:
Имеются также перегрузи с int, long, float, double, IntPtr, object.
Сам метод атомарный, то бишь выполняется без прерываний. Сравнивает 2 значения и, если они равны, выполняет присваивание нового значения в переменную. Решают проблему, когда нужно проверить значение переменной и в зависимости от него изменить переменную.
Допустим, вы хотите инкрементировать переменную, если ее значение меньше 10.
Далее идут 2 потока.
При использовании данного метода, вы либо изменяете именно то значение, что хотели, либо не изменяете, получив при этом актуальное значение переменной.
location1 — переменная, значение которой мы хотим поменять. Оно сравнивается с comparand, в случае равенства в location1 записывается value. Если операция удалась, то метод вернет прошлое значение переменной location1. Если же нет, то будет возращено актуальное значение location1.
Если говорить чуть глубже, то существует инструкция языка ассемблера cmpxchg, которая выполняет эти действия. Именно она и используется под капотом.
Рассматривая весь этот код я не раз наткнулся на упоминания «Stack Dive». Это очень крутая и интересная штука, которая на самом деле очень нежелательна. Суть в том, что при синхронном выполнении продолжений мы можем исчерпать ресурсы стека.
Допустим, мы имеем 10000 задач, в стиле
Допустим, первая задача завершает выполнение и этим освобождает продолжение второй, которое мы начинаем тут же выполнять синхронно в этом потоке, то есть забирая кусок стека стек фреймом данного продолжения. В свою очередь, данное продолжение разблокирует продолжение третей задачи, которое мы тоже начинаем сразу выполнять. И так далее. Если в продолжении больше нет await'ов или чего-то, что как-то сбросит стек, то мы просто будем потреблять стековое пространство до упора. Что может вызвать StackOverflow и крах приложения. В рассмотрении кода я упомяну, как с этим борется AsyncOperation.
Source code.
Внутри AsyncOperation есть поле _continuation типа Action<object>. Поле используется для, не поверите, продолжений. Но, как это часто бывает в слишком современном коде, у полей появляются дополнительные обязанности (как сборщик мусора и последний бит в ссылке на таблицу методов). Поле _continuation из той же серии. Есть 2 специальных значения, которые могут хранится в этом поле, кроме самого продолжения и null. s_availableSentinel и s_completedSentinel. Данные поля показывают, что операция доступна и завершена соответственно. Доступна она бывает как раз для переиспользования для совершенно асинхронной операции.
Также AsyncOperation реализует IThreadPoolWorkItem с единственным методом — void Execute() => SetCompletionAndInvokeContinuation(). Метод SetCompletionAndInvokeContinuation как раз и занимается выполнением продолжения. И данный метод вызывается либо напрямую в коде AsyncOperation, либо через упомянутый Execute. Ведь типы реализующие IThreadPoolWorkItem можно забрасывать в тред пул как-то вот так ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false).
Метод Execute будет выполнен тред пулом.
Само выполнение продолжения довольно тривиально.
Продолжение _continuation копируется в локальную переменную, на ее место записывается s_completedSentinel — искусственный объект-марионетка (иль часовой, не знаю, как глаголить мне в нашей речи), который указывает, что задача завершена. Ну а далее локальная копия реального продолжения просто выполняется. При наличии ExecutionContext, данные действия постятся в контекст. Никакого секрета тут нет. Этот код может быть вызван как напрямую классом — просто вызвав метод, инкапсулирующий эти действия, так и через интерфейс IThreadPoolWorkItem в тред пуле. Теперь можно догадаться, как работает функция с выполнением продолжений синхронно.
Первый метод интерфейса IValueTaskSource — GetResult (github).
Все просто, он:
Метод TrySetResult (github).
Метод тривиален. — он сохраняет принятый параметр в _result и сигнализирует о завершении, а именно вызывает метод SignalCompleteion, который довольно интересен.
Метод SignalCompletion (github).
В данном методе используется все, о чем мы говорили в начале.
В самом начале, если _continuation == null, мы записываем марионетку s_completedSentinel.
Далее метод можно разделить на 4 блока. Сразу скажу для простоты понимания схемы, 4 блок — просто синхронное выполнение продолжения. То есть тривиальное выполнение продолжения через метод, как я описано в абзаце про IThreadPoolWorkItem.
Второй метод интерфейса IValueTaskSource — GetStatus (github)
Просто как питерская пышка.
Если _continuation != _completedSentinel, то возвращаем ValueTaskSourceStatus.Pending
Если error == null, то возвращаем ValueTaskSourceStatus.Succeeded
Если _error.SourceException is OperationCanceledException, то возвращаем ValueTaskSourceStatus.Canceled
Ну а коль уж до сюда дошли, то возвращаем ValueTaskSourceStatus.Faulted
Третий и последний, но самый сложный метод интерфейса IValueTaskSource — OnCompleted (github)
Метод добавляет продолжение, которое выполняется по завершению.
При необходимости захватывает ExecutionContext и SynchronizationContext.
Далее используется Interlocked.CompareExchange, описанный выше, чтобы сохранить продолжение в поле, сравнивая его с null. Напоминаю, что CompareExchange возвращает актуальное значение переменной.
Если сохранение продолжения прошло, то возвращается значение, которое было в переменной до обновления, то есть null. Это означает, что операция еще не завершилась на момент записи продолжения. И тот, кто ее завершит сам со всем разберется (как мы смотрели выше). И нам нет смысла выполнять какие-то дополнительные действия. И на этом работа метода завершается.
Если сохранить значение не получилось, то есть из CompareExchange вернулось что-то кроме null. В этом случае кто-то успел положить значение в быстрее нас. То есть произошла одна из 2 ситуаций — или задача завершилась быстрее, чем мы до сюда дошли, или была попытка записать более 1 продолжения, что делать нельзя.
Таким образом проверяем возвращенное значение, равно ли оно s_completedSentinel — именно оно было бы записано в случае завершения.
А вот и пример синхронных продолжений:
Output:
Main, before await. Thread id: 1
Created thread for writing with delay, before await write. Thread id: 4
Main, after await (will be processed by created thread for writing). Thread id: 4
Created thread for writing with delay, after await write. Thread id: 4
Оглавление
- Введение. Задача производитель/потребитель
- Начало работы. Channel
- Потребление из канала. ChannelReader
- Запись в канал. ChannelWriter
- Асинхронность без алллокаций
- Интерфейс IValueTaskSource
- Немного про CompareExchange
- Проблема Stack dive
- AsyncOperation — детали реализации
Введение
Задача Производитель/Потребитель встречается на пути программистов довольно часто и уже не первый десяток лет. Сам Эдсгер Дейкстра приложил руку к решению данной задачи — ему принадлежит идея использования семафоров для синхронизации потоков при организации работы по принципу производитель/потребитель. И хотя ее решение в простейшем виде известно и довольно тривиально, в реальном мире данный шаблон (Производитель/Потребитель) может встречаться в гораздо более усложненном виде. Также современные стандарты программирования накладывают свои отпечатки, код пишется более упрощенно и разбивается для дальнейшего переиспользования. Все делается для понижения порога написания качественного кода и упрощения данного процесса. И рассматриваемое пространство имен — System.Threading.Channels — очередной шаг на пути к этой цели.
Какое-то время назад я рассматривал System.IO.Pipelines. Там требовалось более внимательная работа и глубокое осознание дела, в ход шли Span и Memory, а для эффективной работы требовалось не вызывать очевидных методов (чтобы избежать лишних выделений памяти) и постоянно думать в байтах. Из-за этого программный интерфейс Пайплайнов был нетривиален и не интуитивно понятен.
В System.Threading.Channels пользователю представляется гораздо более простое api для работы. Стоит упомянуть, что несмотря на простоту api, данный инструмент является весьма оптимизированным и на протяжении своей работы вполне вероятно не выделит память. Возможно это благодаря тому, что под капотом повсеместно используется ValueTask, а даже в случае реальной асинхронности используется IValueTaskSource, который переиспользуется для дальнейших операций. Именно в этом заключается весь интерес реализации Каналов.
Каналы являются обобщенными, тип обобщения, как несложно догадаться — тип, экземпляры которого будут производиться и потребляться. Интересно то, что реализация класса Channel, которая помещается в 1 строку (источник github):
namespace System.Threading.Channels
{
public abstract class Channel<T> : Channel<T, T> { }
}
Таким образом основной класс каналов параметризован 2 типами — отдельно под канал производитель и канал потребитель. Но для реализованых каналов это не используется.
Для тех, кто знаком с Пайплайнами, общий подход для начала работы покажется знакомым. А именно. Мы создаем 1 центральный класс, из которого вытаскиваем отдельно производителей(ChannelWriter) и потребителей(ChannelReader). Несмотря на названия, стоит помнить, что это именно производитель/потребитель, а не читатель/писатель из еще одной классической одноименной задачи на многопоточность. ChannelReader изменяет состояние общего channel (вытаскивает значение), которое более становится недоступно. А значит он скорее не читает, а потребляет. Но с реализацией мы ознакомимся позже.
Начало работы. Channel
Начало работы с каналами начинается с абстрактного класса Channel<T> и статического класса Channel, который создает наиболее подходящую реализацию. Далее из этого общего Channel можно получать ChannelWriter для записи в канал и ChannelReader для потребления из канала. Канал является хранилищем общей информации для ChannelWriter и ChannelReader, так, именно в нем хранятся все данные. А уже логика их записи или потребления рассредоточена в ChannelWriter и ChannelReader, Условно каналы можно разделить на 2 группы — безграничные и ограниченные. Первые более простые по реализации, в них можно писать безгранично (пока память позволяет). Вторые же ограничены неким максимальным значением количества записей.
Здесь вытекает немного разная природа асинхронности. В неограниченных каналах операция записи всегда будет завершаться синхронно, нет ничего, что могло бы остановить от записи в канал. Для ограниченных каналов ситуация иная. При стандартном поведении (которое можно заменить) операция записи будет завершаться синхронно до тех пор пока в канале есть место для новых экземпляров. Как только канал заполнен, операция записи не завершится, пока не освободится место (после того, как потребитель потребил потребляемое). Поэтому здесь операция будет реально асинхронной со сменой потоков и сопутствующими изменениями (или без смены, что будет описано чуть позже).
Поведения читателей по большей части одинаково — если в канале есть что-то, то читатель просто читает это и завершается синхронно. Если ничего нет, то ожидает пока кто-то что-то запишет.
Статический класс Channel содержит 4 метода для создания вышеперечисленных каналов:
Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);
При желании можно указать более точные опции для создания канала, которые помогут оптимизировать его под указанные нужды.
UnboundedChannelOptions содержит 3 свойства, значение которых по умолчанию false:
- AllowSynchronousContinuations — просто сумасводящая опция, которая позволяет выполнить продолжение асинхронной операции тому, кто ее разблокирует. А теперь по-простому. Допустим, мы писали в заполненный канал. Соответственно, операция прерывается, поток освобождается, а продолжение будет выполнено по завершению на новом потоке из пула. Но если включить эту опцию, продолжение выполнит тот, кто разблокирует операцию, то есть в нашем случае читатель. Это серьезно меняет внутреннее поведение и позволяет более экономно и производительно распоряжаться ресурсами, ведь зачем нам слать какие-то продолжения в какие-то потоки, если мы можем сами его выполнить;
- SingleReader — указывает, что будет использоваться один потребитель. Опять же, это позволяет избавиться от некоторой лишней синхронизации;
- SingleWriter — то же самое, только для писателя;
BoundedChannelOptions содержит те же 3 свойства и еще 2 сверху
- AllowSynchronousContinuations — то же;
- SingleReader — то же;
- SingleWriter — то же;
- Capacity — количество вмещаемых в канал записей. Данный параметр также является параметром конструктора;
- FullMode — перечисление BoundedChannelFullMode, которое имеет 4 опции, определяет поведение при попытке записи в заполненный канал:
- Wait — ожидает освобождения места для завершения асинхронной операции
- DropNewest — записываемый элемент перезаписывает самый новый из существующих, завершается синхронно
- DropOldest — записываемый элемент перезаписывает самый старый из существующих завершается синхронно
- DropWrite — записываемый элемент не записывается, завершается синхронно
В зависимости от переданных параметров и вызванного метода будет создана одна из 3 реализаций: SingleConsumerUnboundedChannel, UnboundedChannel, BoundedChannel. Но это не столь важно, ведь пользоваться каналом мы будем через базовый класс Channel<TWrite, TRead>.
У него есть 2 свойства:
- ChannelReader<TRead> Reader { get; protected set; }
- ChannelWriter<TWrite> Writer { get; protected set; }
А также, 2 оператора неявного приведения типа к ChannelReader<TRead> и ChannelWriter<TWrite>.
Пример начала работы с каналами:
Channel<int> channel = Channel.CreateUnbounded<int>();
//Можно делать так
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader;
//Или так
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;
Данные хранятся в очереди. Для 3 типов используются 3 разные очереди — ConcurrentQueue<T>, Deque<T> и SingleProducerSingleConsumerQueue<T>. На этом моменте мне показалось, что я устарел и пропустил кучу новых простейших коллекций. Но спешу огорчить — они не для всех. Помечены internal, так что использовать их не получится. Но если вдруг они понадобятся на проде — их можно найти здесь (SingleProducerConsumerQueue) и здесь (Deque). Реализация последней весьма проста. Советую ознакомится, ее очень быстро можно изучить.
Итак, приступим к изучению непосредственно ChannelReader и ChannelWriter, а также интересных деталей реализации. Они все сводятся к асинхронности без выделений памяти с помощью IValueTaskSource.
ChannelReader — потребитель
При запросе объекта потребителя возвращается одна из реализаций абстрактного класса ChannelReader<T>. Опять же в отличие от Пайплайнов АПИ несложное и методов немного. Достаточно просто знать список методов, чтобы понять, как использовать это на практике.
Методы:
- Виртуальное get-only свойство Task Completion { get; }
Обьект типа Task, который завершается, когда закрывается канал; - Виртуальное get-only свойство int Count { get; }
Тут следует заострить внимание, что возвращается текущее количество доступных для чтения объектов; - Виртуальное get-only свойство bool CanCount { get; }
Показывает, доступно ли свойство Count; - Абстрактный метод bool TryRead(out T item)
Пытается потребить объект из канала. Возвращает bool, показывающий, получилось ли у него прочитать. Результат помещается в out параметр (или null, если не получилось); - Абстрактный ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
Возвращается ValueTask со значением true, когда в канале появятся доступные для чтения данные, до тех пор задача не завершается. Возвращает ValueTask со значением false, когда канал закрывается(данных для чтения больше не будет); - Виртуальный метод ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
Потребляет значение из канала. Если значение есть, возвращается синхронно. В противном случае асинхронно ждет появления доступных для чтения данных и возвращает их.
У данного метода в абстрактном классе есть реализация, которая основана на методах TryRead и WaitToReadAsync. Если опустить все инфраструктурные нюансы (исключения и cancelation tokens), то логика примерно такая — попытаться прочитать объект с помощью TryRead. Если не удалось, то в цикле while(true) проверять результат метода WaitToReadAsync. Если true, то есть данные есть, вызвать TryRead. Если TryRead получается прочитать, то вернуть результат, в противном случае цикл по новой. Цикл нужен для неудачных попыток чтения — в результате гонки потоков, сразу много потоков могут получить завершение WaitToReadAsync, но объект будет только один, соответственно только один поток сможет прочитать, а остальные уйдут на повторный круг.
Однако данная реализация, как правило, переопределена на что-то более завязанное на внутреннем устройстве.
ChannelWriter — производитель
Все аналогично потребителю, так что сразу смотрим методы:
- Виртуальный метод bool TryComplete(Exception? error = null)
Пытается пометить канал как завершенный, т.е. показать, что в него больше не будет записано данных. В качестве необязательного параметра можно передать исключение, которое вызвало завершение канала. Возвращает true, если удалось завершить, в противном случае false (если канал уже был завершен или не поддерживает завершение); - Абстрактный метод bool TryWrite(T item)
Пытается записать в канал значение. Возвращает true, если удалось и false, если нет - Абстрактный метод ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default)
Возвращает ValueTask со значением true, который завершится, когда в канале появится место для записи. Значение false будет в том случае, если записи в канал более не будут разрешены; - Виртуальный метод ValueTask WriteAsync(T item, CancellationToken cancellationToken = default)
Асинхронно пишет в канал. Например, в случае, если канал заполнен, операция будет реально асинхронной и завершится только после освобождения места под данную запись; - Метод void Complete(Exception? error = null)
Просто пытается пометить канал как завершенный с помощью TryComplete, а в случае неудачи кидает исключение.
Небольшой пример вышеописанного (для легкого начала ваших собственных экспериментов):
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();
//Объекты ниже можно отправить в разные потоки, которые будут использовать их независимо в своих целях
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
//Первый поток может писать в канал
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
//И завершить его, при исключении или в случае, когда записал все, что хотел
writer.Complete();
//Второй может читать данные из канала по мере их доступности
int valueFromChannel = await reader.ReadAsync();
А теперь перейдем к самой интересной части.
Асинхронность без алллокаций
В процессе написания и изучения кода, я осознал, что почти ничего интересного в реализации всех этих операций нет. В общем можно описать так — избегание лишних блокировок с помощью конкурентных коллекций и обильное использование ValueTask, который является структурой, что экономит память. Однако спешу напомнить, что не стоит быстрой заменой проходиться по всем файлам на вашей ПЭВМ и заменять все Task на ValueTask. Он имеет смысл только в случаях, когда операция в большинстве случаев завершается синхронно. Ведь, как мы помним, при асинхронности вполне вероятна смена потока, а значит и стек уже будет не тот, что прежде. Да и вообще, истинный профессионал в области производительности знает — не оптимизируй до возникновения проблем.
Радует одно, в профессионалы я себя записывать не буду, а поэтому самое время разобраться, в чем же секрет написания асинхронного кода без выделений памяти, что на первый взгляд звучит слишком хорошо для правды. Но бывает и такое.
Интерфейс IValueTaskSource
Начнем наш путь с истоков — структуры ValueTask, которая была добавлена в .net core 2.0 и дополнена в 2.1. Внутри этой структуры скрывается хитрое поле object _obj. Несложно догадаться, опираясь на говорящее название, что в этом поле может скрываться одна из 3 вещей — null, Task/Task<T> или IValueTaskSource. На самом деле, это вытекает из способов создания ValueTask.
Как заверяет производитель, данную структуру следует использовать лишь очевидно — с ключевым словом await. То есть не следует применять await много раз к одному и тому же ValueTask, использовать комбинаторы, добавлять несколько продолжений и тп. Также не следует получать результат из ValueTask более одного раза. А связано это как раз с тем, что мы пытаемся понять — переиспользованием всего этого добра без выделения памяти.
Я уже упомянул интерфейс IValueTaskSource. Именно он помогает сэкономить память. Делается это с помощью переиспользования самого IValueTaskSource несколько раз для множества задач. Но именно из-за этого переиспользования и нет возможности баловаться с ValueTask.
Итак, IValueTaskSource. Данный интерфейс имеет 3 метода, реализовав которые вы будете успешно экономить память и время на выделении тех заветных байт.
- GetResult – Вызывается единожды, когда в стейт машине, образованной на рантайме для асинхронных методов, понадобится результат. В ValueTask есть метод GetResult, который и вызывает одноименный метод интерфейса, который, как мы помним, может хранится в поле _obj.
- GetStatus – Вызывается стейт машиной для определения состояния операции. Также через ValueTask.
- OnCompleted – Опять же, вызывается стейт машиной для добавления продолжения к невыполненной на тот момент задаче.
Но несмотря на простой интерфейс, реализация потребует определенной сноровки. И тут можно вспомнить про то, с чего мы начали — Channels. В данной реализации используется класс AsyncOperation, который является реализацией IValueTaskSource. Данный класс скрыт за модификатором доступа internal. Но это не мешает разобраться, в основных механизмах. Напрашивается вопрос, почему не дать реализацию IValueTaskSource в массы? Первая причина (хохмы ради) — когда в руках молоток, повсюду гвозди, когда в руках реализация IValueTaskSource, повсюду неграмотная работа с памятью. Вторая причина (более правдоподобная) — в то время, как интерфейс прост и универсален, реальная реализация оптимальна при использований определенных нюансов применения. И вероятно именно по этой причине можно найти реализации в самых разных частях великого и могучего .net, как то AsyncOperation под капотом каналов, AsyncIOOperation внутри нового API сокетов и тд.
Однако, справедливости ради, есть все же одна общая реализация — ManualResetValueTaskSourceCore. Но это уже слишком далеко от темы статьи.
CompareExchange
Довольно популярный метод популярного класса, позволяющий избежать накладных расходов на классические примитивы синхронизации. Думаю, большинство знакомы с ним, но все же стоит описать в 3 словах, ведь данная конструкция используется довольно часто в AsyncOperation.
В массовой литературе данную функцию называют compare and swap (CAS). В .net она доступна в классе Interlocked.
Сигнатура следующая:
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;
Имеются также перегрузи с int, long, float, double, IntPtr, object.
Сам метод атомарный, то бишь выполняется без прерываний. Сравнивает 2 значения и, если они равны, выполняет присваивание нового значения в переменную. Решают проблему, когда нужно проверить значение переменной и в зависимости от него изменить переменную.
Допустим, вы хотите инкрементировать переменную, если ее значение меньше 10.
Далее идут 2 потока.
Поток 1 | Поток 2 |
---|---|
Проверяет значение переменной на некоторое условие (то есть меньше ли оно 10), которое срабатывает | - |
Между проверкой и изменением значения | Присваивает переменной значение, не удовлетворяющее условию (например, 15) |
Изменяет значение, хотя не должен, ведь условие уже не соблюдается | - |
При использовании данного метода, вы либо изменяете именно то значение, что хотели, либо не изменяете, получив при этом актуальное значение переменной.
location1 — переменная, значение которой мы хотим поменять. Оно сравнивается с comparand, в случае равенства в location1 записывается value. Если операция удалась, то метод вернет прошлое значение переменной location1. Если же нет, то будет возращено актуальное значение location1.
Если говорить чуть глубже, то существует инструкция языка ассемблера cmpxchg, которая выполняет эти действия. Именно она и используется под капотом.
Stack dive
Рассматривая весь этот код я не раз наткнулся на упоминания «Stack Dive». Это очень крутая и интересная штука, которая на самом деле очень нежелательна. Суть в том, что при синхронном выполнении продолжений мы можем исчерпать ресурсы стека.
Допустим, мы имеем 10000 задач, в стиле
//code1
await ...
//code2
Допустим, первая задача завершает выполнение и этим освобождает продолжение второй, которое мы начинаем тут же выполнять синхронно в этом потоке, то есть забирая кусок стека стек фреймом данного продолжения. В свою очередь, данное продолжение разблокирует продолжение третей задачи, которое мы тоже начинаем сразу выполнять. И так далее. Если в продолжении больше нет await'ов или чего-то, что как-то сбросит стек, то мы просто будем потреблять стековое пространство до упора. Что может вызвать StackOverflow и крах приложения. В рассмотрении кода я упомяну, как с этим борется AsyncOperation.
AsyncOperation как реализация IValueTaskSource
Source code.
Внутри AsyncOperation есть поле _continuation типа Action<object>. Поле используется для, не поверите, продолжений. Но, как это часто бывает в слишком современном коде, у полей появляются дополнительные обязанности (как сборщик мусора и последний бит в ссылке на таблицу методов). Поле _continuation из той же серии. Есть 2 специальных значения, которые могут хранится в этом поле, кроме самого продолжения и null. s_availableSentinel и s_completedSentinel. Данные поля показывают, что операция доступна и завершена соответственно. Доступна она бывает как раз для переиспользования для совершенно асинхронной операции.
Также AsyncOperation реализует IThreadPoolWorkItem с единственным методом — void Execute() => SetCompletionAndInvokeContinuation(). Метод SetCompletionAndInvokeContinuation как раз и занимается выполнением продолжения. И данный метод вызывается либо напрямую в коде AsyncOperation, либо через упомянутый Execute. Ведь типы реализующие IThreadPoolWorkItem можно забрасывать в тред пул как-то вот так ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false).
Метод Execute будет выполнен тред пулом.
Само выполнение продолжения довольно тривиально.
Продолжение _continuation копируется в локальную переменную, на ее место записывается s_completedSentinel — искусственный объект-марионетка (иль часовой, не знаю, как глаголить мне в нашей речи), который указывает, что задача завершена. Ну а далее локальная копия реального продолжения просто выполняется. При наличии ExecutionContext, данные действия постятся в контекст. Никакого секрета тут нет. Этот код может быть вызван как напрямую классом — просто вызвав метод, инкапсулирующий эти действия, так и через интерфейс IThreadPoolWorkItem в тред пуле. Теперь можно догадаться, как работает функция с выполнением продолжений синхронно.
Первый метод интерфейса IValueTaskSource — GetResult (github).
Все просто, он:
- Инкрементирует _currentId.
_currentId — то, что идентифицирует конкретную операцию. После инкремента она уже не будет ассоциирована с этой операцией. Поэтому не следует получать результат дважды и тп; - помещает в _continuation делегат-марионетку s_availableSentinel. Как было упомянуто, это показывает, что этот экземпляр AsyncOperation можно испоьзовать повторно и не выделять лишней памяти. Делается это не всегда, а лишь если это было разрешено в конструкторе (pooled = true);
- Возвращает поле _result.
Поле _result просто устанавливается в методе TrySetResult который описан ниже.
Метод TrySetResult (github).
Метод тривиален. — он сохраняет принятый параметр в _result и сигнализирует о завершении, а именно вызывает метод SignalCompleteion, который довольно интересен.
Метод SignalCompletion (github).
В данном методе используется все, о чем мы говорили в начале.
В самом начале, если _continuation == null, мы записываем марионетку s_completedSentinel.
Далее метод можно разделить на 4 блока. Сразу скажу для простоты понимания схемы, 4 блок — просто синхронное выполнение продолжения. То есть тривиальное выполнение продолжения через метод, как я описано в абзаце про IThreadPoolWorkItem.
- Если _schedulingContext == null, т.е. нет захваченного контекста (это первый if).
Далее необходимо проверить _runContinuationsAsynchronously == true, то есть явно указано, что продолжения нужно выполнять как все привыкли — асинхронно (вложенный if).
При соблюдении данный условий в бой идет схема с IThreadPoolWorkItem описанная выше. То есть AsyncOperation добавляется в очередь на выполнение потоком тред пула. И выходим из метода.
Следует обратить внимание, что если первый if прошел (что будет очень часто, особенно в коре), а второй нет, то мы не попадем в 2 или 3 блок, а спустимся сразу на синхронное выполнение продолжения — т.е. 4 блок; - Если _schedulingContext is SynchronizationContext, то есть захвачен контекст синхронизации (это первый if).
По аналогии мы проверяем _runContinuationsAsynchronously = true. Но этого не достаточно. Необходимо еще проверить, контекст потока, на котором мы сейчас находимся. Если он отличен от захваченного, то мы тоже не можем просто выполнить продолжение. Поэтому если одно из этих 2 условий выполнено, мы отправляем продолжение в контекст знакомым способом:
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
И выходим из метода. опять же, если первая проверка прошла, а остальные нет (то есть мы сейчас находимся на том же контексте, что и был захвачен), мы попадем сразу на 4 блок — синхронное выполнение продолжения; - Выполняется, если мы не зашли в первые 2 блока. Но стоит расшифровать это условие.
Хитрость в том, что _schedulingContext может быть на самом деле захваченным TaskScheduler, а не непосредственно контекстом. В этом случае мы поступаем также, как и в блоке 2, т.е. проверяем флаг _runContinuationsAsynchronously = true и TaskScheduler текущего потока. Если планировщик не совпадает или флаг не тот, то мы сетапим продолжение через Task.Factory.StartNew и передаем туда этот планировщик. И выходим из метода. - Как и сказал в начале — просто выполняем продолжение на текущем потоке. Раз мы до сюда дошли, то все условия для этого соблюдены.
Второй метод интерфейса IValueTaskSource — GetStatus (github)
Просто как питерская пышка.
Если _continuation != _completedSentinel, то возвращаем ValueTaskSourceStatus.Pending
Если error == null, то возвращаем ValueTaskSourceStatus.Succeeded
Если _error.SourceException is OperationCanceledException, то возвращаем ValueTaskSourceStatus.Canceled
Ну а коль уж до сюда дошли, то возвращаем ValueTaskSourceStatus.Faulted
Третий и последний, но самый сложный метод интерфейса IValueTaskSource — OnCompleted (github)
Метод добавляет продолжение, которое выполняется по завершению.
При необходимости захватывает ExecutionContext и SynchronizationContext.
Далее используется Interlocked.CompareExchange, описанный выше, чтобы сохранить продолжение в поле, сравнивая его с null. Напоминаю, что CompareExchange возвращает актуальное значение переменной.
Если сохранение продолжения прошло, то возвращается значение, которое было в переменной до обновления, то есть null. Это означает, что операция еще не завершилась на момент записи продолжения. И тот, кто ее завершит сам со всем разберется (как мы смотрели выше). И нам нет смысла выполнять какие-то дополнительные действия. И на этом работа метода завершается.
Если сохранить значение не получилось, то есть из CompareExchange вернулось что-то кроме null. В этом случае кто-то успел положить значение в быстрее нас. То есть произошла одна из 2 ситуаций — или задача завершилась быстрее, чем мы до сюда дошли, или была попытка записать более 1 продолжения, что делать нельзя.
Таким образом проверяем возвращенное значение, равно ли оно s_completedSentinel — именно оно было бы записано в случае завершения.
- Если это не s_completedSentinel, то нас использовали не по плану — попытались добавить более одного продолжения. То есть то, которое уже записано, и то, которое пишем мы. А это исключительная ситуация;
- Если это s_completedSentinel, то это один из допустимых исходов, операция уже завершена и продолжение должны вызвать мы, здесь и сейчас. И оно будет выполнено асинхронно в любом случае, даже если _runContinuationsAsynchronously = false.
Сделано это так, потому что если мы дошли до этого места, значит мы внутри метода OnCompleted, внутри awaiter'а. А синхронное выполнение продолжений именно здесь грозит упомянутым стек дайвом. Сейчас вспомним, для чего нам нужна эта AsyncOperation — System.Threading.Channels. А там ситуация может быть очень легко достигнута, если о ней не задуматься. Допустим, мы читатель в ограниченном канале. Мы читаем элемент и разблокируем писателя, выполняем его продолжение синхронно, что разблокирует очередного читателя(если читатель очень быстр или их несколько) и так далее. Тут стоит осознать тонкий момент, что именно внутри awaiter'а возможна эта ситуация, в других случаях продолжение выполнится и завершится, что освободит занятый стек фрейм. А постоянный зацеп новых продолжений вглубь стека порождается постоянным выполнением продолжения внутри awaiter'а.
В целях избежания данной ситуации, несмотря ни на что необходимо запустить продолжение асинхронно. Выполняется по тем же схемам, что и первые 3 блока в методе SignalCompleteion — просто в пуле, на контексте или через фабрику и планировщик
А вот и пример синхронных продолжений:
class Program
{
static async Task Main(string[] args)
{
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = true
});
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");
var writerTask = Task.Run(async () =>
{
Thread.Sleep(500);
int objectToWriteInChannel = 555;
Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writer.WriteAsync(objectToWriteInChannel);
Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
});
//Blocked here because there are no items in channel
int valueFromChannel = await reader.ReadAsync();
Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writerTask;
Console.Read();
}
}
Output:
Main, before await. Thread id: 1
Created thread for writing with delay, before await write. Thread id: 4
Main, after await (will be processed by created thread for writing). Thread id: 4
Created thread for writing with delay, after await write. Thread id: 4
win32nipuh
Спасибо, интересно, хоть и, как выясняется — не совсем ново. Еще в 2018 году были анонсы.
Вопрос по теме: сделал тестовое приложение на винформс, в форме1: создается канал, стартует читатель и ждет данных, открывается форма2(параметр — созданный канал), открывается писатель и начинает писать, читатель получает, вычитывает все данные, выходит из функции.
Но, когда я при открытых формах пытаюсь повторно начать писать в тот же канал — получаю ошибку, что канал закрыт. Как быть в таком случае, когда через паузу снова появились данные для записи? Создавать новый канал?
mayorovp
Правильнее не закрывать канал.
win32nipuh
Я понимаю, что где-то мой пробой, но хотелось бы выяснить для себя:
Вот мои писатель и читатель в разных формах, где закрытие и как избежать?
mayorovp
Вот тут закрытие:
writer.Complete();
. Очевидно, что после Complete в канал ничего записать уже нельзя, для того этот метод и сделан. Как сообщает Капитан Очевидность, если в канал планируется записывать что-то ещё — не надо вызывать Complete.Кстати, делать WaitToReadAsync + TryRead в нескольких потоках — не лучшая идея, это ведет к Thundering herd problem, в таких случаях лучше вызывать простой ReadAsync.
win32nipuh
Спасибо, понял, в чем ошибка.
Я хочу приспособить канал в приложении, которое читает данные из файловой системы (список файлов, фолдеров, может быть много), по ходу чтения в цикле проверяет существует ли файл, его размер и пишет в базу.
Хочу сделать так: приложение не пишет сразу в базу по одной записи на файл, а пишет в канал, на другом конце читатель вычитывает и пишет в базу.