Эта заметка о том, зачем нам нужен пакет coroutine для Go и как он будет выглядеть. Но прежде всего, что такое корутины?

Сегодня каждый программист знаком с вызовами функций (подпрограмм): F вызывает G, которая останавливает F и запускает G. G выполняет свою работу, потенциально вызывая и ожидая другие функции, и в конце концов возвращается. Когда G возвращается, G уже нет, а F продолжает работать. В этой схеме одновременно выполняется только одна функция, а ее вызывающие ожидают, поднимаясь вверх по стеку вызовов.

В отличие от подпрограмм, корутины выполняются конкурентно на разных стеках, но все равно верно, что одновременно выполняется только одна функция, а ее вызывающая сторона ждет. F запускает G, но G не выполняется немедленно. Вместо этого F должен явно возобновить (resume) выполнение G, который затем начинает выполняться. В любой момент G может развернуться и вернуться (yield) назад к F. Это приостановит G и продолжит F операцию возобновления. В конце концов F снова вызывает resume, который приостанавливает F и продолжает G после выхода. И так далее, туда-сюда, пока G не вернется, что приведет к очистке G и продолжению F с последней операции возобновления, с некоторым сигналом для F, что G закончена и F больше не должна пытаться возобновить G. В этом паттерне одновременно выполняется только одна корутина, а ее вызывающая сторона ждет на другом стеке. Они выполняются по очереди в четко определенном, скоординированном порядке.

Это несколько абстрактно. Давайте посмотрим на реальные программы.

Корутины в Lua

В качестве примера можно привести сравнение двух двоичных деревьев на предмет наличия в них одинаковой последовательности значений, даже если их структуры различны. Например, здесь приведен код на языке Lua 5 для генерации некоторых двоичных деревьев:

function T(l, v, r)
  return {left = l, value = v, right = r}
end

e = nil
t1 = T(T(T(e, 1, e), 2, T(e, 3, e)), 4, T(e, 5, e))
t2 = T(e, 1, T(e, 2, T(e, 3, T(e, 4, T(e, 5, e)))))
t3 = T(e, 1, T(e, 2, T(e, 3, T(e, 4, T(e, 6, e)))))

Деревья t1 и t2 содержат значения 1, 2, 3, 4, 5; t3 содержит 1, 2, 3, 4, 6.

Мы можем написать корутину для обхода дерева и получения каждого значения:

function visit(t)
  if t ~= nil then  -- note: ~= is "not equal"
    visit(t.left)
    coroutine.yield(t.value)
    visit(t.right)
  end
end

Тогда для сравнения двух деревьев мы можем создать две корутины visit и попеременно считывать и сравнивать последовательные значения:

function cmp(t1, t2)
  co1 = coroutine.create(visit)
  co2 = coroutine.create(visit)
  while true
  do
      ok1, v1 = coroutine.resume(co1, t1)
      ok2, v2 = coroutine.resume(co2, t2)
      if ok1 ~= ok2 or v1 ~= v2 then
          return false
      end
      if not ok1 and not ok2 then
          return true
      end
  end
end

Аргументы t1 и t2 в coroutine.resume используются только на первой итерации в качестве аргумента visit. Последующие возобновления возвращают это значение из coroutine.yield, но в коде оно игнорируется.

Более идиоматичная версия на языке Lua предполагает использование функции coroutine.wrap, которая возвращает функцию, скрывающую объект coroutine:

function cmp(t1, t2)
  next1 = coroutine.wrap(function() visit(t1) end)
  next2 = coroutine.wrap(function() visit(t2) end)
  while true
  do
      v1 = next1()
      v2 = next2()
      if v1 ~= v2 then
          return false
      end
      if v1 == nil and v2 == nil then
          return true
      end
  end
end

После завершения работы корутины, next функция возвращает nil (полный код).

Генераторы в Python (итераторы в CLU)

Python предоставляет генераторы, которые очень похожи на корутины Lua, но они не являются корутинами, поэтому стоит указать на различия. Главное отличие заключается в том, что "очевидные" программы не работают. Например, вот прямой перевод нашего дерева Lua и посетителя в Python:

def T(l, v, r):
  return {'left': l, 'value': v, 'right': r}

def visit(t):
  if t is not None:
    visit(t['left'])
    yield t['value']
    visit(t['right'])

Но этот очевидный перевод не работает:

>>> e = None
>>> t1 = T(T(T(e, 1, e), 2, T(e, 3, e)), 4, T(e, 5, e))
>>> for x in visit(t1):
...     print(x)
...
4
>>>

Мы потеряли 1, 2, 3 и 5. Что произошло?

В Python этот def visit не определяет обычную функцию. Поскольку тело содержит оператор yield, результатом является генератор:

>>> type(visit(t1))
<class 'generator'>
>>>

Вызов visit(t['left']) вообще не выполняет код в visit. Он только создает и возвращает новый генератор, который затем отбрасывается. Чтобы избежать отбрасывания этих результатов, необходимо выполнить цикл по генератору и повторно выдать их:

def visit(t):
  if t is not None:
    for x in visit(t['left']):
      yield x
    yield t['value']
    for x in visit(t['right'])
      yield x

В Python 3.3 появилась функция yield from, позволяющая:

def visit(t):
  if t is not None:
    yield from visit(t['left']):
    yield t['value']
    yield from visit(t['right'])

Объект генератора содержит состояние единственного вызова visit, то есть значения локальных переменных и то, какая строка выполняется. Это состояние выталкивается в стек вызовов при каждом возобновлении работы генератора и затем заталкивается обратно в объект генератора при каждом выходе (yield), который может произойти только в самом верхнем кадре вызова. Таким образом, генератор использует тот же стек, что и исходная программа, избегая необходимости в полной реализации корутин, но вводя вместо этого непонятные ограничения.

Генераторы в Python, похоже, почти полностью скопированы с CLU, который был пионером этой абстракции (и многих других вещей), хотя в CLU они называются итераторами, а не генераторами. Итератор дерева CLU выглядит следующим образом:

    visit = iter (t: cvt) yields (int):
    tagcase t
        tag empty: ;
        tag non_empty(t: node):
            for x: int
                in tree$visit(t.left) do
                    yield(x);
                    end;
            yield(t.value);
            for x: int
                in tree$visit(t.right) do
                    yield(x);
                    end;
        end;
    end visit;

Синтаксис отличается, особенно tagcase, который рассматривает представление дерева в виде тегированного объединения, но основная структура, включая вложенные циклы for, точно такая же, как и в нашей первой рабочей версии на Python. Кроме того, поскольку CLU был статически типизирован, visit четко обозначен как итератор (iter), а не как функция (proc в CLU). Благодаря этой информации о типе, неправильное использование visit в качестве обычного вызова функции, как в нашем глючном примере на Python, компилятор может диагностировать (и, как я полагаю, диагностировал).

О реализации CLU первоначальные разработчики писали: "Итераторы - это разновидность корутин; однако их использование достаточно ограничено, и они реализуются с использованием только стека программы. Поэтому использование итератора лишь немного дороже, чем использование процедуры". Это звучит точно так же, как объяснение, которое я дал выше для генераторов Python. Подробнее об этом см. в работе Барбары Лисков и др. 1977 года "Механизмы абстракции в CLU", в частности, разделы 4.2, 4.3 и 6.

Корутины, потоки и генераторы

На первый взгляд, корутины, потоки и генераторы выглядят одинаково. Все они обеспечивают конкурентность в той или иной форме, но имеют существенные отличия.

  • Корутины обеспечивают конкурентность без параллелизма: когда выполняется одна корутина, возобновляющая ее или уступающая ей не выполняется. Поскольку корутины выполняются по очереди и переключаются только в определенных точках программы, корутины могут обмениваться данными между собой без гонок. Явные переключения (coroutine.resume в первом примере Lua или вызов функции next во втором примере Lua) служат точками синхронизации, создавая happens-before edges. Поскольку планирование осуществляется в явном виде (без какого-либо прерывания) и полностью без участия операционной системы, переключение корутины занимает не более десяти наносекунд, а обычно и того меньше. Запуск и завершение работы также значительно дешевле, чем у потоков.

  • Потоки предоставляют больше возможностей, чем корутины, но и обходятся дороже. Дополнительная мощность - это параллелизм, а стоимость - это накладные расходы на планирование, включая более дорогие контекстные переключения и необходимость добавления преимуществ в той или иной форме. Обычно операционная система предоставляет потоки, и переключение потоков занимает несколько микросекунд. Для данной таксономии горутины Go являются дешевыми потоками: переключение горутины занимает ближе к нескольким сотням наносекунд, поскольку среда выполнения Go берет на себя часть работы по планированию, но горутины все равно обеспечивают полный параллелизм и вытеснение потоков. (Новые облегченные потоки Java по сути являются тем же самым, что и горутины).

  • Генераторы обеспечивают меньшую мощность, чем корутины, поскольку в корутине разрешен выход (yield) только самого верхнего кадра. Этот кадр перемещается туда-сюда между объектом и стеком вызовов для его приостановки и возобновления.

Корутины являются полезным строительным блоком для написания программ, которым нужна конкурентность для структурирования программы, но не для параллелизма. Подробный пример такого подхода приведен в моей предыдущей заметке "Хранилище данных в потоке управления". Другие примеры см. в статье Аны Лусии Де Моуры и Роберто Иерусалимши "Пересмотр корутин", опубликованной в 2009 году. Оригинальный пример приведен в работе Мелвина Конвея "Design of a Separable Transition-Diagram Compiler", опубликованной в 1963 году.

Зачем нужны корутины в Go?

Корутины - это паттерн конкурентности, который не обслуживается напрямую существующими библиотеками конкурентности Go. Горутины часто достаточно близки, но, как мы видели, это не одно и то же, и иногда это различие имеет значение.

Например, в докладе Роба Пайка "Лексическое сканирование в Go", опубликованном в 2011 году, представлены оригинальные лексер и парсер для пакета text/template. Они выполнялись в отдельных горутинах, соединенных каналом, несовершенно имитируя пару корутин: лексер и парсер работали параллельно, причем лексер просматривал следующую лексему, а парсер обрабатывал последнюю. Генераторов было бы недостаточно - лексер выдает значения из множества различных функций, но полноценные горутины оказались слишком сложными. Параллелизм, обеспечиваемый горутинами, вызвал гонки и в конечном итоге привел к отказу от этой конструкции в пользу хранилища состояния лексера в объекте, что было более точной имитацией корутин. Правильные корутины позволили бы избежать гонок и были бы более эффективны, чем горутины.

Предполагается, что в будущем корутины в Go будут использоваться для итераций над общими коллекциями. Мы обсуждали возможность добавления в Go поддержки ranging over functions, что побудило бы авторов коллекций и других абстракций предоставлять CLU-подобные функции-итераторы. Итераторы можно реализовать уже сегодня, используя значения функций, без каких-либо изменений в языке. Например, несколько упрощенный итератор деревьев в Go может выглядеть следующим образом:

func (t *Tree[V]) All(yield func(v V)) {
	if t != nil {
		t.left.All(yield)
		yield(t.value)
		t.right.All(yield)
	}
}

Сегодня этот итератор может быть вызван как:

t.All(func(v V) {
	fmt.Println(v)
})

и, возможно, его вариант может быть вызван в будущей версии Go как:

for v := range t.All {
	fmt.Println(v)
}

Иногда, однако, мы хотим выполнить итерацию по коллекции таким образом, что это не укладывается в рамки одного цикла for. Примером может служить сравнение бинарных деревьев: две итерации должны быть как-то чередованы. Как мы уже видели, ответ на этот вопрос дают корутины, позволяющие превратить функцию типа (*Tree).All (итератор "push") в функцию, возвращающую поток значений, по одному за вызов (итератор "pull").

Как реализовать корутины в Go

Если мы хотим добавить корутины в Go, мы должны стремиться сделать это без изменения языка. Это означает, что определение корутин должно быть доступно для реализации и понимания в терминах обычного кода Go. Позже я выскажусь за оптимизированную реализацию, предоставляемую непосредственно средой выполнения, но эта реализация должна быть неотличима от чистого определения Go.

Начнем с очень простой версии, в которой операция yield полностью игнорируется. Она просто запускает функцию в другой горутине:

package coro

func New[In, Out any](f func(In) Out) (resume func(In) Out) {
	cin := make(chan In)
	cout := make(chan Out)
	resume = func(in In) Out {
		cin <- in
		return <-cout
	}
	go func() { cout <- f(<-cin) }()
	return resume
}

New принимает функцию f, которая должна иметь один аргумент и один результат. New выделяет каналы, определяет resume, создает горутину для выполнения f и возвращает функцию resume. Новая горутина блокируется на <-cin, поэтому возможности для параллелизма отсутствуют. Функция resume разблокирует новую горутину, посылая значение in, а затем блокирует получение значения out. Эта пара "посылка-получение" образует корутинный переключатель. Мы можем использовать coro.New следующим образом (полный код):

func main() {
  resume := coro.New(strings.ToUpper)
  fmt.Println(resume("hello world"))
}

Пока что coro.New - это просто неуклюжий способ вызова функции. Нам необходимо добавить yield, который мы можем передать в качестве аргумента в f:

func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) Out) {

	cin := make(chan In)
	cout := make(chan Out)
	resume = func(in In) Out {
		cin <- in
		return <-cout
	}
	yield := func(out Out) In {
		cout <- out
		return <-cin
	}
	go func() { cout <- f(<-cin, yield) }()
	return resume
}

Заметим, что параллелизма здесь все равно нет: yield - это еще одна пара send-receive. Эти горутины ограничены коммуникационным паттерном и действуют неотличимо от корутин.

Пример: Парсер строк

Прежде чем перейти к преобразованию итераторов, рассмотрим несколько более простых примеров. В разделе "Хранилище данных в потоке управления" мы рассматривали проблему взятия функции

func parseQuoted(read func() byte) bool

и запустить ее в отдельном потоке управления, чтобы байты по одному передавались в метод Write. Вместо специальной канальной реализации, приведенной в этом посте, мы можем использовать:

type parser struct {
	resume func(byte) Status
}

func (p *parser) Init() {
	coparse := func(_ byte, yield func(Status) byte) Status {
		read := func() byte { return yield(NeedMoreInput) }
		if !parseQuoted(read) {
			return BadInput
		}
		return Success
	}
	p.resume = coro.New(coparse)
	p.resume(0)
}

func (p *parser) Write(c byte) Status {
	return p.resume(c)
}

Функция Init выполняет всю работу, и даже не очень. Она определяет функцию coparse, которая имеет сигнатуру, необходимую для coro.New, что означает добавление отбрасываемого входа типа byte. Эта функция определяет read, которое выдает NeedMoreInput, а затем возвращает байт, предоставленный вызывающей стороной. Затем выполняется parseQuoted(read), преобразующая bool-результат в обычный код состояния. Создав с помощью coro.New корутину для coparse, Init вызывает p.resume(0), чтобы позволить coparse перейти к первому чтению в parseQuoted. Наконец, метод Write представляет собой тривиальную обертку вокруг p.resume (полный код).

Эта установка абстрагирует нас от пары каналов, которые мы поддерживали вручную в предыдущем посте, позволяя работать на более высоком уровне при написании программы.

Пример: Простое сито

В качестве несколько более крупного примера рассмотрим Doug McIlroy's concurrent prime sieve. Он состоит из конвейера корутин, по одному для каждого простого числа p, каждая из которых выполняется:

loop:
    n = get a number from left neighbor
    if (p does not divide n)
        pass n to right neighbor

Счетная корутина, расположенная в левой части конвейера, передает в левый конец конвейера числа 2, 3, 4, .... Корутины печати, расположенные в крайней правой части, могут считывать простые числа, печатать их и создавать новые корутины фильтрации. Первый фильтр в конвейере удаляет кратные 2, следующий - кратные 3, следующий - кратные 5 и так далее.

Созданный нами примитив coro.New позволяет взять прямой цикл, выдающий значения, и преобразовать его в функцию, которая может быть вызвана для получения каждого значения по очереди. Вот счетчик:

func counter() func(bool) int {
	return coro.New(func(more bool, yield func(int) bool) int {
		for i := 2; more; i++ {
			more = yield(i)
		}
		return 0
	})
}

Логика счетчика - это литерал функции, передаваемый в New. Он принимает yield функцию типа func(int) bool. Код выдает значение, передавая его в yield, а затем получает обратно булево значение, говорящее о том, следует ли продолжать генерировать новые числа. При получении команды на остановку, либо потому, что при входе значение more было false, либо потому, что вызов yield вернул false, цикл завершается. Он возвращает конечное, игнорируемое значение, удовлетворяющее типу функции, требуемому New.

New превращает это в функцию цикла, обратную yield: func(bool) int, которая может быть вызвана с true для получения следующего значения или с false для остановки генератора. Корутина фильтрации лишь немного сложнее:

func filter(p int, next func(bool) int) (filtered func(bool) int) {
	return coro.New(func(more bool, yield func(int) bool) int {
		for more {
			n := next(true)
			if n%p != 0 {
				more = yield(n)
			}
		}
		return next(false)
	})
}

Она принимает простое p и функцию next, подключенную к корутине слева, а затем возвращает отфильтрованный выходной поток для подключения к корутине справа.

Наконец, у нас есть корутина печати:

func main() {
	next := counter()
	for i := 0; i < 10; i++ {
		p := next(true)
		fmt.Println(p)
		next = filter(p, next)
	}
	next(false)
}

Начиная со счетчика, main сохраняет в next вывод построенного на данный момент конвейера. Затем выполняется цикл: чтение простого числа p, печать p, а затем добавление нового фильтра на правом конце конвейера для удаления кратных p (полный код).

Обратите внимание, что отношения вызова между корутинами могут меняться со временем: любая корутина C может вызвать функцию next другой корутины D и стать корутиной, к которой D передает управление. Первый yield счетчика идет к main, в то время как его последующие yield идут ко 2-ому фильтру. Аналогично, каждый p-фильтр передает свой первый вывод (следующее простое число) в main, в то время как его последующие yield идут к фильтру для этого следующего простого числа.

Корутины и горутины

В определенном смысле неправильно называть эти потоки управления корутинами. Это полноценные горутины, и они могут делать все, что может обычная горутина, включая блок ожидания мьютексов, каналов, системных вызовов и т.д. Что делает coro.New, так это создает горутины с доступом к операциям переключения корутин внутри функций yield и resume (которые сито называет next). Возможность использования этих операций может быть даже передана разным горутинам, что и происходит при передаче main каждого своего next потока каждой следующей горутине filter. В отличие от оператора go, coro.New добавляет новую конкурентность в программу без нового параллелизма. Горутина, которую создает coro.New(f), может выполняться только тогда, когда какая-либо другая горутина явно одолжит ей возможность выполнения с помощью resume; этот заем возвращается с помощью yield или возврата f. Если у вас есть только одна главная горутина и выполняется 10 операторов go, то все 11 горутин могут быть запущены одновременно. Напротив, если у вас есть одна главная горутина и выполняется 10 вызовов coro.New, то теперь есть 11 потоков управления, но параллельность программы остается прежней: одновременно выполняется только один. То, какие именно горутины приостанавливаются в операциях корутин, может меняться по мере выполнения программы, но параллелизм при этом не увеличивается.

Короче говоря, go создает новый конкурентный параллельный поток управления, а coro.New - новый конкурентный непараллельный поток управления. Удобно продолжать говорить о непараллельных потоках управления как о корутинах, но следует помнить, что то, какие именно корутины являются "непараллельными", может меняться в процессе выполнения программы, точно так же, как может меняться в процессе выполнения программы то, какие горутины принимают или отправляют из каналов.

Надежные резюме

Есть несколько улучшений, которые мы можем внести в coro.New, чтобы он лучше работал в реальных программах. Первое - разрешить вызывать resume после завершения работы функции: сейчас это приводит к deadlock-у. Добавим bool-результат, указывающий, был ли результат resume получен в результате yield. Реализация coro.New, которую мы имеем на данный момент, выглядит следующим образом:

func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) Out) {
	cin := make(chan In)
	cout := make(chan Out)
	resume = func(in In) Out {
		cin <- in
		return <-cout
	}
	yield := func(out Out) In {
		cout <- out
		return <-cin
	}
	go func() {
		cout <- f(<-cin, yield)
	}()
	return resume
}

Чтобы добавить этот дополнительный результат, нам нужно отследить, выполняется ли f, и вернуть этот результат из resume:

func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool)) {
	cin := make(chan In)
	cout := make(chan Out)
	running := true
	resume = func(in In) (out Out, ok bool) {
		if !running {
			return
		}
		cin <- in
		out = <-cout
		return out, running
	}
	yield := func(out Out) In {
		cout <- out
		return <-cin
	}
	go func() {
		out := f(<-cin, yield)
		running = false
		cout <- out
	}()
	return resume
}

Обратите внимание, что поскольку resume может выполняться только тогда, когда вызывающая горутина заблокирована, и наоборот, совместное использование переменной running не является гонкой. Они синхронизируются, выполняясь по очереди. Если resume вызывается после выхода горутины, то resume возвращает нулевое значение и false.

Теперь мы можем определить, когда горутина завершила свою работу (полный код):

func main() {
	resume := coro.New(func(_ int, yield func(string) int) string {
		yield("hello")
		yield("world")
		return "done"
	})
	for i := 0; i < 4; i++ {
		s, ok := resume(0)
		fmt.Printf("%q %v\n", s, ok)
	}
}
$ go run cohello.go
"hello" true
"world" true
"done" false
"" false
$

Пример: Преобразование итератора

В примере с простым ситом было показано прямое использование coro.New, но аргумент more bool был несколько неудобен и не соответствовал рассмотренным ранее функциям итераторов. Рассмотрим преобразование любого push-итератора в pull-итератор с помощью coro.New. Нам понадобится способ завершить работу корутины, выполняющей push-итератор, если мы хотим остановиться раньше, поэтому мы добавим в yield булевый результат, указывающий, стоит ли продолжать, как в простом сите:

push func(yield func(V) bool)

Цель новой функции coro.Pull - превратить эту push-функцию в pull-итератор. Итератор будет возвращать следующее значение и булево число, указывающее, закончилась ли итерация, подобно приему канала или просмотру карты:

pull func() (V, bool)

Если мы хотим остановить push-итерацию раньше времени, нам нужно как-то сигнализировать об этом, поэтому Pull будет возвращать не только pull-функцию, но и stop-функцию:

stop func()

Если сложить эти функции вместе, то полная сигнатура Pull будет выглядеть следующим образом:

func Pull[V any](push func(yield func(V) bool)) (pull func() (V, bool), stop func()) {
    ...
}

Первое, что необходимо сделать Pull, - это запустить корутину для выполнения итератора push, а для этого нужна функция-обертка с нужным типом, а именно функция, которая принимает more bool, чтобы соответствовать результату bool из yield, и возвращает конечное значение V. Функция pull может вызывать resume(true), а функция stop - resume(false):

func Pull[V any](push func(yield func(V) bool)) (pull func() (V, bool), stop func()) {
	copush := func(more bool, yield func(V) bool) V {
		if more {
			push(yield)
		}
		var zero V
		return zero
	}
	resume := coro.New(copush)
	pull = func() (V, bool) {
		return resume(true)
	}
	stop = func() {
		resume(false)
	}
	return pull, stop
}

Вот и вся реализация. Благодаря возможностям coro.New нам потребовалось совсем немного кода и усилий для создания хорошего преобразователя итераторов.

Чтобы использовать coro.Pull, нам нужно переопределить метод All дерева, чтобы он ожидал и использовал новый результат bool из yield:

func (t *Tree[V]) All(yield func(v V) bool) {
    t.all(yield)
}

func (t *Tree[V]) all(yield func(v V) bool) bool {
    return t == nil ||
        t.Left.all(yield) && yield(t.Value) && t.Right.all(yield)
}

Теперь у нас есть все необходимое для написания функции сравнения деревьев на языке Go (полный код):

func cmp[V comparable](t1, t2 *Tree[V]) bool {
	next1, stop1 := coro.Pull(t1.All)
	next2, stop2 := coro.Pull(t2.All)
	defer stop1()
	defer stop2()
	for {
		v1, ok1 := next1()
		v2, ok2 := next2()
		if v1 != v2 || ok1 != ok2 {
			return false
		}
		if !ok1 && !ok2 {
			return true
		}
	}
}

Распространение паники

Еще одним улучшением является передача паники от корутины обратно ее вызывающей стороне, то есть той корутине, которая в последний раз вызывала resume для ее выполнения (и, следовательно, сидит заблокированной в resume в ожидании ее). Очень распространенным запросом является создание некоторого механизма для информирования одной горутины о панике другой, но в общем случае это может быть затруднительно, поскольку мы не знаем, какую горутину информировать и готова ли она услышать это сообщение. В случае с корутинами у нас вызывающая сторона заблокирована в ожидании новостей, поэтому имеет смысл передать сообщение о панике.

Чтобы сделать это, мы можем добавить defer, чтобы перехватить панику в новой корутине и снова вызвать ее в ожидающем resume.

type msg[T any] struct {
	panic any
	val   T
}

func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool)) {
	cin := make(chan In)
	cout := make(chan msg[Out])
	running := true
	resume = func(in In) (out Out, ok bool) {
		if !running {
			return
		}
		cin <- in
		m := <-cout
		if m.panic != nil {
			panic(m.panic)
		}
		return m.val, running
	}
	yield := func(out Out) In {
		cout <- msg[Out]{val: out}
		return <-cin
	}
	go func() {
		defer func() {
			if running {
				running = false
				cout <- msg[Out]{panic: recover()}
			}
		}()
		out := f(<-cin, yield)
		running = false
		cout <- msg[Out]{val: out}
	}()
	return resume
}

Протестируем его (полный код):

func main() {
	defer func() {
		if e := recover(); e != nil {
			fmt.Println("main panic:", e)
			panic(e)
		}
	}()
	next, _ := coro.Pull(func(yield func(string) bool) {
		yield("hello")
		panic("world")
	})
	for {
		fmt.Println(next())
	}
}

Новая корутина выдает hello, а затем вызывает панику world. Эта паника передается обратно в главную горутину, которая печатает значение и восстанавливается. Видно, что паника возникает в вызове resume:

% go run coro.go
hello true
main panic: world
panic: world [recovered]
    panic: world

goroutine 1 [running]:
main.main.func1()
    /tmp/coro.go:9 +0x95
panic({0x108f360?, 0x10c2cf0?})
    /go/src/runtime/panic.go:1003 +0x225
main.coro_New[...].func1()
    /tmp/coro.go.go:55 +0x91
main.Pull[...].func2()
    /tmp/coro.go.go:31 +0x1c
main.main()
    /tmp/coro.go.go:17 +0x52
exit status 2
%

Отмена

Распространение паники позаботилось о том, чтобы сообщить вызывающей стороне о досрочном выходе из корутины, но как сообщить корутине о досрочном выходе вызывающей стороны? По аналогии с функцией stop в pull-итераторе, нам нужно каким-то образом сигнализировать корутине, что она больше не нужна, возможно, потому, что вызывающий паникует, а возможно, потому, что вызывающий просто возвращается.

Для этого мы можем изменить coro.New так, чтобы он возвращал не только resume, но и функцию cancel. Вызов cancel будет похож на resume, только вместо возврата значения yield будет паниковать. Если во время отмены корутина паникует по-другому, мы хотим, чтобы cancel распространяла эту панику, как это делает resume. Но, конечно, мы не хотим, чтобы cancel распространял свою собственную панику, поэтому мы создаем уникальное значение паники, которое можно проверить. Мы также должны обработать отмену перед началом выполнения f.

var ErrCanceled = errors.New("coroutine canceled")

func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool), cancel func()) {
	cin := make(chan msg[In])
	cout := make(chan msg[Out])
	running := true
	resume = func(in In) (out Out, ok bool) {
		if !running {
			return
		}
		cin <- msg[In]{val: in}
		m := <-cout
		if m.panic != nil {
			panic(m.panic)
		}
		return m.val, running
	}
	cancel = func() {
		e := fmt.Errorf("%w", ErrCanceled) // unique wrapper
		cin <- msg[In]{panic: e}
		m := <-cout
		if m.panic != nil && m.panic != e {
			panic(m.panic)
		}
	}
	yield := func(out Out) In {
		cout <- msg[Out]{val: out}
		m := <-cin
		if m.panic != nil {
			panic(m.panic)
		}
		return m.val
	}
	go func() {
		defer func() {
			if running {
				running = false
				cout <- msg[Out]{panic: recover()}
			}
		}()
		var out Out
		m := <-cin
		if m.panic == nil {
			out = f(m.val, yield)
		}
		running = false
		cout <- msg[Out]{val: out}
	}()
	return resume, cancel
}

Мы могли бы изменить Pull, чтобы использовать паники и для отмены итераторов, но в этом контексте явный bool кажется более понятным, тем более что остановка итератора не является исключением.

Пример: Пересмотр простого сита

Давайте посмотрим, как распространение и отмена паники позволяют очистке простого сита "просто работать". Сначала обновим сито, чтобы оно использовало новый API. Функции counter и filter уже являются "однострочными" вызовами return coro.New(...). Изменим сигнатуру, чтобы включить в нее дополнительную cancel-функцию, возвращаемую из coro.New:

func counter() (func(bool) (int, bool), func()) {
	return coro.New(...)
}

func filter(p int, next func(bool) (int, bool)) (func(bool) (int, bool), func()) {
	return coro.New(...)
}

Затем преобразуем функцию main в функцию primes, которая выводит n простых чисел (полный код):

func primes(n int) {
	next, cancel := counter()
	defer cancel()
	for i := 0; i < n; i++ {
		p, _ := next(true)
		fmt.Println(p)
		next, cancel = filter(p, next)
		defer cancel()
	}
}

При выполнении этой функции, получив n простых чисел, она возвращается. Каждый из отложенных вызовов cancel очищает созданные корутины. А что если в одной из этих корутин произошла ошибка и она запаниковала? Если эта корутина была возобновлена next-вызовом в primes, то паника возвращается в primes, и отложенные cancel-вызовы primes очищают все остальные корутины. Если же работа была возобновлена next-вызовом в корутине filter, то паника распространяется до ожидающей корутины filter, затем до следующей ожидающей корутины filter и так далее, пока не дойдет до p := next(true) в primes, который снова очистит оставшиеся корутины.

API

Мы пришли к следующему API:

  • New создает новую приостановленную корутину, готовую выполнить функцию f. Новая корутина - это горутина, которая никогда не выполняется сама по себе: она выполняется только в то время, когда какая-либо другая горутина вызывает ее и ожидает, вызывая resume или cancel.

  • Горутина может приостановить свою работу и переключиться на новую корутину, вызвав resume(in). Первый вызов resume запускает f(in, yield). Вызов resume блокирует выполнение f до тех пор, пока f не вызовет yield(out) или не вернется в исходное состояние. Когда f вызывает yield, то yield блокируется, а resume возвращает out, true. Когда f возвращается, resume возвращает out, false. Если resume вернулся из-за yield, то следующий resume(in) переключается обратно на f, а yield возвращается in.

  • cancel прекращает выполнение f и закрывает корутину. Если вызов resume не был произведен, то f вообще не выполняется. В противном случае отмена приводит к панике заблокированного вызова yield с ошибкой, удовлетворяющей errors.Is(err, ErrCanceled).

  • Если f паникует и не восстанавливает панику, то паника останавливается в корутине f и перезапускается в ожидающей f горутине, вызывая повторную панику заблокированного ожидания resume или cancel с тем же значением паники. cancel не выполняет повторную панику, если паника f - это паника, которую вызвала сама cancel.

  • После возврата f или паники корутины больше не существует. Последующие вызовы resume возвращают нулевое значение и false. Последующие вызовы cancel просто возвращаются.

  • Функции resume, cancel и yield могут передаваться между различными горутинами и использоваться ими, тем самым динамически меняя, какая из горутин является "корутиной". Хотя New создает новую горутину, она также устанавливает инвариант, согласно которому одна горутина всегда заблокирована - либо в resume, cancel, yield, либо (сразу после New) в ожидании resume, которая вызовет f. Этот инвариант сохраняется до возвращения f, после чего новая горутина завершается. В результате coro.New создает новую конкурентность в программе без какого-либо нового параллелизма.

  • Если несколько горутин вызывают resume или cancel, то эти вызовы сериализуются. Аналогичным образом, если несколько горутин вызывают yield, эти вызовы сериализуются.

func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool), cancel func())

Эффективность

Как я уже говорил в начале, хотя важно иметь определение корутин, которое можно понять по ссылке на чистую реализацию Go, я считаю, что нам следует использовать оптимизированную реализацию во время выполнения. На моем MacBook Pro 2019 года передача значений туда и обратно с помощью основанного на каналах coro.New, описанного в этом посте, требует примерно 190 нс на переключение, или 380 нс на значение в coro.Pull. Помните, что coro.Pull не является стандартным способом использования итератора: стандартным способом является прямой вызов итератора, который вообще не имеет накладных расходов на корутину. coro.Pull нужен только в том случае, если требуется обрабатывать итерируемые значения инкрементально, а не с помощью одного цикла for. Тем не менее, мы хотим сделать coro.Pull настолько быстрым, насколько это возможно.

Сначала я попробовал заставить компилятор помечать пары send-receive и оставлять подсказки для среды выполнения, чтобы объединить их в одну операцию. Это позволило бы времени выполнения канала обойти планировщик и перейти непосредственно к другой корутине. Такая реализация требует около 118 нс на одно переключение, или 236 нс на одно pulled-значение (на 38% быстрее). Это уже лучше, но все равно не так быстро, как хотелось бы. Вся общность каналов добавляет слишком много накладных расходов.

Далее я добавил прямой переключатель корутин во время выполнения, полностью избегая каналов. Это позволило сократить переключение между корутинами до трех атомарных сравнений (одно в структуре данных корутины, одно для статуса планировщика блокирующей корутины и одно для статуса планировщика возобновляющей корутины), что я считаю оптимальным с учетом инвариантов безопасности, которые необходимо поддерживать. Такая реализация занимает 20 нс на переключение, или 40 нс на pulled-значение. Это примерно в 10 раз быстрее, чем исходная реализация канала. Возможно, более важно то, что 40 нс на pulled-значение кажутся достаточно малыми в абсолютном выражении, чтобы не стать узким местом для кода, которому нужен coro.Pull.


Расс Кокс - это один из ключевых разработчиков языка программирования Go. Он внес значительный вклад в развитие Go и активно участвует в его поддержке. Расс также выступает с докладами на конференциях, например, на GopherCon 2022, где он говорил о совместимости и том, как программы на Go продолжают работать. Кроме того, он ведет исследования в области моделей памяти. Вы можете найти его работы на GitHub.


Трудности перевода. В контексте программирования, yield обычно не переводится как "выход". Это потому, что yield не завершает функцию, в отличие от return. Вместо этого, yield приостанавливает выполнение функции и "производит" или "генерирует" значение. Когда функция возобновляет свое выполнение, она продолжает с того места, где был вызван yield. Таким образом, yield больше похож на "паузу" или "производство", а не "выход".

Комментарии (5)


  1. kovserg
    21.11.2023 21:10
    +2

    yield - "выдать"


    1. 0Bannon
      21.11.2023 21:10
      +2

      Уступить. Аналогично в штатах есть дорожный знак "уступить дорогу" с этим словом.


  1. mayorovp
    21.11.2023 21:10

    Вместо булевых переменных можно было бы закрывать каналы.


  1. VladimirFarshatov
    21.11.2023 21:10
    +3

    Положил в закладки, тут надо читать вдумчиво. Поверхностно есть ощущение, что вопрос сильно переусложнен и реализовать это на Го можно несколько проще.. надо думать.

    Спасибки автору перевода, давно не видел хороших статей именно программистского содержания.


  1. Ravager
    21.11.2023 21:10

    Выглядит жестковато, лучше бы конечно добавили как часть языка со стейт машиной. Вот генераторы в язык принести это мне кажется полезно.