Параллельное программирование — одна из самых интересных фич, которые может предложить вам Golang.

Идея, лежащая в основе параллелизма, заключается в одновременной работе над несколькими разными процессами, что помогает избежать застревания в задачах, выполнение которых занимает много времени. Среди основных инструментов, которые предлагает GO для воплощения этого подхода программирования, можно выделить:

  • Горутины (goroutines): они представляют из себя функции, которые выполняются независимо. Мы можем представить себе их в виде процессов, выполняющихся в другом легковесном и дешевом потоке.

  • Каналы (channels): они позволяют обмениваться данными между работающими параллельно функциями и при необходимости синхронизировать их.

Горутины в сочетании с каналами дают нам простой способ реализации выполнения независимых задач и общения между ними.

За время работы GO-разработчиком я успел освоить несколько паттернов параллельного программирования, решающих распространенные проблемы, с которыми мы обычно сталкиваемся. В этой статье я собираюсь поделиться с вами пятью из них, которые, как я считаю, будут для вас особенно полезны.

Конструкция For-Select-Done

Но прежде чем говорить о самих паттернах, думаю, очень важно будет обратить ваше внимание на относительно простую структуру, которая активно используется в этой статье.

Поскольку мы оперируем несколькими горутинами, нам нужно быть особенно внимательными, чтобы не допустить никаких утечек памяти в нашей программе. На всякий случай напомню, что утечка происходит всякий раз, когда горутина остается висеть в нашей программе после того, как она нам больше не нужна. Решение заключается в том, чтобы посылать горутине сигнал, который дает ей знать, что она может завершиться.

Наиболее распространенным способом реализации этого механизма является объединение цикла for-select с отдельным каналом, который отправляет сигнал завершения горутине. Обычно его называют done-каналом.

func printIntegers(done <-chan struct{}, intStream <-chan int) {
  for{
    select {
      case i := <-intStream:
        fmt.Println(i)
      case <-done:
        return
    }
  }
}

forselectdone.go

В этом примере, как только мы запускаем функцию printIntegers в отдельной горутине, она сразу начинает прослушивать канал integerStream. Когда работающая в фоновом режиме функция printIntegers нам больше не нужна, мы можем устранить ее, просто закрыв done-канал.

Следует отметить, что мы можем добиться того же эффекта, используя пакет Context:

func printIntegers(ctx context.Context, intStream <-chan int) {
  for{
    select {
      case i := <-intStream:
        fmt.Println(i)
      case <-ctx.Done():
        return
    }
  }
}

forselectctx.go

Больше информации о функциях пакета Context вы можете найти в моем блогпосте “Как использовать пакет Context в Go”.

Когда контекст отменяется, функция printIntegers завершается. Так как конструкция for-select-done служит в качестве основы для остальных паттернов, разобравшись с ней, мы можем смело приступать к их изучению.

1. Паттерн Fan-In (слияние) 

Допустим, мы получаем данные из нескольких каналов. Мы можем захотеть перенаправить данные из нескольких источников в один поток.

Диаграмма паттерна Fan-In. Изображение автора
Диаграмма паттерна Fan-In. Изображение автора

Благодаря этому решению нам нужно будет управлять только одним каналом передачи данных вместо целого множества. Этот паттерн был представлен Робом Пайком (Rob Pike) в докладе о паттернах параллельного программирования в GO (который вы можете посмотреть на YouTube).

На практике этого можно реализовать, создав новый выходной канал и запустив горутины для всех источников данных, которые будут просто такими же каналами.

Каждая горутина будет отвечать за прослушивание определенного канала и пересылку полученных данных на выходной канал. Таким образом, мы объединяем все имеющиеся у нас каналы в один. В приведенном ниже коде показан пример реализации этого паттерна:

func fanIn(ctx context.Context, fetchers ...<-chan interface{}) <-chan interface{} {
  combinedFetcher := make(chan interface{})
  // 1
  var wg sync.WaitGroup
  wg.Add(len(fetchers))
  
  // 2
  for _, f := range fetchers {
    f := f
    go func() {
      // 3
    
      defer wg.Done()
      for{
        select{
        case res := <-f:
          combinedFetcher <- res
        case <-ctx.Done():
          return
        }
      }
    }()
  }
  
  // 4
  // Удаление каналов
  go func() {
    wg.Wait()
    close(combinedFetcher)
  } ()
  return combinedFetcher
}

fan-in.go

Давайте детальнее разберем приведенный выше код:

  1. В начале мы создаем waitGroup, которая должна дождаться завершения всех слушателей, сгенерированных внутри цикла for.

  2. Цикл for будет повторяться n раз, где n — количество фетчеров, полученных на вход. В каждой итерации мы будем запускать новую горутину, которая будет прослушивать фетчер.

  3. Горутина-слушатель представляет из себя конструкцию for-select-done. На каждой итерации она либо отправляет новые данные в канал combinedFetcher, либо завершится, когда контекст будет отменен.

  4. В конце мы запускаем еще одну горутину. Она ожидает завершения всех слушателей для того, чтобы закрыть канал combinedFetcher.

2. Берем первые n значений (Take First N) из потока

Следующий паттерн может нам пригодиться, когда нам нужно взять только первые 5, 10 или n значений, поступающие из канала.

Берем первые N значений. Изображение автора
Берем первые N значений. Изображение автора

Идея заключается в том, чтобы создать канал с ограниченным количеством выходов. Допустим, мы читаем сообщения из потока данных, и нас интересуют только первые 5 сообщений. Также давайте предположим, что мы не контролируем сам источник данных, мы можем только читать из него. Этот паттерн является хорошим решением этой задачи:

func takeFirstN(ctx context.Context, dataSource <-chan interface{}, n int) <-chan interface{} {
    // 1
    takeChannel := make(chan interface{})
    
    // 2
    go func() {
        defer close(takeChannel)

        // 3 
        for i := 0; i< n; i++ {
          select {
            case val, ok := <-dataSource:
              if !ok{
                return
              }
              takeChannel <- val
            case <-ctx.Done():
              return
          }
        }
    }()
    return takeChannel
}

take.go hosted

Давайте разберемся, что происходит внутри функции TakeFirstN:

  1. Мы создаем новый канал, который будет доставлять полученные данные.

  2. Создается новая горутина. Кроме прочего, она добавляет отложенный вызов для закрытия канала takeChannel.

  3. Внутри горутины реализуется конструкция for-select-done с небольшой модификацией. Вместо того, чтобы запускать бесконечный цикл for, мы ограничиваем его итерации до n входных данных.

Пример реализации:

func main() {
  done := make(chan struct{})
  defer close(done)

  // Генерирует канал, отправляющий целые числа от 0 до 9
  range10 := rangeChannel(done, 10)

  for num := range takeFirstN(done, range10, 5) {
      fmt.Println(num)
  }
}

takemain.go

0
1
2
3
4

Нам удалось получить первые 5 входных данных канала range10 без необходимости закрывать его или каких-либо других манипуляций.

3. Паттерн подписки (Subscription)

Речь идет о паттерне, представленном в докладе Google I/O 2013 — Advanced Go Concurrency Patterns Talk (который вы можете посмотреть здесь). Этот паттерн представлен здесь почти в том же виде, что и в докладе, но с небольшими изменениями, благодаря которым, с моей точки зрения, он работает немного лучше.

Давайте представим, что мы хотим слушать событие, которое генерируется на регулярной основе. Например: нам нужно получать обновления из API каждые 15 секунд. Этот паттерн использует интерфейс Subscription, отвечающий только за доставку новых данных:

type Subscription interface {
  Updates() <-chan Item
}

Мы также собираемся использовать другой интерфейс в качестве абстракции для получения нужных нам данных (фетчер):

type Fetcher interface {
  Fetch() (Item, error)
}

Для каждого из них у нас будет конкретный тип, реализующий их.

Для подписки:

func NewSubscription(ctx context.Context, fetcher Fetcher, freq int) Subscription {
  s := &sub{
    fetcher: fetcher,
    updates: make(chan Item),
  }
// Запуск задачи, предназначенной для получения наших данных
  go s.serve(ctx, freq)
  return s
}
type sub struct {
  fetcher Fetcher
  updates chan Item
}
func (s *sub) Updates() <-chan Item {
  return s.updates
}

То, что происходит внутри метода serve, мы рассмотрим чуть ниже.

Для фетчера:

func NewFetcher(uri string) Fetcher {
  f := &fetcher{
    uri: uri,
  }
  return f
}
type fetcher struct {
  uri   string
}

Внутри метода serve

Метод serve состоит из уже привычной для нас конструкции for-select-done:

func (s *sub) serve(ctx context.Context, checkFrequency int) {
  clock := time.NewTicker(time.Duration(checkFrequency) * time.Second)
  type fetchResult struct {
    fetched Item
    err     error
  }
  fetchDone := make(chan fetchResult, 1)
  
  for {
    select {
    // Таймер, который запускает фетчер
    case <-clock.C:
      go func() {
	fetched, err := s.fetcher.Fetch()
	fetchDone <- fetchResult{fetched, err}
      }()
    // Случай, когда результат фетчера готов к использованию
    case result := <-fetchDone:
      fetched := result.fetched
      if result.err != nil {
        log.Println("Fetch error: %v \n Waiting the next iteration", result.err.Error())
	break
      }
      s.updates <-fetched
    // Случай, когда нам нужно закрыть сервер
    case <-ctx.Done():
      return
    }
  }
}

serve.go

Давайте пройдемся по этому коду, чтобы понять, что тут происходит:

  1. В первую case-ветку нашего switch-оператора мы будем регулярно попадать по таймеру. Что касается фетчера, мы собираемся запускать его внутри другой горутины, чтобы мы оказались не заблокировали, если это займет немного больше времени.

  2. Когда результат фетчера, наконец, будет готов к отправке, он будет получен во второй case-ветке. В случае ошибки она прерывает оператор select и ожидает следующей итерации.

  3. Третья case-ветка — это стандартный случай ожидания завершения контекста.

3.5 Улучшенный паттерн подписки 

Паттерн подписки, в его первоначальном виде, прекрасно работал в моем коде. Но на самом деле в нем есть один баг, который не так просто отловить.

В строке 26 мы отправляем в канал updates данные, который ожидает получить подписчик. Проблема заключается в том, что если подписчик не готов считывать данные, эта строка кода останется заблокированной.

К счастью, есть способ обойти эту проблему, превратив эту строку в отдельную case-ветку в select-операторе.

case s.updates <- fetched:

И объявив получаемую переменную перед циклом for-select.

Но что, если полученные данные не готовы к отправке?

Как оказалось, здесь можно использовать свойства nil-channel’ов. Всякий раз, когда мы пытаемся писать в nil-channel, он остается заблокированным.

var fetchResponseStream chan Item
fetched := Item{//Some data here//}
fetchResponseStream <- fetched // здесь блокируется

Кроме того, мы собираемся создать флаг pending, чтобы отслеживать, можно ли отправить полученный элемент:

var fetched Item
var fetchResponseStream chan Item
var pending bool
...
for {
  if pending {
    fetchResponseStream = s.updates
  } else {
    fetchResponseStream = nil    
  }
  select {
  ...
  
  case: fetchResponseStream <- fetched:
    pending = false

Давайте соберем все вместе:

func (s *sub) serve(ctx context.Context, checkFrequency int) {
  clock := time.NewTicker(time.Duration(checkFrequency) * time.Second)
  type fetchResult struct {
    fetched Item
    err     error
  }
  fetchDone := make(chan fetchResult, 1)

  var fetched Item
  var fetchResponseStream chan Item
  var pending bool
  
  for {
    if pending {
      fetchResponseStream = s.updates
    } else {
      fetchResponseStream = nil
    }
    
    select {
    // Таймер, который запускает фетчер
    case <-clock.C:
      // Ждем следующей итерации, если pending истина
      if pending { break }
      go func() {
	fetched, err := s.fetcher.Fetch()
	fetchDone <- fetchResult{fetched, err}
      }()
      
    // Случай, когда результат фетчера готов быть считанным
    case result := <-fetchDone:
      fetched = result.fetched
      if result.err != nil {
        log.Println("Fetch error: %v \n Waiting the next iteration", result.err.Error())
	break
      }
      pending = true
      
    // Данные можно отправлять по каналу
    case fetchResponseStream <- fetched:
      pending = false
      
    // Случай, когда нам нужно закрыть сервер
    case <-ctx.Done():
      return
    }
  }
}

serve-fixed.go

С помощью переменной pending в канале fetchResponseStream, мы смогли снизить вероятность блокировки внутри одной из case-веток.

В функции main:

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  subscription := NewSubscription(ctx, NewFetcher("http.example.url.com"), 4)

  time.AfterFunc(3*time.Second, func() {
    cancel()
    fmt.Println("Cancelling context:")
  })

  for item := range subscription.Updates() {
    fmt.Println(item)
  }
}

serve-main.go

Map, Filter

Когда мы имеем дело с потоками данных, нам может понадобиться как-то обрабатывать входящие значения. Умножение, деление, ограничение до фиксированной границы – возможностей много.

В современном Javascript массивы имеют некоторые встроенные методы, такие как map и filter, которые используются для обработки или выбора данных из массива. Почему бы не применить эту концепцию для обработки потока, поступающего из канала?

4. Паттерн Map

На примере простого умножения на 2:

Диаграмма паттерна Map. Изображение автора
Диаграмма паттерна Map. Изображение автора

Чтобы не усложнять наш пример без особой на то необходимости предположим, что тип данных на входе и на выходе — это канал целых чисел.

func Map(done <-chan struct{}, inputStream <-chan int, operator func(int)int) <-chan int {
    // 1
    mappedStream := make(chan int)
    go func() {
      defer close(mappedStream)
      // 2
      for {
        select {
        case <-done:
          return
        // 3
        case i, ok := <-inputStream:
          if !ok { return }

          //4
          select {
            case <-done:
                return
            case mappedStream <- operator(i):
          }
        }
      }
    }()
    return mappedStream

map.go

Разбор кода:

  1. Он начинается с создания нового канала, запуска новой горутины и отсрочки закрытия канала внутри нее.

  2. Конструкция for-select-done слушает inputStream.

  3. Мы проверяем, закрыт ли inputStream, и если это так, то функция возвращает значение.

  4. Второй select-оператор внутри будет ожидать сигнала done или возможности переслать данные на канал mappedStream. Этот последний шаг нужен для того, чтобы гарантировать, что горутина всегда завершается, когда done-канал посылает свой сигнал.

Небольшое отличие заключается в том, что для завершения горутины вместо контекста мы используем done-канал.

Этот паттерн может пригодиться при построении конвейера данных, который должен проходить несколько этапов. Он позволяет нам разделить различную логику работы на несколько шагов.

Пример реализации:

func main() {
  done := make(chan struct{})
  defer close(done)
  
  // Генерирует канал, отправляющий целые числа от 0 до 9
  range10 := rangeChannel(done, 10)
  
  multiplierBy10 := func(x int) int {
    return x * 10
  }
  for num := range Map(done, range10, multiplierBy10) {
      fmt.Println(num)
  }
}

mapmain.go

0
10
20
30
40
50
60
70
80
90

5. Паттерн Filter

Предположим, у нас есть поток целых чисел, и мы хотим получить только значения больше 10. Мы можем применить этот паттерн, чтобы пропускать только интересующие нас значения:

Паттерн Filter. Изображение автора
Паттерн Filter. Изображение автора

Как и в паттерне Map, мы предполагаем, что входные и выходные типы являются каналом целых чисел, но это пример можно расширить для работы с любым другим типом:

func Filter(done <-chan struct{}, inputStream <-chan int, operator func(int)bool) <-chan int {
  filteredStream := make(chan int)
  go func() {
    defer close(filteredStream)
  
    for {
      select {
      case <-done:
          return
      case i, ok := <-inputStream:
        if !ok {
          return
        }
        
        if !operator(i) { break }
        select {
          case <-done:
            return
          case filteredStream <- i:
        }
      }
    }
  }()
  return filteredStream
}

filter.go hosted

Здесь мы видим почти ту же структуру, что и в паттерне Map. Разница только в том, что функция operator возвращает логическое значение. Если она возвращает true, то отправляем его в выходной поток, иначе просто отбрасываем.

Пример реализации:

func main() {
  done := make(chan struct{})
  defer close(done)
  
  // Генерирует канал, отправляющий целые числа от 0 до 9
  range10 := rangeChannel(done, 10)
  
  isEven := func(x int) bool {
    return x % 2 == 0
  }
  for num := range Filter(done, range10, isEven) {
      fmt.Println(num)
  }
}

filtermain.go

0
2
4
6
8

Заключение

Эти пять паттернов являются строительными блоками для создания более крупных и сложных Golang-приложений. Это готовые решения для задач, с которыми вы непременно столкнетесь при реализации параллелизма в GO. Кроме того, вы можете изменять, расширять и создавать новые паттерны на их основе!

Спасибо, что дочитали до конца! Если эта статья была вам полезна, подпишитесь на меня на Medium, где вы можете найти больше таких статей!

Ссылки

Вот ссылки, которые я использовал, чтобы написать эту историю:


Сегодня вечером в OTUS пройдет Mock-собеседование со студентом курса Golang, которое можно посетить бесплатно. На этой встрече проведем собеседование по различным темам, связанным с масштабированием нагрузки, отказоустойчивостью систем, внутреннего устройства баз данных, а также различных паттернов разработки бэкенда. Приглашаем всех желащих. Записаться можно по ссылке.

Комментарии (5)


  1. denis-isaev
    00.00.0000 00:00
    +2

    Параллельное программирование - это когда параллельно программируют.


    1. shushu
      00.00.0000 00:00

      Я думал то парное программирование ...


      1. denis-isaev
        00.00.0000 00:00
        +1

        Парное программирование - это когда программируют парами :)


  1. JekaMas
    00.00.0000 00:00
    +6

    Паттерн 0 - не используй конкурентность, пока это не стало совершенной необходимостью.


  1. evg1605
    00.00.0000 00:00

    Возможно я что то упустил, но по serve-fixed.go - во первых pending переменная в самом начале не страхует от повторного создания фетча в самом начале - посему проблема остается.
    Ветка в которой создается новый фетч никак не предотвращает создания повторного фетча по истечении таймера, возможно так нужно НО, - все фетчи работают паралельно и какой из них завершиться раньше в общем случае не предсказуемо - поэтому в данной реализации подписчик может получить новые обновления, а затем более старые обновления.
    Далее ИМХО - лучше использовать переменные типа каналов прямо в case - не разруливать это выше через логический флаг.
    Я понимаю что сие не к автору поста, ибо это перевод, но всё же )))