При использовании языка Go чувствуется нехватка хорошей библиотеки для работы с потоками данных. Имеющиеся библиотеки в основном написаны во времена до дженериков и имеют не очень удобный интерфейс.

С выходом Go 1.18 появилась возможность реализовать библиотеку потоковой обработки данных goio, вдохновлённую превосходными Scala-библиотеками cats-effect и fs2.

Потребности потоковых программ

При обработке данных, объём которых превышает объём памяти вычислительного узла, необходимо использовать технологии потоковой обработки данных. При этом данные загружаются в память по частям, выполняется их обработка, а затем данные выгружаются из памяти.

Для написания таких программ необходимо каким-либо образом решить ряд вопросов:

  • обработка ошибок;

  • представление конечных автоматов, реализующих отдельные этапы обработки данных;

  • композиция конечных автоматов;

  • использование нескольких ядер для параллельной обработки данных.

От того, каким образом эти вопросы решены, зависит надёжность получающейся системы, читабельность кода, производительность.

На мой вгляд, наиболее интересное решение предлагают Scala-библиотеки cats-effect и fs2.

Обработка ошибок

Многие функции не являются "тотальными". То есть они не всегда возвращают осмысленное значение. В языке Go принято представлять такие частичные функции с помощью возврата пары - значения функции и ошибки:

func f[A any]() (A, error) {
  ...
}

При этом вызывающая сторона должна сразу проверить наличие ошибки:

a, err = f()
if err != nil {
  return
}

Такой способ представления ошибок создаёт неудобства при композиции нескольких вычислений.

Принятым в функциональном программировании способом представления вычислений, которые могут завершиться неудачей, является IO[A]. IO[A] - объект, представляющий вычисление, которое в случае успеха возвращает значение типа A. (Это, несколько упрощённое представление IO, позволяет сфокусироваться на аспекте, связанном с обработкой ошибок.)

В рассматриваемой библиотеке goio этот тип реализован. Например, вышеприведённая функция f может быть непосредственно "завёрнута" в IO[A]:

fio := io.Eval(f)

(Также имеется целый набор удобных способов заворачивания вычислений в IO[A] - Lift, Fail, Delay.)

Чтобы выполнить вычисление IO[A], используется метод UnsafeRunSync:

a, err = io.UnsafeRunSync(fio)

операции создания IO и выполнения почти эквивалентны непосредственному вызову f. Отличие заключается в том, что при возникновении panic , производится восстановление (recover) в error.

Какие возможности предоставляет тип IO[A]?

Главная возможность - композиция вычислений. Выполнить одно вычисление, результат подставить в другое, и всё вместе представить как одно вычисление. Такая операция в библиотеке goio носит название FlatMap (также известна как bind):

func g[A any, B any](a A) io.IO[B] {
...
}

h := io.FlatMap(f, g)

Предусмотрены и другие привычные комбинаторы - Map, AndThen, Sequence...

Потоковая обработка данных

Если IO[A] представляет собой однократное вычисление,то Stream[A] представляет потенциально бесконечный поток элементов A. Выполняя некоторое действие, можно последовательно получить все элементы из потока.

Создать поток можно, например, из массива данных:

ints := stream.LiftMany(1, 2, 3)

Или можно создать бесконечный поток с использованием генератора:

var nats = stream.Unfold(0, func(s int) int {
	return s + 1
})

Также имеются и другие способы создания потоков - Empty, Lift, Repeat, Eval.

Обработка потока осуществляется конечным автоматом, имеющим вход и выход. Простейший случай - вход преобразуется в выход без внутреннего состояний. Такое преобразование осуществляется комбинатором Map:

powers := stream.Map(nats, func (i int) int { return i * i })

Универсальный механизм, позволяющий представить произвольный конечный автомат с состоянием - StateFlatMapWithFinish (см. пример в тестах). Имеются и другие комбинаторы - Filter, Flatten, AndThen, Take, Drop,...

Чтобы выполнить все вычисления и получить результат обработки потока, используются финальные операции - DrainAll, ToSlice, Collect,...

В библиотеке также реализованы простые средства для работы с текстовыми файлами или http-запросами - ReadLines, WriteLines.

Параллельное выполнение

Асинхронная операция выполняется вне текущего потока выполнения. То есть её можно запустить, передав в качестве аргумента callback, и продолжить работу текущего потока:

//io.Async[A any](k func(Callback[A])) IO[A]

helloIO := io.Async(func (cb Callback[string]) {
  // start operation
  go func(){
    time.Sleep(100 * time.Millisecond)
    // at the end call cb:
    cb("hello", nil)
  }()
})

Когда асинхронная операция завершится, она должна вызвать переданный callback. (Если callback не вызывается, то получится операция Never.)

Текущий поток, после того, как выполнит свою оставшуюся работу, в какой-то момент может захотеть дождаться (Join) результата асинхронной операции. При этом текущий поток блокируется до получения результата (или ошибки).

Для представления такой семантики параллельного исполнения используется Fiber[A] - вычисление, запускаемое в отдельной горутине, результат которой мы можем получить с помощью Join.

Для создания Fiber используется функция Start(IO[A]) Fiber[A].

sleep100msAndReturnHello := io.Start(io.Pure(func() string {
  time.Sleep(100 * time.Millisecond)
  return "hello"
}))
dosomething := ... // выполняем какие-то действия после запуска.
getHello := io.FlatMap(sleep100msAndReturnHello, func (fiber Fiber[A]) IO[string] {
  return io.FlatMap(dosomething, fiber.Join())
})
hello, err := io.UnsafeRunSync(getHello)

Для создания сразу большого числа параллельных вычислений используются комбинаторы Parallel и ConcurrentlyFirst.

Ресурсы

Одним из аспектов корректной обработки ошибок является освобождение ресурсов по окончании операций независимо от того, произошла ошибка или нет. Для представления таких ресурсов используется тип Resource[A]. Ресурсами могут быть соединения с базой данных, файлы, каналы... Основная операция, предусмотренная для ресурса - Use, позволяющая в ограниченном конексте получить доступ к экземпляру ресурса. При выходе за границы контекста производится автоматическое закрытие ресурса. Пример работы с ресурсами.

Каналы

Для организации межпоточного взаимодействия в языке Go предусмотрены каналы. Для корректной работы с каналами в библиотеке goio предусмотренна поддержка на всех уровнях.

В пакете io : открытие/закрытие канала, отправка и чтение данных.

В пакете stream: отправка потока данных в канал, формирование потока данных из канала, использование внешней системы, подключенной посредством входного и выходного каналов.

В пакете resource: представление канала в виде ресурса - UnbufferedChannel[A any]() Resource[chan A].

Функциональные операции с массивами (slice)

Без привычных операций Map, FlatMap, Filter, ... работа с массивами не очень удобна. В библиотеке goio содержатся эти операции, реализованные с использованием generics.

Заключение

Библиотека goio содержит удобный набор инструментов для написания надёжных и корректных программ, осуществляющих потоковую обработку данных. Реализованы:

  • представление вычислений, которые могут возвращать ошибки, и комбинация таких вычислений;

  • представление абстрактных потоков данных;

  • представление конечных автоматов, реализующих отдельные этапы обработки данных;

  • композиция цепочки конечных автоматов, выполняющих операции над потоками данных;

  • параллельная обработка данных;

  • взаимодействие с другими частями программ, реализованными с использованием каналов Go.

Библиотека доступна по лицензии MIT. PR приветствуются.

Комментарии (3)


  1. gohrytt
    23.05.2022 08:51
    -4

    ой началось...


  1. zuborg
    25.05.2022 17:01

    Интересно, как там с накладными расходами на все это счастье..

    Сколько дополнительных аллокаций, сколько процессора...


    1. MrLoki
      25.05.2022 20:09
      +1

      С ходу много — простой бенч даёт 15 аллокаций на итерацию (извлечение 1 значения) на бесконечный стрим (nats из примера) + 1 map (powers из примера).

      При этом если брать пачкой (используя не Head, а Take), то количество аллокаций растёт кратно, что печально.

      Но это не выглядит как шоустоппер — сейчас код идеоматичный и простой. Если нужно выжимать из него соки, есть несколько понятных путей: реюз внутренних структур, батчинг при работе с массивом (почти наверняка можно сделать Take(x,10) более экономичной чем Head(x)x10 в близкое к 10 количество раз), пуллинг.