При использовании языка Go чувствуется нехватка хорошей библиотеки для работы с потоками данных. Имеющиеся библиотеки в основном написаны во времена до дженериков и имеют не очень удобный интерфейс.
С выходом Go 1.18 появилась возможность реализовать библиотеку потоковой обработки данных goio, вдохновлённую превосходными Scala-библиотеками cats-effect и fs2.
Потребности потоковых программ
При обработке данных, объём которых превышает объём памяти вычислительного узла, необходимо использовать технологии потоковой обработки данных. При этом данные загружаются в память по частям, выполняется их обработка, а затем данные выгружаются из памяти.
Для написания таких программ необходимо каким-либо образом решить ряд вопросов:
обработка ошибок;
представление конечных автоматов, реализующих отдельные этапы обработки данных;
композиция конечных автоматов;
использование нескольких ядер для параллельной обработки данных.
От того, каким образом эти вопросы решены, зависит надёжность получающейся системы, читабельность кода, производительность.
На мой вгляд, наиболее интересное решение предлагают Scala-библиотеки cats-effect и fs2.
Обработка ошибок
Многие функции не являются "тотальными". То есть они не всегда возвращают осмысленное значение. В языке Go принято представлять такие частичные функции с помощью возврата пары - значения функции и ошибки:
func f[A any]() (A, error) {
...
}
При этом вызывающая сторона должна сразу проверить наличие ошибки:
a, err = f()
if err != nil {
return
}
Такой способ представления ошибок создаёт неудобства при композиции нескольких вычислений.
Принятым в функциональном программировании способом представления вычислений, которые могут завершиться неудачей, является IO[A]
. IO[A]
- объект, представляющий вычисление, которое в случае успеха возвращает значение типа A
. (Это, несколько упрощённое представление IO
, позволяет сфокусироваться на аспекте, связанном с обработкой ошибок.)
В рассматриваемой библиотеке goio этот тип реализован. Например, вышеприведённая функция f
может быть непосредственно "завёрнута" в IO[A]
:
fio := io.Eval(f)
(Также имеется целый набор удобных способов заворачивания вычислений в IO[A]
- Lift
, Fail
, Delay
.)
Чтобы выполнить вычисление IO[A]
, используется метод UnsafeRunSync
:
a, err = io.UnsafeRunSync(fio)
операции создания IO
и выполнения почти эквивалентны непосредственному вызову f
. Отличие заключается в том, что при возникновении panic
, производится восстановление (recover
) в error
.
Какие возможности предоставляет тип IO[A]
?
Главная возможность - композиция вычислений. Выполнить одно вычисление, результат подставить в другое, и всё вместе представить как одно вычисление. Такая операция в библиотеке goio носит название FlatMap
(также известна как bind
):
func g[A any, B any](a A) io.IO[B] {
...
}
h := io.FlatMap(f, g)
Предусмотрены и другие привычные комбинаторы - Map
, AndThen
, Sequence
...
Потоковая обработка данных
Если IO[A]
представляет собой однократное вычисление,то Stream[A]
представляет потенциально бесконечный поток элементов A
. Выполняя некоторое действие, можно последовательно получить все элементы из потока.
Создать поток можно, например, из массива данных:
ints := stream.LiftMany(1, 2, 3)
Или можно создать бесконечный поток с использованием генератора:
var nats = stream.Unfold(0, func(s int) int {
return s + 1
})
Также имеются и другие способы создания потоков - Empty
, Lift
, Repeat
, Eval
.
Обработка потока осуществляется конечным автоматом, имеющим вход и выход. Простейший случай - вход преобразуется в выход без внутреннего состояний. Такое преобразование осуществляется комбинатором Map
:
powers := stream.Map(nats, func (i int) int { return i * i })
Универсальный механизм, позволяющий представить произвольный конечный автомат с состоянием - StateFlatMapWithFinish
(см. пример в тестах). Имеются и другие комбинаторы - Filter
, Flatten
, AndThen
, Take
, Drop
,...
Чтобы выполнить все вычисления и получить результат обработки потока, используются финальные операции - DrainAll
, ToSlice
, Collect
,...
В библиотеке также реализованы простые средства для работы с текстовыми файлами или http-запросами - ReadLines
, WriteLines
.
Параллельное выполнение
Асинхронная операция выполняется вне текущего потока выполнения. То есть её можно запустить, передав в качестве аргумента callback
, и продолжить работу текущего потока:
//io.Async[A any](k func(Callback[A])) IO[A]
helloIO := io.Async(func (cb Callback[string]) {
// start operation
go func(){
time.Sleep(100 * time.Millisecond)
// at the end call cb:
cb("hello", nil)
}()
})
Когда асинхронная операция завершится, она должна вызвать переданный callback. (Если callback не вызывается, то получится операция Never
.)
Текущий поток, после того, как выполнит свою оставшуюся работу, в какой-то момент может захотеть дождаться (Join
) результата асинхронной операции. При этом текущий поток блокируется до получения результата (или ошибки).
Для представления такой семантики параллельного исполнения используется Fiber[A]
- вычисление, запускаемое в отдельной горутине, результат которой мы можем получить с помощью Join
.
Для создания Fiber
используется функция Start(IO[A]) Fiber[A]
.
sleep100msAndReturnHello := io.Start(io.Pure(func() string {
time.Sleep(100 * time.Millisecond)
return "hello"
}))
dosomething := ... // выполняем какие-то действия после запуска.
getHello := io.FlatMap(sleep100msAndReturnHello, func (fiber Fiber[A]) IO[string] {
return io.FlatMap(dosomething, fiber.Join())
})
hello, err := io.UnsafeRunSync(getHello)
Для создания сразу большого числа параллельных вычислений используются комбинаторы Parallel
и ConcurrentlyFirst
.
Ресурсы
Одним из аспектов корректной обработки ошибок является освобождение ресурсов по окончании операций независимо от того, произошла ошибка или нет. Для представления таких ресурсов используется тип Resource[A]
. Ресурсами могут быть соединения с базой данных, файлы, каналы... Основная операция, предусмотренная для ресурса - Use
, позволяющая в ограниченном конексте получить доступ к экземпляру ресурса. При выходе за границы контекста производится автоматическое закрытие ресурса. Пример работы с ресурсами.
Каналы
Для организации межпоточного взаимодействия в языке Go предусмотрены каналы. Для корректной работы с каналами в библиотеке goio предусмотренна поддержка на всех уровнях.
В пакете io : открытие/закрытие канала, отправка и чтение данных.
В пакете stream: отправка потока данных в канал, формирование потока данных из канала, использование внешней системы, подключенной посредством входного и выходного каналов.
В пакете resource: представление канала в виде ресурса - UnbufferedChannel[A any]() Resource[chan A]
.
Функциональные операции с массивами (slice)
Без привычных операций Map
, FlatMap
, Filter
, ... работа с массивами не очень удобна. В библиотеке goio содержатся эти операции, реализованные с использованием generics.
Заключение
Библиотека goio содержит удобный набор инструментов для написания надёжных и корректных программ, осуществляющих потоковую обработку данных. Реализованы:
представление вычислений, которые могут возвращать ошибки, и комбинация таких вычислений;
представление абстрактных потоков данных;
представление конечных автоматов, реализующих отдельные этапы обработки данных;
композиция цепочки конечных автоматов, выполняющих операции над потоками данных;
параллельная обработка данных;
взаимодействие с другими частями программ, реализованными с использованием каналов Go.
Библиотека доступна по лицензии MIT. PR приветствуются.
Комментарии (3)
zuborg
25.05.2022 17:01Интересно, как там с накладными расходами на все это счастье..
Сколько дополнительных аллокаций, сколько процессора...
MrLoki
25.05.2022 20:09+1С ходу много — простой бенч даёт 15 аллокаций на итерацию (извлечение 1 значения) на бесконечный стрим (nats из примера) + 1 map (powers из примера).
При этом если брать пачкой (используя не
Head
, аTake
), то количество аллокаций растёт кратно, что печально.Но это не выглядит как шоустоппер — сейчас код идеоматичный и простой. Если нужно выжимать из него соки, есть несколько понятных путей: реюз внутренних структур, батчинг при работе с массивом (почти наверняка можно сделать
Take(x,10)
более экономичной чемHead(x)
x10 в близкое к 10 количество раз), пуллинг.
gohrytt
ой началось...