Конкурентность — одна из самых мощных возможностей Go, и её освоение критически важно для создания масштабируемых и эффективных приложений. В этой статье мы рассмотрим 7 шаблонов конкурентности в Go, которые помогут вам писать надёжный код.
1. Пул воркеров
Описание: Пул воркеров создаёт фиксированное количество горутин, которые обрабатывают задачи из общей очереди. Этот шаблон полезен для управления количеством одновременно выполняемых задач и оптимизации использования ресурсов.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Воркер %d начал задачу %d\n", id, job)
time.Sleep(time.Second)
fmt.Printf("Воркер %d завершил задачу %d\n", id, job)
results <- job * 2
}
}
func main() {
const numJobs = 5
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
close(results)
for result := range results {
fmt.Println("Результат:", result)
}
}
Реальный сценарий: Веб-сервер, обрабатывающий входящие HTTP-запросы, где каждый запрос обрабатывается воркером из пула.
2. Fan-Out / Fan-In
Описание: Fan-Out запускает несколько горутин для параллельной обработки данных, а Fan-In собирает результаты в единый канал. Этот шаблон полезен для параллельной обработки с последующей агрегацией.
package main
import (
"fmt"
"sync"
)
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
ch <- i
fmt.Printf("Производитель %d создал %d\n", id, i)
}
}
func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for v := range in {
out <- v * 2
fmt.Printf("Потребитель %d обработал %d\n", id, v)
}
}
func main() {
numProducers := 2
numConsumers := 2
input := make(chan int, 10)
output := make(chan int, 10)
var wg sync.WaitGroup
for i := 1; i <= numProducers; i++ {
wg.Add(1)
go producer(i, input, &wg)
}
wg.Wait()
close(input)
for i := 1; i <= numConsumers; i++ {
wg.Add(1)
go consumer(i, input, output, &wg)
}
wg.Wait()
close(output)
for result := range output {
fmt.Println("Результат:", result)
}
}
Реальный сценарий: Конвейер обработки данных, где разные этапы выполняются разными наборами воркеров.
3. Пайплайн
Описание: Пайплайн объединяет несколько этапов обработки, где каждый этап выполняет преобразование данных и передаёт их следующему этапу. Подходит для последовательной обработки данных.
package main
import "fmt"
func stage1(nums []int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func stage2(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
}
close(out)
}()
return out
}
func stage3(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n + 1
}
close(out)
}()
return out
}
func main() {
nums := []int{1, 2, 3, 4, 5}
c1 := stage1(nums)
c2 := stage2(c1)
c3 := stage3(c2)
for result := range c3 {
fmt.Println(result)
}
}
Реальный сценарий: Система обработки изображений, где изображение проходит через этапы масштабирования, фильтрации и кодирования.
4. Публикация-Подписка
Описание: Шаблон "Публикация-Подписка" позволяет публиковать сообщения для нескольких подписчиков. Полезен в системах, где разные сервисы должны независимо реагировать на события.
package main
import (
"fmt"
"sync"
"time"
)
type PubSub struct {
mu sync.Mutex
channels map[string][]chan string
}
func NewPubSub() *PubSub {
return &PubSub{
channels: make(map[string][]chan string),
}
}
func (ps *PubSub) Subscribe(topic string) <-chan string {
ch := make(chan string)
ps.mu.Lock()
ps.channels[topic] = append(ps.channels[topic], ch)
ps.mu.Unlock()
return ch
}
func (ps *PubSub) Publish(topic, msg string) {
ps.mu.Lock()
for _, ch := range ps.channels[topic] {
ch <- msg
}
ps.mu.Unlock()
}
func (ps *PubSub) Close(topic string) {
ps.mu.Lock()
for _, ch := range ps.channels[topic] {
close(ch)
}
ps.mu.Unlock()
}
func main() {
ps := NewPubSub()
subscriber1 := ps.Subscribe("news")
subscriber2 := ps.Subscribe("news")
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for msg := range subscriber1 {
fmt.Println("Подписчик 1 получил:", msg)
}
}()
go func() {
defer wg.Done()
for msg := range subscriber2 {
fmt.Println("Подписчик 2 получил:", msg)
}
}()
ps.Publish("news", "Срочные новости!")
ps.Publish("news", "Ещё новости!")
time.Sleep(time.Second)
ps.Close("news")
wg.Wait()
}
Реальный сценарий: Система обмена сообщениями, где сервисы подписываются на определённые типы событий.
5. Select с таймаутом
Описание: Использование select
с таймаутом позволяет избежать бесконечных блокировок. Полезно, когда нужно выполнить действие или прервать операцию, если она занимает слишком много времени.
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan string)
go func() {
time.Sleep(2 * time.Second)
c <- "результат"
}()
select {
case res := <-c:
fmt.Println("Получено:", res)
case <-time.After(1 * time.Second):
fmt.Println("Таймаут")
}
}
Реальный сценарий: Сетевой клиент, который пытается подключиться к серверу и останавливается, если сервер не отвечает вовремя.
6. Семафор
Описание: Семафор ограничивает количество горутин, которые могут одновременно обращаться к ресурсу. Полезен для управления конкурентностью и предотвращения перегрузки ресурсов.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
sem <- struct{}{} // Захват семафора
fmt.Printf("Воркер %d начал\n", id)
time.Sleep(time.Second)
fmt.Printf("Воркер %d завершил\n", id)
<-sem // Освобождение семафора
}
func main() {
const numWorkers = 5
const maxConcurrent = 2
sem := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, sem, &wg)
}
wg.Wait()
}
Реальный сценарий: Пул подключений к базе данных, где одновременно допускается ограниченное количество подключений.
7. Ограничение частоты
Описание: Ограничение частоты управляет скоростью обработки событий с помощью тикера. Полезно, когда нужно контролировать частоту выполнения задач, например, запросов к API.
package main
import (
"fmt"
"time"
)
func main() {
rate := time.Second
ticker := time.NewTicker(rate)
defer ticker.Stop()
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-ticker.C // Ожидание следующего тика
fmt.Println("Обработка запроса", req)
}
}
Реальный сценарий: Шлюз API, ограничивающий количество запросов, которые пользователь может сделать за определённый период.
Заключение
Шаблоны конкурентности в Go необходимы для создания эффективных и масштабируемых приложений. Освоение этих шаблонов позволит вам эффективно управлять конкурентностью, оптимизировать использование ресурсов и повысить производительность ваших приложений.
M0rdecay
Какой-то неполноценный гайд, семафор есть в виде конкретной реализации с весами - https://pkg.go.dev/golang.org/x/sync/semaphore, равно как есть готовый рейт-лимитер с управлением всплесками и прочим - https://pkg.go.dev/golang.org/x/time/rate
В реальности без burst-control в RL и без весов в семафоре почти не обходится