В процессе разработки микросервисных приложений часто необходимо наладить эффективную и быструю коммуникацию между сервисами. Разработанный Google gRPC предоставляет высокопроизводительный фреймворк для организации такого взаимодействия. Однако стандартные балансировщики нагрузки в gRPC не всегда удовлетворяют специфическим требованиям, особенно когда требуется приоритизация адресов для минимизации сетевых задержек и обеспечения отказоустойчивости.

В этой статье я поделюсь опытом создания кастомного балансировщика нагрузки на Go для gRPC, который использует приоритеты адресов для выбора наилучшего соединения. Это решение позволяет гибко управлять распределением клиентских запросов между серверами с разными уровнями доступности и обеспечивает подключение к оптимальному ЦОД с минимальными задержками.

Постановка задачи

При разработке одного из проектов VK Tech мне потребовалось реализовать балансировщик, который выбирает первый доступный адрес из приоритетного списка. Приоритеты адресов определяются порядком в конфигурационном файле: чем выше адрес в списке, тем выше его приоритет. В случае недоступности адреса с наивысшим приоритетом балансировщик должен автоматически переключаться на следующий доступный адрес по приоритету.

Требования к балансировщику:

  • Приоритизация адресов: выбор адреса с наивысшим приоритетом из списка.

  • Отказоустойчивость: автоматическое переключение на следующий адрес при недоступности текущего.

  • Минимизация задержек: подключение к ближайшему или наиболее оптимальному ЦОД.

Почему стандартные балансировщики не подходят

Стандартные балансировщики в gRPC, такие как round-robin (циклический) и pick-first («первый доступный»), не учитывают приоритизацию адресов в списке.

Round-robin равномерно распределяет запросы между всеми доступными серверами, что может привести к увеличению сетевых задержек, если некоторые серверы географически удалены или менее производительны.

Pick-first всегда выбирает первый доступный адрес, но не переключается на адреса с более высоким приоритетом, если они становятся доступными после первоначального подключения.

Таким образом, для решения задачи минимизации задержек и обеспечения гибкости подключения к различным ЦОДам стандартные балансировщики не подходят.

Основная идея кастомного балансировщика

Наш кастомный балансировщик использует приоритизацию адресов, заданную в конфигурационном файле, для выбора наилучшего соединения.

  • Порядок адресов: адреса упорядочены по приоритету; индекс 0 — наивысший приоритет.

  • Выбор соединения: всегда выбирается первое доступное соединение с наивысшим приоритетом.

  • Автоматическое переключение: при недоступности текущего соединения балансировщик переключается на следующий по приоритету.

Преимущества такого подхода:

  • Минимизация сетевых задержек.

  • Повышенная отказоустойчивость.

  • Гибкость настройки.

Обзор архитектуры решения

Перед тем как перейти к реализации, рассмотрим основные компоненты нашего балансировщика и их взаимодействие.

BalancerBuilder

Балансировщик в gRPC создаётся с помощью билдера. Наш BalancerBuilder регистрирует балансировщик с определённым именем и схемой, чтобы gRPC-клиент мог его использовать.

type BalancerBuilder struct{}

 func (b BalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
 	return &Balancer{
     	cc:   	cc,
     	subConns: resolver.NewAddressMap(),
     	scStates: make(map[balancer.SubConn]connectivity.State),
     	csEvltr:  &balancer.ConnectivityStateEvaluator{},
     	state:	connectivity.Connecting,
 	}
 }

 func (b BalancerBuilder) Name() string { return balancerName }

 func init() {
 	balancer.Register(&BalancerBuilder{})
 }

Основные задачи билдера:

  • Создание и инициализация балансировщика.

  • Настройка взаимодействия с ClientConn.

  • Регистрация балансировщика для использования клиентом.

Resolver

Резолвер предоставляет балансировщику список адресов с их приоритетами. Он преобразует адреса из конфигурационного файла в resolver.Address, присваивая каждому адресу атрибут index, соответствующий его приоритету.

type resolverBuilder struct {
 	addresses []resolver.Address
 }

 func (b *resolverBuilder) Build(
 	target resolver.Target,
 	clientConn resolver.ClientConn,
 	_ resolver.BuildOptions,
 ) (resolver.Resolver, error) {
 	ctx, cancel := context.WithCancel(context.Background())

 	res := &fiResolver{
     	ctx:        	ctx,
     	cancel:     	cancel,
     	target:     	target,
     	cc:         	clientConn,
     	addressesStore: b.addresses,
 	}

 	if len(b.addresses) > 1 {
     	res.serviceConfig = clientConn.ParseServiceConfig(defaultConfig)
 	}

 	go res.start()

 	return res, nil
 }

 func (*resolverBuilder) Scheme() string {
 	return scheme
 }

 func initResolver(addresses []string) {
 	addressesStore := make([]resolver.Address, len(addresses))
 	for i, addr := range addresses {
     	addressesStore[i] = resolver.Address{
         	Addr:   	addr,
         	Attributes: attributes.New("index", i),
     	}
 	}

     resolver.Register(&resolverBuilder{addresses: addressesStore})
 }

Функции резолвера:

  • Динамическое обновление адресов.

  • Предоставление адресов с приоритетами балансировщику.

  • Сообщение об ошибках в случае недоступности адресов.

Picker

Picker выбирает соединение с наименьшим индексом (наивысшим приоритетом) из доступных. Если соединение с более высоким приоритетом становится доступным, балансировщик автоматически переключается на него.

type firstIdxPicker struct {
 	result balancer.PickResult
 	err    error
 }

 func (p *firstIdxPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
 	return p.result, p.err
 }

 func NewFIPicker(info base.PickerBuildInfo) balancer.Picker {
 	if len(info.ReadySCs) == 0 {
     	return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
 	}

 	minIdx := math.MaxInt
 	var selectedConn balancer.SubConn

 	for sc, scInfo := range info.ReadySCs {
     	idx, ok := scInfo.Address.Attributes.Value("index").(int) // <- наш простенький алгоритм определения оптимального соединения
     	if ok && idx < minIdx {
         	minIdx = idx
         	selectedConn = sc
    	 }
 	}

 	if selectedConn != nil {
     	return &firstIdxPicker{result: balancer.PickResult{SubConn: selectedConn}}
 	}

 	return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
 }

Алгоритм выбора:

  1. Проходит по всем готовым соединениям.

  2. Выбирает соединение с наименьшим index.

  3. Возвращает выбранное соединение для обработки запроса.

Balancer

Балансировщик отслеживает состояния соединений и регенерирует Picker при их изменении.

type Balancer struct {
 	cc   	balancer.ClientConn
 	csEvltr  *balancer.ConnectivityStateEvaluator
 	state	connectivity.State

 	subConns *resolver.AddressMap
 	scStates map[balancer.SubConn]connectivity.State
 	picker   balancer.Picker

 	resolverErr error
 	connErr 	error
 }

 func (b *Balancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
 	b.resolverErr = nil

 	addressMap := b.createNewSubConnections(ccs)

 	for _, addr := range b.subConns.Keys() {
     	if _, ok := addressMap.Get(addr); !ok {
         	sci, _ := b.subConns.Get(addr)
         	sc := sci.(balancer.SubConn)
         	sc.Shutdown()
         	b.subConns.Delete(addr)
     	}
 	}

 	if len(ccs.ResolverState.Addresses) == 0 {
         b.ResolverError(errZeroAddresses)
     	return balancer.ErrBadResolverState
 	}

 	b.regeneratePicker()
     b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})

 	return nil
 }

 func (b *Balancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
     oldState, ok := b.scStates[subConn]
 	if !ok {
     	return
 	}

 	b.scStates[subConn] = state.ConnectivityState

 	switch state.ConnectivityState {
 	case connectivity.Idle:
     	subConn.Connect()
 	case connectivity.Shutdown:
     	delete(b.scStates, subConn)
 	case connectivity.TransientFailure:
     	b.connErr = state.ConnectionError
 	}

 	b.state = b.csEvltr.RecordTransition(oldState, state.ConnectivityState)

 	if (state.ConnectivityState == connectivity.Ready) != (oldState == connectivity.Ready) || b.state == connectivity.TransientFailure {
     	b.regeneratePicker()
 	}

     b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
 }

 func (b *Balancer) regeneratePicker() {
 	if b.state == connectivity.TransientFailure {
     	b.picker = &firstIdxPicker{err: errors.Join(b.resolverErr, b.connErr)}
     	return
 	}

 	readySCs := make(map[balancer.SubConn]base.SubConnInfo)

 	for _, addr := range b.subConns.Keys() {
     	sci, _ := b.subConns.Get(addr)
     	sc := sci.(balancer.SubConn)
     	if state, ok := b.scStates[sc]; ok && state == connectivity.Ready {
         	readySCs[sc] = base.SubConnInfo{Address: addr}
     	}
 	}

 	b.picker = NewFIPicker(base.PickerBuildInfo{ReadySCs: readySCs})
 }

Отслеживание состояний соединений:

  • UpdateClientConnState: создание новых и удаление неактуальных соединений.

  • UpdateSubConnState: обновление состояний существующих соединений.

  • regeneratePicker: обновление пикера при изменении состояний для выбора оптимального соединения.

Настройка и конфигурация

Для использования кастомного балансировщика необходимо определить его имя и схему, а также настроить подключение.

const (
 	scheme    	= "scheme-name"
 	balancerName  = "pick_idx_first"
 	defaultConfig = `{"loadBalancingConfig": [{"pick_idx_first": {}}]}`
 	retryTimeout  = time.Millisecond * 100
 	maxRetries	= 10
 )

 type ConnOptions struct {
 	Addrs []string
 	Opts  []grpc.DialOption
 }

 func NewConn(ctx context.Context, connOptions ConnOptions) (*grpc.ClientConn, error) {
 	conn, err := dialContext(ctx, connOptions.Addrs, connOptions.Opts...)
 	if err != nil {
     	return nil, fmt.Errorf("unable to initialize conn: %w", err)
 	}

 	return conn, nil
 }

 func dialContext(ctx context.Context, addresses []string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {

 	...
 	opts = append(opts,
         grpc.WithDefaultServiceConfig(defaultConfig),
     	grpc.WithStreamInterceptor(
         	retry.StreamClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
     	),
     	grpc.WithUnaryInterceptor(
             retry.UnaryClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
     	),
 	)
 	...

 	initResolver(addresses)

 	return grpc.DialContext(ctx, fmt.Sprintf("%s:///", scheme), opts...)

Параметры подключения:

  • scheme и balancerName: определяют кастомный балансировщик.

  • defaultConfig: задаёт конфигурацию балансировки.

  • Интерсепторы: добавлены для повторных подключений при кратковременных сбоях. Я использовал пакет github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry.

Тестирование и результаты

В процессе тестирования балансировщик показал стабильную работу при переключении между адресами в случае недоступности сервера с более высоким приоритетом. Задержки были минимизированы благодаря приоритетному подключению к ближайшему ЦОДу.

Заключение

Создание кастомного gRPC-балансировщика с приоритизацией адресов позволяет более точно контролировать распределение клиентских запросов и улучшить производительность приложения. Такое решение обеспечивает гибкость настройки, минимизацию сетевых задержек и повышенную отказоустойчивость, что особенно важно в современных микросервисных архитектурах.

Преимущества кастомного решения:

  • Гибкость: настройка приоритетов адресов.

  • Эффективность: минимизация задержек за счёт выбора оптимального соединения.

  • Отказоустойчивость: автоматическое переключение при недоступности сервера.

Перспективы развития:

  • Динамическое обновление приоритетов.

  • Интеграция с сервисами обнаружения.

  • Расширение логики выбора на основе метрик производительности.

Надеюсь, эта статья поможет вам в создании кастомных решений для ваших gRPC-приложений. 

Ссылки и дополнительные материалы

Комментарии (4)


  1. evgeniy_kudinov
    15.11.2024 15:44

    Доступен ли исходный код в открытом доступе?
    В fiResolver хранится контекст, хотя рекомендуется не сохранять его в структуре. Однако, если это необходимо, то можно. Как этот контекст используется в логике работы балансировщика?


    1. kuvatovrr Автор
      15.11.2024 15:44

      В открытом доступе исходников нет.
      Соглашусь, контекст в резолвере можно и не хранить и в текущей реализации контекст не используется активно внутри методов fiResolver. Он заложен для будущего расширения функционала. Например, если потребуется добавить периодическое обновление адресов или обработку долгих операций, контекст позволит контролировать их выполнение и отменять при необходимости.


  1. AterCattus
    15.11.2024 15:44

    У нас тоже какое-то время было подобное решение. Но потом поняли, что в кубере с несколькими az интереснее балансить еще и с учетом az-локальности, чтобы по возможности гонять трафик в пределах одной зоны (быстрее и дешевле). И по единственному имеющемуся адресу подключения резолвим его IP'шки и все az, считаем веса с учётом близости и знания своего az, и по полученному варианту уже делаем weighted-RR.


    1. kuvatovrr Автор
      15.11.2024 15:44

      В нашем случае мы работаем с инфраструктурой, распределенной по нескольким ЦОДам, но без использования Kubernetes и его возможностей по обнаружению и учету зон доступности. Поэтому мы сосредоточились на статической приоритизации адресов через конфигурационные файлы, что подходило для наших текущих потребностей.