Решаем задачи на Go без внешних зависимостей

Ловили ли вы себя когда-нибудь на мысли, что можно задействовать конкретный инструмент по-другому и решить сложную задачу, максимально задействуя существующие возможности? С меньшим количеством внешних зависимостей, будь то кеши, очереди или БД

Это статья нашего спикера Виталия Лихачёва, SRE в Booking.com, цель которой — показать какие задачи можно достаточно легко решать на go, по максимуму используя горутины/каналы/блокировки.

Попробуем с помощью Go решить одну не самую тривиальную задачу:  

Условия задачи 1: повысить SLA

Нужно превратить «ненадежную» систему в «надежную» или повысить SLA. Есть внешняя система X, стабильность ответов которой под вопросом. Это может быть вызвано разными причинами, которые здесь мы не рассматриваем. 

Все что мы знаем: любой запрос в X может отвалиться с ошибкой и единственный способ решить это — сделать retry.

Конечно, можно в своем сервисе на вашем любимом языке подключить готовую библиотеку, реализующую повторные запросы, но есть важный нюанс: клиентов, желающих сходить во внешнюю систему, больше одного.

В реальном проекте это еще может осложняться тем, что и написаны они на разных технологиях. И везде придется вносить правки, контролировать настройки ретраев и т.д.

Задачу можно легко расширить до уровня, что есть внешняя система, которая требует определенных рейт лимитов запросов, и нам хочется, чтобы все клиенты, сколько бы их ни было, соблюдали этот рейт лимит. Более того: делая кастомное решение мы сможем сами приоритизировать какой клиент достоин в этот момент времени получить приоритет в обслуживании. А если мы еще сможем кешировать эти ответы, то мы точно в выигрыше, потому что не придется тратить рейт лимит лишний раз. Однако помним про сложность инвалидации кешей и не будем это здесь рассматривать.

Наши клиенты должны быть готовы к повышенным таймаутам ответа, потому что мы будем делать ретраи в нашем сервисе.

По сути мы пишем сервис прокси, которые позволяет скрыть большой процент ошибок при запросах куда-то наружу.

В данной задаче мы предполагаем, что для достижения большей надежности нам нужно собирать батчи клиентских запросов и отправлять их как единое целое. Таким образом мы экономим сетевые коннекты, не создаем слишком много маленьких запросов в и без того не очень хорошо работающую систему X. Из минусов, конечно же, что в случае ошибок придется ретраить весь батч запросов.

Решение

Далее будет представлен скелет решения на golang, который в паре сотен строк кода реализует всю логику батчинга.

Обратите внимание, что как только наш внутренний клиент отправляет запрос в прокси-сервис, мы не начинаем сразу же обращаться во внешнюю систему. Это ключевой момент здесь — как собирать батчи при условии, что каждый запрос каждого клиент независим друг от друга.

Если перефразировать, то нам нужно получить несколько одновременных TCP подключений (http-запросов), заставить их подождать желаемое время, чтобы набрать критическую массу (собрать батч), отправить запрос (потенциально сделав ретраи) и всем сразу ответить.

Посмотрим на часть кода в main.go

package main

import (
	...
)

func main() {
	...
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	batchWorker := worker.NewBatchWorker(ctx, logger, &wg, numWorkers, batchSize)

	http.HandleFunc("/process", process(batchWorker))

	logger.Info(fmt.Sprintf("Start listening at %s", srv.Addr))
	if err := srv.ListenAndServe(); err != http.ErrServerClosed {
		logger.Error("HTTP server Shutdown", "error", err)
		cancel()
	}

	wg.Wait()
}

Сердце всей обработки находится в batchWorker , его рассмотрим позже, но уже можем сказать, что внутри мы реализуем worker pool.

Здесь мы просто создаем WaitGroup и сообщаем ей количество numWorkers .

Это позволит внутри batchWorker реализовать graceful shutdown для каждого воркера.

worker pool позволит делегировать обработку каждого нового батча запросов конкретному воркеру, при этом не создавая постоянно новые горутины. Под делегированием подразумевается, что worker 1 будет накапливать N-е количество запросов от клиентов и при накоплении достаточного количества выполнять весь батч, а пока идет выполнение в работу будет вступать worker 2, затем worker 3 и т.д.

Больше здесь ничего принципиально интересного нет, мы для URL /process назначаем обработчик, который тоже рассмотрим.

Обратите внимание, что batchWorker передается как замыкание, а функция process возвращает анонимную функцию, которая уже и является фактическим обработчиком.

func process(batchWorker *worker.BatchWorker) func(w http.ResponseWriter, req *http.Request) {
	return func(w http.ResponseWriter, req *http.Request) {
		var task models.Request
		err := json.NewDecoder(req.Body).Decode(&task)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		w.Header().Set("Content-Type", "application/json")

		ch, err := batchWorker.Enqueue(&worker.Task{ID: task.Id})
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			json.NewEncoder(w).Encode(models.ProcessResponseError{Error: err.Error()})
			return
		}

		result := <-ch

		if result.Error != nil {
			w.WriteHeader(http.StatusInternalServerError)
			json.NewEncoder(w).Encode(models.ProcessResponseError{Error: result.Error.Error()})
			return
		}

		response := fmt.Sprintf("Request with id %d processed with result %s", task.Id, result.Response)
		json.NewEncoder(w).Encode(models.ProcessResponse{Response: response})
	}
}

В этом коде в основном видим типичный утилитарный код, который парсит запросы, формирует ответы и т.д.

Главное, на чем мы здесь остановимся:

ch, err := batchWorker.Enqueue(&worker.Task{ID: task.Id})

batchWorker содержит метод Enqueue , который возвращает канал и опциональную ошибку.

Именно на основе возврата канала мы сможем заставить клиентов ждать ответа, не заставляя их делать переопрос готовности ответа. Если бы мы писали такое решение максимально просто, то как бы оно могло выглядеть?

Создаем в сервисе API /submit, куда отправляем запрос. В ответ получаем ID запроса. Так же создаем API /result, где мы по итогу получим ответ. submit мог бы складывать задачи в очередь, некий фоновый воркер (или пачка воркеров, что гораздо лучше) выбирали бы себе эти задачи, выполняли, складывали в кеш, где ключом был бы id запроса. Итого: добавили воркеры, добавили кеш — новые зависимости.

Тут внимательный читатель сможет поспорить, что на php/java/python/etc. тоже можно написать однопроцессное приложение, которое сможет реализовать похожую логику. И действительно сможет. Но будет ли возможность выжать из этого максимум производительности и не закопаться в синхронизации потоков, будь то легкие или тяжелые потоки (грин треды vs полноценные потоки)? Опытный разработчик на языке X конечно справится с такой задачей :) Но будет ли удобно дальше развивать такой сервис?

Теперь перейдем к самому интересному — реализация batchWorker .

Сначала посмотрим реализацию отдельного воркера в рамках worker pool.

type worker struct {
	logger *slog.Logger
	ID     int
	queue  chan *queueItem
	mu     *sync.Mutex
}

func newWorker(logger *slog.Logger, id int, batchSize int64) *worker {
	queue := make(chan *queueItem, batchSize)
	return &worker{
		logger: logger,
		ID:     id,
		queue:  queue,
		mu:     &sync.Mutex{},
	}
}

func (w *worker) start(ctx context.Context, wg *sync.WaitGroup, batchSize int) {
	defer wg.Done()

	var queueItems []*queueItem
	var queueItem *queueItem
	timer := time.NewTicker(time.Second)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			w.withLock(func() {
				w.processBatch(queueItems)
				queueItems = nil
			})
			return
		case queueItem = <-w.queue:
			w.logger.Debug("worker got task", slog.Int("workerID", w.ID), slog.Int("taskID", queueItem.task.ID))
			w.withLock(func() {
				queueItems = append(queueItems, queueItem)
				if len(queueItems) >= batchSize {
					w.processBatch(queueItems)
					queueItems = nil
				}
			})
		case <-timer.C:
			w.withLock(func() {
				w.processBatch(queueItems)
				queueItems = nil
			})
		}
	}
}

func (w *worker) withLock(f func()) {
	w.mu.Lock()
	defer w.mu.Unlock()
	f()
}

func (w *worker) TrySend(task *Task) (<-chan Result, bool) {
	if !w.mu.TryLock() {
		return nil, false
	}
	defer w.mu.Unlock()

	ch := make(chan Result)
	w.queue <- &queueItem{task: *task, ch: ch}

	return ch, true
}

func (w *worker) processBatch(queueItems []*queueItem) {
	if len(queueItems) == 0 {
		return
	}

	w.logger.Info("worker processing batch", slog.Int("workerID", w.ID), slog.Int("batchSize", len(queueItems)))
	time.Sleep(time.Second * 5)
	w.logger.Info("worker processed batch", slog.Int("workerID", w.ID), slog.Int("batchSize", len(queueItems)))
	// do actual http request
	for _, item := range queueItems {
		item.ch <- Result{
			Response: fmt.Sprintf("Result of task %d", item.task.ID),
			Error:    nil,
		}
		close(item.ch)
	}
}

newWorker целенаправленно сделан приватным, чтобы его нельзя было использовать снаружи пакета.

У каждого воркера есть своя очередь размером batchSize

queue := make(chan *queueItem, batchSize)

Метод start позволяет запустить сам воркер, где и происходит основная магия.

Обратите внимание, мы передаем контекст и обрабатываем ctx.Done() , обрабатывая возможно накопленный батч здесь. Делая graceful shutdown, который должен дообработать полученные запросы, когда уже пора завершать приложение по сигналу, мы позволяем клиентам не страдать от потенциальной потери ответов от сервиса.

Главное, что здесь надо понять: работа таймера и работа по накоплению батча.

С таймером все просто.

case <-timer.C:
			w.withLock(func() {
				w.processBatch(queueItems)
				queueItems = nil
			})

Если есть элементы в накапливаемом батче, начинаем их обрабатывать.

Затем зануляем батч и начинаем сбор сначала . Напомню, что append для nil слайсов успешно работает.

Обработка по результатам накопления батча происходит здесь

case queueItem = <-w.queue:

И по итогу мы либо по таймеру обработаем столько записей, сколько успели накопить, либо по накоплению batchSize .

Тут может возникнуть вопрос, а как же обрабатываются запросы других клиентов, пока эта горутина занята отправкой запроса, получением ответа и т.д. К этому еще дойдем.

Еще раз посмотрим на бесконечный цикл

for {
		select {
		case <-ctx.Done():
			w.withLock(func() {
				w.processBatch(queueItems)
				queueItems = nil
			})
			return
		case queueItem = <-w.queue:
			w.logger.Debug("worker got task", slog.Int("workerID", w.ID), slog.Int("taskID", queueItem.task.ID))
			w.withLock(func() {
				queueItems = append(queueItems, queueItem)
				if len(queueItems) >= batchSize {
					w.processBatch(queueItems)
					queueItems = nil
				}
			})
		case <-timer.C:
			w.withLock(func() {
				w.processBatch(queueItems)
				queueItems = nil
			})
		}
	}

Здесь есть еще один нюанс, который можно улучшить, если хочется максимально дотошно реализовать логику.

А заключается он в том, что если у нас сервис с не очень большим RPS, то в зависимости от настроек таймера мы будем получить слишком маленькие батчи в запросах.

Например, если на прокси приходит 1 RPS, а batchSize стоит 5, то можем получить большое кол-во одиночных запросов, таким образом не получив тот выигрыш, которые хотели при реализации сервиса. Эти параметры нужно будет подгонять индивидуально.

В данном случае нам бы пришлось поставить таймер на срабатывание раз в 3-5 секунд, чтобы достичь хоть какого-то батчинга. Здесь рекомендую посмотреть условия исходной задачи, чтобы понять, чего мы здесь добиваемся :)

Идем далее. Метод processBatch осознанно упрощен, здесь нет сетевых запросов.

Для демонстрации мы предполагаем, что queueItems здесь — ответ от внешнего API. Напомню, что внешнее API принимает батчевые запросы, поэтому если мы получили успешный ответ, то и данные для всех наших клиентских запросов будут там.

for _, item := range queueItems {
		item.ch <- Result{
			Response: fmt.Sprintf("Processed task %d", item.task.ID),
			Error:    nil,
		}
		close(item.ch)
	}

Здесь мы в качестве ответа пишем в канал структуру Result

Если вернемся к http handler, то здесь идет чтение из этого канала
ch, err := batchWorker.Enqueue(&worker.Task{ID: task.Id})
...
result := <-ch # этот канал, в него записали Result{}

Мы разобрали только работу отдельного воркера, а у нас же целый worker pool!

Так как же все таки балансировать батчи запросов между воркерами?

Небольшое отступление: можно было бы и без воркер пула. Можно единственной горутиной собирать батчи (с учетом необходимости по таймеру тоже делать сброс батчей) и при сборе достаточного кол-ва запросов (либо по таймеру) запускать новую горутину для выполнения. 

Здесь мы осознанно не пошли этим путем, так сложнее контролировать параллелизм — сколько параллельно запросов в каждый момент времени в принципе может идти во внешнюю систему. Да и тюнить воркер пул можно правкой одной константы.

Дальше рассмотрим использование основы решения  batchWorker

type BatchWorker struct {
	workers []worker
	logger  *slog.Logger
}

func NewBatchWorker(ctx context.Context, logger *slog.Logger, wg *sync.WaitGroup, numWorkers int, batchSize int) *BatchWorker {
	var workers []worker
	for i := 1; i <= numWorkers; i++ {
		worker := newWorker(logger, i, int64(batchSize))
		workers = append(workers, *worker)
		go worker.start(ctx, wg, batchSize)
	}
	return &BatchWorker{
		workers: workers,
		logger: logger,
	}
}

func (b *BatchWorker) Enqueue(task *Task) (<-chan Result, error) {
	for _, w := range b.workers {
		if ch, ok := w.TrySend(task); ok {
			b.logger.Info("sent task", slog.Int("taskID", task.ID), slog.Int("workerID", w.ID))
			return ch, nil
		}

		b.logger.Warn("skip busy worker", slog.Int("workerID", w.ID))
	}

	return nil, ErrNotQueued
}

NewBatchWorker экспортируемая функция, доступная вне пакета.В конструкторе (в этой функции) мы создаем нужное кол-во воркеров для worker pool.

Очень внимательный читатель и хороший знаток Go может сказать, что в Go нет конструкторов и будет абсолютно прав. Однако есть негласное правило, что различные функции вида NewSomething, которые создают и инициализируют структуры можно называть конструкторами.

Это, конечно же, не чистый конструктор, у него есть side effect. Он стартует каждую горутину в рамках worker pool.

И последнее и самое важное — метод Enqueue. Здесь пробегаемся по всем воркерам и смотрим возможность отправки методом TrySend 

Берем первый по очереди воркер и пытаемся записать в его очередь новую задачу, которая в итоге попадает в батч.Если воркер занят, то попытка записи не удастся и поскольку у нас есть целая пачка таких воркеров, попробуем загрузить следующий по списку.

На больших RPS это отлично будет работать, потому что батчи очень быстро будут накапливаться. На маленьких RPS мы будем эксплуатировать всегда только первый воркер, это нужно учитывать. На что это влияет? Нам может показаться, что нам хватит 2 воркеров суммарно, но тогда в случае ошибочной оценки неожиданный всплеск запросов мы не сможем обработать. 

Горутины, которые ничего не делают, практически бесплатны в плане ресурсов, поэтому вполне можно поставить 10 воркеров и забыть об этом. Но не забыть про добавление мониторинга загруженности воркеров, про алерты на занятость воркер пула, про отслеживание большого фона ошибок и так далее. Спрашивайте у знакомого SRE :)

Конечно, если все воркеры будут заняты, то мы не сможем обслуживать клиентов, но это исключительно вопрос ожидаемых нагрузок и хорошего мониторинга, чтобы у нас всегда был запас свободных воркеров. Запас проще сделать в данном решении, чем реализовать какую-то отдельную внутреннюю очередь для возможных всплеском нагрузки.

И общее решение. Его очевидно еще можно улучшать и полировать под свои нужды, но базовую задачу оно решает.

https://github.com/make-it-git/go-sample-practical-tasks/tree/main/batch_api

Условия задачи 2: прокси с «умным» кешированием

Мы построили отличный продукт, к нам пришли пользователи и какое-то время все хорошо, но с течением времени мы обнаруживаем деградацию времени ответа некоторых популярных API. 

Для конкретики пусть это будет сервис отслеживания общественного транспорта. Какая же проблема у нас может появиться с ростом клиентской нагрузки?

Приложение/сайт построены таким образом, что раз в 5-10 секунд они приходят с запросом о положении транспорта в определенной области. Наверное у нас будет всплески нагрузки утром и вечером в будние дни, когда многие пользователи проверяют наличие своего автобуса. Проблема в том, что данные быстро меняющиеся, исходный сервис был написан на php и до поры до времени мы справлялись с нагрузкой. 

Решение: прокси с «умным» кешированием. Дотошный читатель скажет, что пример надуманный и легко бы переписали весь исходный сервис на go. Дональд Кнут ему судья.

Решение

Мы конечно же все микросервисные, API gateway на месте и все что нам нужно, написать сервис, реализующий нужное API.

А как же снизить нагрузку? Мы заранее знаем, в каких географических областях будут высокие потоки запросов. По таким областям в нашем сервисе мы можем в фоне ходить в сервис источник данных для карты и у себя эти данные кешировать.

В случае падения исходного сервиса у нас будет пара десятков секунд, пока зависимость поднимается.

А алгоритм кеширования довольно тривиальный:

  1. Захардкодили области в коде/сконфигурировали области через конфиг/добавили БД с настройками областей и API для управления/нарисовали фронтенд для управления этим - выберите что нравится и что укладывается в ваши потребности.

  2. Одной горутиной (или несколькими) ходим в наше нагруженное API, постоянно обновляя данные по областям.

  3. Принимаем запрос в наш гошный сервис, из кеша в памяти достаем требуемые точки и отдаем в ответе. Тут будет немного сложнее, потому что придется самостоятельно реализовать логику отсечения лишнего и поиск нужных квадратов/ромбов (смотря как храним в памяти области).

Что по итогу? Если у нас было 1000 RPS по, скажем, квадрату 10x10 км, то при условии, что мы храним в кеше квадраты 1x1 км, исходный сервис будет получать всего 10 rps при использовании одной горутины обновления кеша и ответе исходного сервиса за 0.1 секунды (это без нагрузки, с нагрузкой он деградирует ?)

Как посчитали?

100 квадратов в кеше ⇒ 100 запросов на полное обновление кеша ⇒ 10 запросов в секунду укладывается.

Нормально ли получать на 10 секунд устаревшие данные? В данном случае да. Приложение вполне может интерполировать движение транспорта и предсказывать его положение до получения следующей точки от API.

Само решение здесь приводить не будем, оно относительно тривиальное, главное не забывать про конкурентный доступ к кешу.

А что же делать, если у нас более одного инстанса такого прокси сервиса?

Есть 2 стула: либо каждый инстанс делает такой кеш в памяти и для этой задачи это вполне будет работать, потому что 3-5 инстансов не создадут большую нагрузку на нижележащий сервис. Либо кешировать в отдельно стоящем кеше, но это уже за рамками статьи.

А почему решение относительно тривиальное?

Самая сложная логика в определении правильных квадратиков из кеша. Как было в примере — берем области 1x1 км, тогда ключом можем взять какой-то из углов квадрата (latitude, longitude с округлением до N знаков после запятой).

Все остальное: запустить нужное количество горутин для фонового постоянного обновления данных — задача уровня джуна на собеседовании по go.

Комментарии (0)