Конкурентность — одна из самых мощных возможностей Go, и её освоение критически важно для создания масштабируемых и эффективных приложений. В этой статье мы рассмотрим 7 шаблонов конкурентности в Go, которые помогут вам писать надёжный код.

1. Пул воркеров

Описание: Пул воркеров создаёт фиксированное количество горутин, которые обрабатывают задачи из общей очереди. Этот шаблон полезен для управления количеством одновременно выполняемых задач и оптимизации использования ресурсов.

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Воркер %d начал задачу %d\n", id, job)
		time.Sleep(time.Second)
		fmt.Printf("Воркер %d завершил задачу %d\n", id, job)
		results <- job * 2
	}
}

func main() {
	const numJobs = 5
	const numWorkers = 3
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)
	var wg sync.WaitGroup

	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	wg.Wait()
	close(results)

	for result := range results {
		fmt.Println("Результат:", result)
	}
}

Реальный сценарий: Веб-сервер, обрабатывающий входящие HTTP-запросы, где каждый запрос обрабатывается воркером из пула.


2. Fan-Out / Fan-In

Описание: Fan-Out запускает несколько горутин для параллельной обработки данных, а Fan-In собирает результаты в единый канал. Этот шаблон полезен для параллельной обработки с последующей агрегацией.

package main

import (
	"fmt"
	"sync"
)

func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 5; i++ {
		ch <- i
		fmt.Printf("Производитель %d создал %d\n", id, i)
	}
}
func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range in {
		out <- v * 2
		fmt.Printf("Потребитель %d обработал %d\n", id, v)
	}
}
func main() {
	numProducers := 2
	numConsumers := 2
	input := make(chan int, 10)
	output := make(chan int, 10)
	var wg sync.WaitGroup
	for i := 1; i <= numProducers; i++ {
		wg.Add(1)
		go producer(i, input, &wg)
	}
	wg.Wait()
	close(input)
	for i := 1; i <= numConsumers; i++ {
		wg.Add(1)
		go consumer(i, input, output, &wg)
	}
	wg.Wait()
	close(output)
	for result := range output {
		fmt.Println("Результат:", result)
	}
}

Реальный сценарий: Конвейер обработки данных, где разные этапы выполняются разными наборами воркеров.


3. Пайплайн

Описание: Пайплайн объединяет несколько этапов обработки, где каждый этап выполняет преобразование данных и передаёт их следующему этапу. Подходит для последовательной обработки данных.

package main

import "fmt"

func stage1(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func stage2(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * 2
		}
		close(out)
	}()
	return out
}

func stage3(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + 1
		}
		close(out)
	}()
	return out
}

func main() {
	nums := []int{1, 2, 3, 4, 5}
	c1 := stage1(nums)
	c2 := stage2(c1)
	c3 := stage3(c2)
	for result := range c3 {
		fmt.Println(result)
	}
}

Реальный сценарий: Система обработки изображений, где изображение проходит через этапы масштабирования, фильтрации и кодирования.


4. Публикация-Подписка

Описание: Шаблон "Публикация-Подписка" позволяет публиковать сообщения для нескольких подписчиков. Полезен в системах, где разные сервисы должны независимо реагировать на события.

package main

import (
	"fmt"
	"sync"
	"time"
)

type PubSub struct {
	mu       sync.Mutex
	channels map[string][]chan string
}

func NewPubSub() *PubSub {
	return &PubSub{
		channels: make(map[string][]chan string),
	}
}

func (ps *PubSub) Subscribe(topic string) <-chan string {
	ch := make(chan string)
	ps.mu.Lock()
	ps.channels[topic] = append(ps.channels[topic], ch)
	ps.mu.Unlock()
	return ch
}

func (ps *PubSub) Publish(topic, msg string) {
	ps.mu.Lock()
	for _, ch := range ps.channels[topic] {
		ch <- msg
	}
	ps.mu.Unlock()
}

func (ps *PubSub) Close(topic string) {
	ps.mu.Lock()
	for _, ch := range ps.channels[topic] {
		close(ch)
	}
	ps.mu.Unlock()
}

func main() {
	ps := NewPubSub()

	subscriber1 := ps.Subscribe("news")
	subscriber2 := ps.Subscribe("news")

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		for msg := range subscriber1 {
			fmt.Println("Подписчик 1 получил:", msg)
		}
	}()

	go func() {
		defer wg.Done()
		for msg := range subscriber2 {
			fmt.Println("Подписчик 2 получил:", msg)
		}
	}()

	ps.Publish("news", "Срочные новости!")
	ps.Publish("news", "Ещё новости!")

	time.Sleep(time.Second)
	ps.Close("news")
	wg.Wait()
}

Реальный сценарий: Система обмена сообщениями, где сервисы подписываются на определённые типы событий.


5. Select с таймаутом

Описание: Использование select с таймаутом позволяет избежать бесконечных блокировок. Полезно, когда нужно выполнить действие или прервать операцию, если она занимает слишком много времени.

package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan string)

	go func() {
		time.Sleep(2 * time.Second)
		c <- "результат"
	}()

	select {
	case res := <-c:
		fmt.Println("Получено:", res)
	case <-time.After(1 * time.Second):
		fmt.Println("Таймаут")
	}
}

Реальный сценарий: Сетевой клиент, который пытается подключиться к серверу и останавливается, если сервер не отвечает вовремя.


6. Семафор

Описание: Семафор ограничивает количество горутин, которые могут одновременно обращаться к ресурсу. Полезен для управления конкурентностью и предотвращения перегрузки ресурсов.

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	sem <- struct{}{} // Захват семафора
	fmt.Printf("Воркер %d начал\n", id)
	time.Sleep(time.Second)
	fmt.Printf("Воркер %d завершил\n", id)
	<-sem // Освобождение семафора
}

func main() {
	const numWorkers = 5
	const maxConcurrent = 2
	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup

	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, sem, &wg)
	}

	wg.Wait()
}

Реальный сценарий: Пул подключений к базе данных, где одновременно допускается ограниченное количество подключений.


7. Ограничение частоты

Описание: Ограничение частоты управляет скоростью обработки событий с помощью тикера. Полезно, когда нужно контролировать частоту выполнения задач, например, запросов к API.

package main

import (
	"fmt"
	"time"
)

func main() {
	rate := time.Second
	ticker := time.NewTicker(rate)
	defer ticker.Stop()

	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)

	for req := range requests {
		<-ticker.C // Ожидание следующего тика
		fmt.Println("Обработка запроса", req)
	}
}

Реальный сценарий: Шлюз API, ограничивающий количество запросов, которые пользователь может сделать за определённый период.

Заключение

Шаблоны конкурентности в Go необходимы для создания эффективных и масштабируемых приложений. Освоение этих шаблонов позволит вам эффективно управлять конкурентностью, оптимизировать использование ресурсов и повысить производительность ваших приложений.

Комментарии (1)


  1. M0rdecay
    19.06.2025 10:01

    Какой-то неполноценный гайд, семафор есть в виде конкретной реализации с весами - https://pkg.go.dev/golang.org/x/sync/semaphore, равно как есть готовый рейт-лимитер с управлением всплесками и прочим - https://pkg.go.dev/golang.org/x/time/rate

    В реальности без burst-control в RL и без весов в семафоре почти не обходится