Привет, Хабр!

Я просмотрел не мало источников, много из них были англоязычными, но хочу выделить отдельное спасибо авторам этих статей на Xабре:

Эта статья является неким сборником многих статей про все, что связано с параллелизмом в Go, например: горутины, каналы, select и многое другое. При создании статьи моя "карта" выглядела так:

Я хотел углубиться в тему горутин с параллелизмом и, слушая на фоне "The Doors", поглощал информацию и выделял интересные мысли из статей

Надеюсь, Вы найдете для себя то, что искали.

Горутины

Появление

Основным источником появления горутин и каналов в Go послужило CSP. Эта модель основывается на коммуникации параллельных программ через каналы.

Объяснение

Думаю, наиболее легкое объяснение горутинам такое: горутина (goroutine) — это легковесный оберточный функционал над потоком.

В Go приняты такие термины: G (Goroutine) — Горутина M (Machine) — Машина Каждая Машина работает в отдельном потоке и способна выполнять только одну Горутину в момент времени. Планировщик операционной системы, в которой работает программа, переключает Машины. Число работающих Машин ограничено переменной среды GOMAXPROCS или функцией runtime.GOMAXPROCS(n int). По умолчанию она равна количеству ядер процессора компьютера, на которой было запущено приложение.

Для запуска функции как горутину необходимо написать go func() , где func() - функция, которую хотите запустить. Имейте ввиду, что если вы сделаете так:

package main

import "fmt"

func main() {
  // цикл с 5-ю итерациями
  for i := 0; i < 5; i++ {
    // вызываем функцию greeting() как горутину
    go greeting()
 }
}

func greeting() {
  fmt.Println("Hello World!!!")
}

Go playground: https://go.dev/play/p/xdQFn8gEBFW

Ничего не произойдет, ведь горутина выполняется параллельно с "main()" и "main()" закончится еще до завершения горутины, а если функция "main()" завершается, завершается и вся программа.

Для того, чтобы программа отработала корректно, необходимо дождаться выполнения горутины. Надо использовать WaitGroup, но об этом в следующей главе. Пока что обойдемся time.Sleep(2 * time.Second) . Функция "main()" "заснет" на две секунды, тем самым дождавшись выполнения "greeting()". Вот исправленный пример:

package main

import (
	"fmt"
	"time"
)

func main() {
         // цикл с 5-ю итерациями
	for i := 0; i < 5; i++ {
                // вызываем функцию greeting() как горутину
		go greeting()
	}
        // ожидаем незавершившиеся горутины
	time.Sleep(2 * time.Second)
}

func greeting() {
	fmt.Println("Hello World!!!")
}

Go playground: https://go.dev/play/p/KRwf_oyd0c1

Пакет "sync"

WaitGroup(замена time.Sleep())

WaitGroup - это примитивная синхронизация, используемая для ожидания того, как множество горутин завершат свое выполнение. Пакет "sync" предоставляет тип "sync.WaitGroup" и его методы "Add()", "Done()" и "Wait()". По идее, WaitGroup Вы используете вместо того, что бы писать time.Sleep(). Это и более надежно, и ждать перестанете именно в тот момент, как все выполнится, а не когда закончится "sleep()". Давайте посмотрим простой пример с WaitGroup:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // создаем WaitGroup
    wg := &sync.WaitGroup{}
    // цикл с 5-ю итерациями
    for i := 0; i < 5; i++ {
        // добавляем в список ожидания одну горутину
        wg.Add(1)
        go func(i int) {
            // говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
            defer wg.Done()
            // засыпаем, имитируя какую-то работу
            time.Sleep(time.Duration(i) * time.Second)
            fmt.Println("Горутина", i, "завершила свое выполнение")
        }(i)
    }
    // ожидаем незавершившиеся горутины
    wg.Wait()
    fmt.Println("Все горутины завершили свое выполнение!")
}

Go playground: https://go.dev/play/p/XdsoK-FSPR_m

Здесь мы использовали WaitGroup для ожидания пяти горутин. Перед стартом каждой из них, мы использовали "wg.Add(1)" для увеличения счетчика горутин на 1. Внутри горутин, мы вызывали "wg.Done()"(что отнимает 1 от счетчка горутин) когда горутина заканчивала свое выполнение. Наконец, в "main()" мы вызывали "wg.Wait()" для ожидания завершения всех горутин.

Mutex

Mutex (сокращение "mutual exclusion", что в переводе означает "взаимное изолирование") - это некая примитивная синхронизация, которая используется для защиты от использования данных, разрешая ими пользоваться только одной горутине. В Go, пакет "sync" предоставляет тип "sync.Mutex", который имеет 2 метода: "Lock()" и "Unlock().

Давайте посмотрим простой пример использования мютекса для защиты счетчика:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var (
      counter int // счетчик
      lock    sync.Mutex // наш mutex
    )
    
    // создаем WaitGroup
    wg := &sync.WaitGroup{}

    // цикл с 5-ю итерациями
    for i := 0; i < 5; i++ {
        // добавляем в список ожидания одну горутину
        wg.Add(1)
        go func() {
            // говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
            defer wg.Done()
            // используем mutex для блокировки использования счетчика другими горутинами
            lock.Lock()
            // увеличиваем счетчик
            counter++
            fmt.Println("Счетчик:", counter)
            // разблокируем счетчик
            lock.Unlock()
        }()
    }
    // ожидаем незавершившиеся горутины
    wg.Wait()
    fmt.Println("Итоговый счетчик:", counter)
}

Go playground: https://go.dev/play/p/eDm4HHbAGEB

В этом примере, мы использовали "sync.Mutex" для защиты от использования переменной "counter". Перед обновлением счетчика, мы вызываем "lock.Lock()" для создания блокировки. Теперь ни одна другая горутина не сможет использовать "counter". После обновления счетчика, мы вызываем "lock.Unlock()" для удаления блокировки. После разблокировки счетчика, им снова может пользоваться кто-угодно.

Atomic operations/атомарные операции

Атомарные операции - это низкоуровневые примитивы синхронизации, которые обеспечивают способ выполнения операций чтения-модификации-записи (read-modify-write) над общими переменными без необходимости в блокировке (Атомарная операция — операция, которая либо выполняется целиком, либо не выполняется вовсе; операция, которая не может быть частично выполнена и частично не выполнена). Пакет "sync/atomic" предоставляет несколько атомарных операций для целых чисел(int), указателей и чисел с плавающей точкой(float). Вот простой пример с использованием атомарных операций:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    // объявляем счетчик
    var counter int64

    // создаем WaitGroup
    wg := &sync.WaitGroup{}

    // цикл с 5-ю итерациями
    for i := 0; i < 5; i++ {
        // добавляем в список ожидания одну горутину
        wg.Add(1)
        go func() {
            // говорим, чтобы в конце анонимной функции одна горутина из списка исчезла
            defer wg.Done()
            // атомарной операцией добалвяем к счетчику единицу
            atomic.AddInt64(&counter, 1)
            fmt.Println("Счетчик:", atomic.LoadInt64(&counter))
        }()
    }
    // ожидаем незавершившиеся горутины
    wg.Wait()
    fmt.Println("Итоговый счетчик:", counter)
}

Go playground: https://go.dev/play/p/cLvSy7Mccdj

В этом примере, мы использовали функцию "atomic.AddInt64()" для атомарного изменения переменной "counter". Эта функция добавляет заданное значение к счетчику и возвращает новое. Мы также использовали функцию "atomic.LoadInt64()" для атомарного считывания текущего значения счетчика(переменная "counter"). Обратите внимание, что нам не нужно использовать мютекс для защиты счетчика, так как атомарные операции обеспечивают его безопасное обновление.

Каналы

Определение

Простыми словами: каналы - это инструменты коммуникации между горутинами.

Технически - это конвейер, откуда можно считывать или помещать данные. То есть одна горутина может отправить данные в канал, а другая — считать помещенные в этот канал данные.

Использование

Создание

Создается канал так: ch := make(chan int) . На месте "int" может быть любой тип данных, например: string.

Также можно использовать и "make()" со вторым аргументом - с размером канала. Таким образом он станет буферизорованным.

Длина и ёмкость

Буферизированный канал имеет длину и емкость. Длина канала - это количество значений в очереди в буфере канала, которые не были прочитаны. Емкость - это размер самого буфера канала. Для того, чтобы получить длину, мы используем функцию len(Ваш канал), а для получения емкости - cap(Ваш канал).

Закрытие

Канал можно и закрыть. Для этого надо вызвать функцию close(Ваш канал), тем самым заблокировав доступ к чтению из него. Если необходимо проверить, закрыт ли канал, или возможно ли прочитать из него данные - напишите такое выражение: val, ok := <- Ваш канал, где val - переменная, в которую запишется значение с канала, если это возможно, а ok - булева переменная, где true означает, что из канала можно прочитать данные, а false - что он закрыт/невозможно считать.

For range

С помощью for range можно читать данные из закрытого буферизированного канала, так как данные остаются в буфере даже после закрытия канала. Для этого необходимо написать такую структуру for range: for elem := range Ваш канал , где elem - элемент из канала. Вот пример:

package main

import "fmt"

func main() {
        // создаем канал
       c := make(chan int, 5)

        // записываем числа в канал
       c <- 1
       c <- 2
       c <- 101
       c <- 102
       c <- 1000

       // закрываем канал 
       close(c)

       // циклом идем по каналу
       for num := range c {
         fmt.Println(num)
       }
}

Go playground: https://go.dev/play/p/DjMEfLAsyZk

Deadlock

Если Вы будете считывать данные из пустого канала, Вы получите ошибку "Deadlock", которая появляется при бесконечном считывании из канала. Появляется она из-за того, что при считывании данных с канала, программа будет пытаться их считать до последнего, даже если их нет. Получается, что если данных нет и никогда не будет в канале, программа будет бесконечно пытаться. Так и появляется Deadlock. Вот пример:

package main

func main() {
        // создаем канал
	c := make(chan int)

        // пытаемся получить данные с канала, которые отсутствуют. Deadlock!
	<-c
}

Go playground: https://go.dev/play/p/1CSAVnWNEOf

Select

Определение

select - это почти что switch, но без аргументов. Есть и еще одна особенность - используется только для оперций с каналами.

Использование

Select, когда два канала готовы к чтению данных одновременно

package main
import "fmt"

func main() {

  // создаем каналы
  number := make(chan int)
  message := make(chan string)
  
 // вызываем функции как горутины
 go channelNumber(number)
 go channelMessage(message)

 // select
 select {
   case firstChannel := <- number:
     fmt.Println("Данные канала:", firstChannel)

   case secondChannel := <- message:
     fmt.Println("Данные канала:", secondChannel)
 }

}

// горутина, которая записывает число в канал
func channelNumber(number chan int) {
  // записываем число в канал
  number <- 15
}

// горутина, которая записывает строку в канал
func channelMessage(message chan string) {
  // записываем строку в канал
  message <- "Погружение в параллелизм в Go"
}

Go playground: https://go.dev/play/p/xlXKMfRVBSh

Здесь в селекте мы выбираем один из каналов. Одна горутина записывает данные в канал "number", а другая - в "message". Так как оба канала подготовлены, результат программы случаен(но на самом деле, в данном случае будет "Погружение в параллелизм в Go")

Select, когда один из каналов готов первым

package main

import (
  "fmt"
  "time"
)

func main() {

   // создаем каналы
  number := make(chan int)
  message := make(chan string)

  // вызываем функции как горутины
  go channelNumber(number)
  go channelMessage(message)

  // select
  select {
    case firstChannel := <-number:
      fmt.Println("Данные канала:", firstChannel)

    case secondChannel := <-message:
      fmt.Println("Данные канала:", secondChannel)
  }

}

// горутина для записи числа в канал
func channelNumber(number chan int) {
  // записываем число в канал
  number <- 15
}

// горутина для записи строки в канал
func channelMessage(message chan string) {
  
  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  // записываем строку в канал
  message <- "Погружение в параллелизм в Go"
}

Go playground: https://go.dev/play/p/vSVJT_F_Hn1

Здесь тоже есть две функции, которые записывают данные в каналы "number" и "message", но при одном условии. При записи в канал "message", функция засыпает на 2 секунды, делая "message" не готовым к использованию к тому моменту, как "number" уже будет заполненным.

Select, когда два канала одновременно запаздывают на 2 секунды

package main
import (
  "fmt"
  "time"
)

func main() {

  // создаем каналы
  number := make(chan int)
  message := make(chan string)

  // вызываем функции как горутины
  go channelNumber(number)
  go channelMessage(message)

  // select
  select {
    case firstChannel := <-number:
      fmt.Println("Данные канала:", firstChannel)

    case secondChannel := <-message:
      fmt.Println("Данные канала:", secondChannel)
  }

}

// горутина для записи числа в канал
func channelNumber(number chan int) {

  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  number <- 15
}

// горутина для записи строки в канал
func channelMessage(message chan string) {

  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  message <- "Погружение в параллелизм в Go"
}

Go playground: https://go.dev/play/p/2OQOmTMmkt-

В данном случае - обе функции засыпают на 2 секунды. Что же будет делать select? Ответ: ничего. Он будет ждать, пока хоть один канал будет готов к чтению. Так что, здесь как и в первом случае, ответ будет случайным, так как спустя 2 секунды оба канала уже будут готовы

Default

package main

import (
  "fmt"
  "time"
)

func main() {

  // создаем каналы
  number := make(chan int)
  message := make(chan string)

  // вызываем функции как горутины
  go channelNumber(number)
  go channelMessage(message)

  // select
  select {
    case firstChannel := <-number:
      fmt.Println("Данные канала:", firstChannel)

    case secondChannel := <-message:
      fmt.Println("Данные канала:", secondChannel)
	
    // default case 
    default:
      fmt.Println("Подожди!!! Каналы еще не готовы к чтению!")
  }

}

// горутина для записи числа в канал
func channelNumber(number chan int) {
  
  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)

  number <- 15
}

// горутина для записи строки в канал
func channelMessage(message chan string) {

  // функция засыпает на 2 секунды
  time.Sleep(2 * time.Second)
 
  message <- "Погружение в параллелизм в Go"
}

Go playground: https://go.dev/play/p/S7psSqvugQS

Тут - вместо того, чтобы преостанавливать выполнение функции "main" селектом в ожидании готовности каналов, добавляем default (да, все равно, что в switch-case) и теперь, если ни один из каналов не будет готовен к выполнению select, в консоль выведется сообщение "Подожди!!! Каналы еще не готовы к чтению!"

Паттерны (concurrency patterns)

Fan-out

Fan out используется тогда, когда несколько горутин читают из одного и того же канала. Это чтение закончится только тогда, когда канал закроется.

Пример:

package main
 
import (
   "fmt"
   "time"
)
 
// функция по заполнению канала числами
func generate(nums ...int) <-chan int {
   // создаем канал
   out := make(chan int)
   go func() {
       // идем циклом по переданным числам
       for _, n := range nums {
           // записываем каждое число в канал
           out <- n
       }
   }()
   
   // возвращаем канал
   return out
}

func main() {
   fmt.Println("Запускаем Fan Out ")
   // генерируем канал с числами 1, 2, 3
   c1 := generate(1, 2, 3)
 
   // запускаем первую горутину
   go func() {
       // циклом идем по первому каналу и печатаем каждое число из него
       for num := range c1 {
           fmt.Println(num)
       }
   }()
   // запускаем вторую горутину
   go func() {
       // циклом идем по первому каналу и печатаем каждое число из него
       for num := range c1 {
           fmt.Println(num)
       }
   }()
 
   // ожидаем незавершившиеся горутины
   time.Sleep(time.Second * 2)
}

Здесь мы два раза вызываем функцию "generate()", в которой определяем канал int, запускаем горутину и в неё записываем переданные в функцию числа. После вызова горутины возвращаем канал. Получив два канала, запускаем две горутины, которые будут получать числа из канала 1 и канала 2, а после этого выводить их в консоль. В конце "засыпаем" на 2 секунды для того, чтобы дождаться завершения всех горутин.

Fan-in

Fan in используется тогда, когда одна функция читает с нескольких каналов, пока они не будут закрыты. Это полезно для, например, агрегации результатов параллельно выполняющихся задач.

Давайте напишем Fan-in:

func merge(in ...<-chan int) <-chan int {
   // создаем WaitGroup
   var wg sync.WaitGroup

   // создаем итоговый канал
   out := make(chan int)
 
   // записываем функцию в переменную "output"
   output := func(c <-chan int) {
       // говорим, чтобы в конце анонимной функции одна горутина из списка ожидания исчезла
       defer wg.Done()
       // циклом идем по каналу "c"
       for n := range c {
           // в итоговый канал "out" записываем числа из канала "c"
           out <- n
       }
   }
   // добавляем в список ожидания столько же горутин, сколько каналов "in" было передано
   wg.Add(len(in))
   // циклом идем по всем каналам "in"
   for _, c := range in {
       // вызываем "output" как горутину
       go output(c)
   }
   // ожидаем незавершившиеся горутины
   wg.Wait()
   return out
}

В этой функции мы объеденяем несколько каналов "in" в один канал "out"

Коротко про pipeline

Pipeline - это множество обработчиков и каждый из них принимает какие-либо входные данные, что-то сними делает, обрабатывает их и передает следующему.

Заключение

Я долго писал эту статью, вытаскивая из каждого источника самое важно и, немного переделывая, доделывая, писал все сюда. Надеюсь, Вы прочитали для себя то, что искали. А вот все источники, откуда я брал информацию:

Комментарии (1)