Перевод одной из статей Бена Джонсона из серии "Go Walkthrough" по более углублённому изучению стандартной библиотеки в контексте реальных задач.


Go является языком программирования, хорошо приспособленным для работы с байтами. Будь у вас списки байт, потоки байт или просто отдельные байты, в Go легко с ними работать. Это примитивы, на которых мы строим наши абстракции и сервисы.


Пакет io является одним из самых фундаментальных во всей стандартной библиотеке. Он предоставляет набор интерфейсов и вспомогательных функций для работы с потоками байтов.


Этот пост является одним из серии статей по более углублённому разбору стандартной библиотеки. Несмотря на то, что стандартная документация предоставляет массу полезной информации, в контексте реальных задач может быть непросто разобраться, что и когда использовать. Эта серия статей направлена на то, чтобы показать использование пакетов стандартной библиотеки в контексте реальных приложений.


Чтение байтов


При работе с байтами, есть две фундаментальные операции: чтение и запись. Давайте сначала взглянём на чтение байтов.


Интерфейс Reader


Простейшая конструкция для чтения байтов из потока это интерфейс Reader:


    type Reader interface {
        Read(p []byte) (n int, err error)
    }

Этот интерфейс многократно реализован в стандартной библиотеке для вообще всего — от сетевых соединений, до файлов и до врапперов для слайсов в памяти.


Reader принимает на вход буфер, p, как параметр для метода Read(), чтобы не нужно было выделять память. Если бы Read() возвращал новый слайс, вместо того, чтобы принимать его как аргумент, ридеру пришлось бы выделять память при каждом вызове Read(). Это была бы катастрофа для сборщика мусора.


Одна из проблем с интерфейсом Reader в том, что с ним идёт набор довольно витиеватых правил. Во-первых, он возвращает ошибку io.EOF при нормальном ходе дел, просто если поток данных завершился. Это может запутывать новичков. Во-вторых, нет гарантии, что ваш буфер будет заполнен целиком. Если вы передали 8-байтовый слайс, по факту вы можете прочитать от 0 до 8 байт. Обработка чтения по частям можем быть непростой и легко подвержена ошибкам. К счастью, у нас есть немало вспомогательных функций для решения этих задач.


Улучшаем гарантии чтения


Представим, что у вас есть протокол, который нужно распарсить и вы хотите прочесть 8-байтовое uint64 значение из ридера. В этом случае предпочтительней использовать io.ReadFull(), так как вы точно знаете, сколько хотите прочесть:


func ReadFull(r Reader, buf []byte) (n int, err error)

Эта функция проверяет, что буфер полностью заполнен перед тем, как вернуть значение. Если размер полученных данных отличается от размера буфера, то вы получите ошибку io.ErrUnexpectedEOF. Эта простая гарантия упрощает код довольно сильно. Чтобы прочесть 8 байт, достаточно сделать так:


    buf := make([]byte, 8)
    if _, err := io.ReadFull(r, buf); err != nil {
        return err
    }

Есть также довольно много более высокоуровневых парсеров вроде binary.Read(), которые умеют парсить определённые типы. Мы познакомимся ближе с ними в следующих постах о других пакетах.


Ещё одна чуть реже используемая вспомогательная функция это ReadAtLeast():


    func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error)

Эта функция записывает доступные для чтения данные в ваш буфер, но не менее указанного количества байт. Я не нашел надобности в этой функции лично для себя, но легко могу представить её пользу для случаев, когда вы хотите уменьшить количество вызовов Read() и буферизировать дополнительные данные.


Объединение потоков


Нередко вы можете встретить ситуацию, где вам необходимо объединить несколько ридеров вместе. Это легко сделать с помощью MultiReader:


    func MultiReader(readers ...Reader) Reader

Например, вы хотите отправить HTTP ответ, в котором заголовок читается из памяти, а содержимое тела ответа — из файла. Многие люди сначала прочитают файл в буфер в памяти перед отправкой, но это медленно и может требовать много памяти.


Вот более простой подход:


    r := io.MultiReader(
        bytes.NewReader([]byte("...my header...")),
        myFile,
    )
    http.Post("http://example.com", "application/octet-stream", r)

MultiReader даёт возможность http.Post() использовать оба ридера как один.


Дублирование потоков


Один из моментов, которые вам могут встретиться при работе с ридерами это то, что если данные были вычитаны, их нельзя прочитать ещё раз. Например, ваше приложение не смогло распарсить тело HTTP запроса, но вы не можете его проанализировать, потому что парсер уже прочёл данные, их больше нет в ридере.


TeeReader является тут хорошим решением — он позволяет сохранять вычитанные данные, при этом не мешая процессу чтения.


    func TeeReader(r Reader, w Writer) Reader

Эта функция создаёт новый ридер-обёртку вокруг вашего ридера r. Любая операция чтения из нового ридера будет также записывать данные в w. Этот райтер(writer) может представлять собой всё что угодно — от буфера в памяти, до лог файла и до потока стандартных ошибок STDERR.


Например, вы можете захватывать ошибочные запросы следующим образом:


    var buf bytes.Buffer
    body := io.TeeReader(req.Body, &buf)

    // ... process body ...

    if err != nil {
            // inspect buf
            return err
    }

Впрочем, тут важно быть внимательными с размерами вычитанного тела ответа, чтобы не израсходовать память.


Ограничение длины потока


Поскольку потоки никак не ограничены по размеру, иногда чтение из них может привести к проблемам с памятью или местом на диске. Типичный пример это хендлер, осуществляющий загрузку файла. Обычно существуют лимиты на максимальный размер загружаемого файла, чтобы не переполнять диск, но может быть утомительно имплементировать их вручную.


LimitReader даёт нам эту функциональность, предоставляя обёртку вокруг ридера, который ограничивает количество байт, доступных для вычитки.


    func LimitReader(r Reader, n int64) Reader

Один из моментов при работе с LimitReader-ом это то, что он не скажет вам, если r вычитал больше, чем n. Он просто вернёт io.EOF, как только вычитает n байт. Как вариант, можно выставить лимит в n+1 и потом проверить, прочитали ли вы больше, чем n байт в конце.


Запись байтов


Теперь, после того как мы познакомились с чтением байтов из потоков, давайте посмотрим, как их записывать в потоки.


Интерфейс Writer


Интерфейс Writer это, по сути, инвертированный Reader. Мы указываем набор байтов, которые нужно записать в поток:


    type Writer interface {
            Write(p []byte) (n int, err error)
    }

В общем случае, запись байтов это более простая операция, чем чтение. С ридерами сложность в том, чтобы правильно работать с частичными и не полными чтениями, но при частичной или неполной записи, мы просто получаем ошибку.


Дублирование записи


Иногда вам нужно отправить данные сразу в несколько writer-ов. Например, в лог файл и в STDERR. Это похоже на TeeReader, только мы хотим дублировать запись, а не чтение.


В этом случае нам подойдёт MultiWriter:


    func MultiWriter(writers ...Writer) Writer

Имя может немного сбивать толку, потому что это не совсем writer-версия MultiReader. Если MultiReader объединяет несколько ридеров в один, то MultiWriter возвращает writer, который дублирует записи во все writer-ы.


Я активно использую MultiWriter в unit-тестах, где я хочу убедиться, что сервисы пишут в лог корректно:


    type MyService struct {
            LogOuput io.Writer
    }
    ...
    var buf bytes.Buffer
    var s MyService
    s.LogOutput = io.MultiWriter(&buf, os.Stderr)

Использование MultiWriter позволяет мне проверить содержимое buf и при этом видеть полный вывод логов в терминале для отладки.


Копирование байт


Теперь, когда мы разобрались и с чтением, и с записью байт, логично разобраться, как мы можем объединять эти две операции вместе и копировать данные между ними.


Объединяя readers & writers


Самый простой способ скопировать из ридера во writer это использовать функцию Copy():


    func Copy(dst Writer, src Reader) (written int64, err error)

Эта функция использует буфер в 32 КБ, чтобы прочитать из src и записать в dst. Если случится ошибка, отличная от io.EOF, копирование остановится и вернётся ошибка.


Одна из проблем с Copy() заключается в том, что у вас нет способа гарантировать максимальное количество скопированных байт. Например, вы хотите скопировать лог файл до его текущего размера. Если же лог продолжит расти во время копирования, вы получите больше байт, чем ожидалось. В этом случае можно использовать функцию CopyN(), которая скопирует не больше указанного количества:


    func CopyN(dst Writer, src Reader, n int64) (written int64, err error)

Ещё один важный момент с Copy() заключается в том, что при каждом копировании выделяется буфер в 32КБ. Если вам нужно делать много операций копирования, вы можете переиспользовать уже выделенный буфер и использовать CopyBuffer():


    func CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error)

Накладные расходы на Copy() на самом деле очень малы, поэтому я лично не использую CopyBuffer().


Оптимизируем копирование


Чтобы избежать использования промежуточного буфера, типы данных могут имплементировать специальные интерфейсы для чтения и записи в них напрямую. Если они имплементированы для типа, функция Copy() не будет использовать буфер, а будет использовать эти специальные методы.


Если тип имплементирует интерфейс WriterTo, то он может записывать данные напрямую:


    type WriterTo interface {
        WriteTo(w Writer) (n int64, err error)
    }

Я использовал его в функции BoltDB Tx.WriteTo(), которая позволяет пользователям создавать снапшот базы данных из транзакции.


С другой стороны, интерфейс ReaderFrom позволяет типу напрямую читать данные из ридера:


    type ReaderFrom interface {
            ReadFrom(r Reader) (n int64, err error)
    }

Адаптация ридеров и райтеров


Иногда вы оказываетесь в ситуации, когда у вас есть функция, принимающая Reader, но у вас есть только Writer. Возможно, вы хотите динамически записать данные в HTTP запрос, но http.NewRequest() принимает только Reader.


Вы можете инвертировать райтер, используя io.Pipe():


    func Pipe() (*PipeReader, *PipeWriter)

Тут вы получаете новый ридер и writer. Любая запись в PipeWriter переправится в PipeReader.


Я редко использовал эту функцию, но exec.Cmd использует её для реализации Stdin, Stdout и Stderr пайпов, которые могут быть очень полезны при работе c запускаемыми программами.


Закрытие потоков


Всё хорошее подходит к концу, и работа с потоками не исключение. Интерфейс Closer предоставляет общий способ закрывать потоки:


    type Closer interface {
            Close() error
    }

Тут особо не о чем писать, интерфейс этот очень простой, но я стараюсь всегда возвращать ошибку в моих Close() методах, чтобы мои типы реализовывали этот интерфейс, если потребуется. Closer не всегда используется напрямую, он чаще идёт в сочетании с другими интерфейсами, такими как ReadCloser, WriteCloser и ReadWriteCloser.


Навигация по потокам


Потоки обычно представляют собой постоянно появляющиеся данные от начала до конца, но бывают исключения. Файл, к примеру, может быть потоком, но при этом вы также можете произвольно перемещаться к любой позиции внутри файла.


Интерфейс Seeker предоставляет возможность перемещаться внутри потока:


    type Seeker interface {
            Seek(offset int64, whence int) (int64, error)
    }

Есть три способа прыгать на нужную позицию: переход от текущей позиции, переход с начала потока и переход с конца. Вы указываете этот способ аргументом whence. Аргумент offset указывает на сколько байт переместиться.


Перемещение по потоку может быть полезным, если вы используете блоки фиксированного размера или если ваш файл содержит индекс со смещениями. Иногда данные находятся в заголовке и логично использовать переход с начала потока, но иногда данные находятся в хвосте и удобнее перемещаться с конца.


Оптимизация под типы данных


Чтение и запись порциями могут быть утомительны, если всё что вам нужно это один байт или руна (rune). В Go для этого есть интерфейсы, которые облегчают жизнь.


Работа с индивидуальными байтами


Интерфейсы ByteReader и ByteWriter предоставляют простые методы для чтения и записи одного байта:


    type ByteReader interface {
            ReadByte() (c byte, err error)
    }
    type ByteWriter interface {
            WriteByte(c byte) error
    }

Заметьте, что тут нет параметра для количества байт, это всегда будет 0 или 1. Если байт не был прочитан или записан, возвращается ошибка.


Также есть есть ByteScanner интерфейс, позволяющий удобно работать с буферизированными ридерами для байт:


    type ByteScanner interface {
        ByteReader
        UnreadByte() error
    }

Этот интерфейс позволяет вернуть байт обратно в поток. Это бывает удобно, например при написании LL(1) парсеров, так как позволяет заглядывать на байт вперёд.


Работа с индивидуальными рунами


Если вы парсите Unicode данные, то вы должны работать с рунами вместо индивидуальных байт. В этом случае вы должны использовать интерфейсы RuneReader и RuneScanner:


    type RuneReader interface {
            ReadRune() (r rune, size int, err error)
    }
    type RuneScanner interface {
           RuneReader
           UnreadRune() error
    }

Вывод


Потоки байт важны для многих Go программ. Это интерфейсы для всего, от сетевых соединений до файлов на диске и до пользовательского ввода с клавиатуры. Пакет io предоставляет основные примитивы для работы со всем этим.


Мы посмотрели на чтение, запись и копирование байт, а также на оптимизацию этих операций под конкретные задачи. Эти примитивы могут выглядеть просто, но они являются базовыми строительными блоками для приложений, активно работающих с данными.


Пожалуйста, изучите внимательно пакет io и используйте его интерфейсы в своих программах. Также я буду рад, если вы поделитесь своими интересными способами использования пакета io, равно как и любым советам по тому, как можно улучшить эту серию статей.

Поделиться с друзьями
-->

Комментарии (29)


  1. kay
    02.08.2016 15:30
    +1

    Вопрос знатокам: как прервать выполнение go routine с io.Copy внутри? Сразу скажу, что блокировка происходит на этой строке https://golang.org/src/io/io.go#L380. Интерфейс Close тоже не помогает.
    Пока мне никто не смог ответить на этот вопрос.


    1. divan0
      02.08.2016 16:18

      Я думаю, тут нет универсального совета — если Read заблокирован, то это блок на уровне сискола. Close() может остановить чтение в случае, скажем, сетевого соединения, но не при чтении из файла, и это ещё может отличаться от ОС к ОС.
      Если ваш кейс — чтение файлов и ОС Linux, то может помочь вот этот пакет — https://github.com/npat-efault/poller


    1. creker
      02.08.2016 19:05

      Откуда чтение происходит? Файл на диске? Сокет? Для файлов Go не делает ничего особенного, просто вызывает syscall. Можете для каждой ОС поискать, что делать в случае блокировки. Если сокет, то в Go это пройдет через асинхронную прослойку, специфичную для конкретной ОС. В этом случае блокировка снимется, если закрыть сокет из какой-либо другой горутины или с другого конца соединения. Опять же, все работает так же, как если бы писали сами на С.


      1. kay
        02.08.2016 20:24

        В моем случае это stdin и stdout.


        1. nikitadanilov
          02.08.2016 20:51

          Пошлите соответствующему треду сигнал, на который установлен обработчик.


          1. kay
            02.08.2016 21:02

            И как этот тред завершит io.Copy, если src.Read(buf) заблокирован?


            1. nikitadanilov
              02.08.2016 21:05

              Сигнал (man 2 kill) прерывает заблокированный системный вызов, тот возвращается с errno == EINTR.


              1. kay
                02.08.2016 21:52

                go routine это же легковесный тред. тред процесса нельзя завершить не завершив сам процесс. если же ты говоришь о сигнале, который передаётся через канал, то Read(buf) блокирует всю go routine функцию, и нет возможности обработать сигнал, полученный с канала.


                1. nikitadanilov
                  02.08.2016 23:22

                  Речь идет про POSIX сигналы. Никакого завершения тредов не происходит. Нужно установить обработчик сигнала, так же, как делает sigaction(3) (предположительно, в go есть для этого интерфейс), а потом послать заблокированному треду сигнал, например SIGINTR. В случае, если чтение идет с терминала это эквивалентно нажатию ^C пользователем. Системный вызов read немедленно вернется с EINTR.


                  1. kay
                    03.08.2016 00:37

                    Может я тебя не так понял, но вот пример. Висит, пока его не прибьёшь KILL'ом.
                    https://play.golang.org/p/s2_qrgnbPQ
                    Потом как ты себе представляешь production приложение, которое само себе сигналы шлет? Тесты уж куда ни шло.


                    1. nikitadanilov
                      03.08.2016 00:46
                      +2

                      В чем проблема слать себе сигналы? Это стандартный POSIX интерфейс, с четкой семантикой.


                      1. kay
                        03.08.2016 08:49

                        По мне это выглядит как самому себе почту отправлять. Можно, но зачем?


                        1. nikitadanilov
                          04.08.2016 13:32

                          Почтальон звонит (дважды). :-)


                          1. kay
                            04.08.2016 13:35

                            Если интерес всё еще не пропал, то ниже выложил пример, который необходимо заставить правильно работать. https://habrahabr.ru/post/306914/#comment_9733560


                      1. ZyXI
                        03.08.2016 09:15

                        И жутко асинхронный. Ну пошлёт горутина сигнал, а ну как он прилетит когда read() уже отработал и Go уже начал обработку совершенно другой горутины? Ведь это вполне возможно, между принятием решения и, собственно, посылкой сигнала проходит время. Прилетает он тоже не мгновенно.


                        Полагаю, обработчик в этом случае «просто ничего не делай»: read и так вернётся с EINTR, а влияния на код никакого не будет… если только «совершенно другая горутина» не начала свой read, который вы немедленно прибьёте задолго до timeout’а (если он вообще будет нужен).


                        И ещё, откуда вы собрались добывать актуальный PID нужной нити? Go может взять и переместить горутину с одной системной нити на другую, никаких гарантий тут нет.


                        В общем, это попытка подковать блоху кузнечным молотом. Если руки алмазные, то пройдёт, но, скорее, добавите себе пачку ошибок.


                        1. creker
                          03.08.2016 14:30
                          +1

                          Справедливости ради, горутина скорее всего будет висеть на одном потоке и никуда не денется. Она заблокирована на syscall, а для этого им выделяется отдельный поток, который нигде больше не участвует кроме как для ожидания возвращения управления. Найти его PID можно с помощью ОС зависимых библиотек, я это успешно делал. Для пущей уверенности можно закрепить поток за горутиной с помощью «LockOSThread», хоть не известно, сохранится ли этот поток за горутиной при syscall или выделится новый.

                          В общем можно, но сломаться это может в любой момент, когда чего-нить в планировщике поменяют.


        1. creker
          02.08.2016 21:02

          Т.е. это обычный файловый дескриптор, значит висит syscall. Виснет это точно так же, как зависла бы в C или где-либо еще попытка чтения из stdin без каких-либо входящих данных. Тут или писать код, который умеет с этим работать, т.е. UNIX/Go way. Либо пытаться прикрутить асинхронность с таймаутами как советовали выше с помощью отдельной библиотеки. Мой совет, лучше подумать над архитектурой приложения, чтобы блокировку на stdin не нужно было посреди работы приложения завершать. Можете завести одну горутину, которая будет висеть и читать из stdin. Дальше уже можете на каналах с таймаутами реализовать все то, что нужно, чем городить epoll'ы и select'ы в сторонних библиотеках.


          1. kay
            02.08.2016 21:04

            Проще скинуть ссылку на github issue: https://github.com/coreos/fleet/issues/1499#issuecomment-209445880


            1. creker
              02.08.2016 21:24

              Нет желания разбираться, что у вас там конкретно как работает, но все более менее варианты вроде перечислили, даже с сигналами. Либо выделить одну горутину на stdin и через каналы пробрасывать из нее ввод дальше, либо пробовать асинхронность с таймаутами посредством сторонней библиотеки. Собственно, проблема не специфична для Go. Вообще, сам io.Copy вызов тут скорее всего не сильно уместен. Он отлично подходит для сокетов. Как только один из концов закроется или вылетит какая-либо ошибка, то все прервется и можно завершить работу. Но здесь единственное условие выхода это действие пользователя.


              1. kay
                02.08.2016 21:59

                Таймауты не вариант, они решают только последствия частных случаев. Даже если выделить goroutine на stdin — stdin останется открытым и следующая go routine не сможет с ним нормально работать, пока пользователь не нажмет enter.


                я пробовал закрыть файловый дескриптор на stdin, тогда goroutine завершается. но и терминал перестаёт работать.


                1. neolink
                  03.08.2016 09:23

                  > Даже если выделить goroutine на stdin — stdin останется открытым и следующая go routine не сможет с ним нормально работать
                  в смысле последующая?

                  func input(dc chan []byte) {
                      defer close(dc)
                      for {
                          data := make([]byte, 128)
                          n, err := os.Stdin.Read(data)
                          if n > 0 {
                              dc <- data[0:n]
                          }
                          if err != nil {
                              break
                          }
                      }
                  }
                  

                  вот такая горутина будет читать из stdin и писать в канал а дальше уже где нужно делаете worker, process или что у вас по смыслу где будете читать из этого канала:

                  func worker(dc chan []byte, done chan bool) {
                      select {
                          case data, ok := <- dc:
                              if !ok {
                                  // chanel closed exit
                                  return
                              }
                  
                              // do some work
                          case <- done:
                              // exit
                              return 
                      }
                  }
                  

                  соответственно когда нужно завершить эту горутину пишите в канал done


                  1. neolink
                    03.08.2016 09:32

                    внутри worker забыл for вокруг select написать, но думаю общий смысл передан


                  1. creker
                    03.08.2016 14:31

                    Ага, тот самый паттерн, который циркулирует в сети на эту тему. В общем-то, самый идиоматичный вариант.


                  1. kay
                    04.08.2016 13:20

                    Эту, это какую? worker? А какой смысл мне её завершать? Мне в данном случае требуется завершить input и прервать os.Stdin.Read(). Чтобы в следующем цикле я бы смог использовать os.Stdin для других целей.


                    1. Falstaff
                      04.08.2016 19:36
                      +1

                      @neolink меня поправит, если что, но думаю, имелось в виду другое — вы создаёте себе одну горутину, которая у вас будет создаваться сразу на старте, жить до самого завершения вашей программы, и задачей у неё будет читать os.Stdin и писать всё прочитанное в канал. (Или pipe, например, можно попробовать.) Тогда этот канал становится вашим stdin, вы можете завершать одних его слушателей и назначать других по необходимости. А горутина, которая читает os.Stdin, завершится сама на выходе приложения.


                  1. kay
                    04.08.2016 13:33

                    Вот пример кода, который необходимо заставить правильно работать:


                    https://gist.github.com/kayrus/2753b4710e78dd0f5e544baa0f5f4fa1


                    Нужно завершить go routine (и вернуть stdin в исходное состояние) три раза так, чтобы результирующий вывод консоли при вводе 123 был:


                    Scanning:
                    123
                    What was scanned at i=0: "123"
                    Scanning:
                    123
                    What was scanned at i=1: "123"
                    Scanning:
                    123
                    What was scanned at i=2: "123"


                    1. neolink
                      04.08.2016 20:37

                      эм, а можете словами описать, что вы хотите получить…
                      а то у вас прям сразу рейс между основным кодом и той горутиной что вы запускаете, как бы не очень понятно какой результат вы ожидаете


                      1. kay
                        04.08.2016 20:51

                        Запустить io.Copy, завершить io.Copy. Использовать stdin по другим назначениям (в моём случае Scanf). И так три раза. Scanf должен отработать правильно в трех случаях.


      1. Falstaff
        02.08.2016 21:20
        +3

        Это файл, скорее всего. Думаю, kay вот на эту проблему налетел: Issue 10001. Тогда Close() может не помочь, POSIX не обещает, что read(2) и close(2) не вызовут гонок, если вызвать из разных потоков. Поллер по ссылке выше, наверное, самое идиоматичное решение — жаль, что из коробки не идёт.