Finagle — библиотека от разработчиков Twitter. Ее используют для организации межсервисного RPC и построения mesh-микросервисной архитектуры. У библиотеки богатая функциональность по тонкой настройке и интроспекции, но сегодня мы рассмотрим клиентскую балансировку.

Задача

Finagle имеет слоистую архитектуру, и один из слоев отвечает за балансировку. Можно использовать имеющиеся слои или реализовать свой. Рассмотрим балансировщики, которые работают из коробки. 

Round robin. Тут все прозрачно: доступные хосты выбираются по очереди, запросы делаются равномерно. Эта реализация не учитывает ничего, кроме доступности хостов.

Heap. В алгоритме берется первое доступное соединение сверху.

Power of two choices. Эта реализация старается оптимизировать распределение. На каждый запрос случайным образом выбирается два хоста из доступных, затем выбирается один на основе некоторой метрики. Есть несколько модификаций: стандартная, где учитывается загруженность, и EWMA, где в качестве метрики используется RTT.

Aperture. Этот вид балансировки — по факту развитие P2C для кластеров большого размера.

Давайте рассмотрим, какую проблему мы пытаемся решить. Большое количество хостов приводит к большому количеству соединений, когда мы обращаемся ко всем хостам сразу. Установление каждого нового соединения требует времени, и это приводит к долгому холодному старту. А если учесть, что кандидаты выбираются случайно, утилизация каждого соединения будет падать.

В целом получается не очень эффективно. К тому же большое количество соединений — это дополнительная нагрузка на ОС. А если вспомнить, что Finagle разрабатывает Twitter, можно представить масштаб проблемы.

Чтобы сократить количество соединений, разработчики придумали этот балансировщик. В нем процесс выбора унаследован от P2C, а набор, из которого выбирают, модифицирован. В отличие от остальных, этот алгоритм берет не все хосты, а некоторое случайное подмножество, и обращение идет только к ним. Когда нагрузка возрастает и выбранного набора не хватает, происходит увеличение, а при спаде нагрузки — сжатие. Границы можно сконфигурировать. Количество одновременно установленных соединений ограничено, и соединения утилизируются эффективнее.

У алгоритма Aperture два режима работы:

  • random aperture;

  • deterministic aperture.

Думаю, названия говорят сами за себя. В первом случае мы выбираем случайный набор, во втором — следуем определенным правилам. Для этого каждому экземпляру присваивается идентификатор, который используется для вычисления группы хостов для клиентов. Каждый клиент с одинаковым идентификатором получает одинаковый набор хостов, который не пересекается с другими. Если не указывать идентификатор, считается, что кластер состоит из одного клиента, и все клиенты получают весь набор хостов.

Более подробно это описано в документации.

Сравнение алгоритмов

Для демонстрации работы каждого алгоритма можно провести синтетический тест. В рамках теста создадим S серверов, каждый из которых будет отвечать с задержкой 100 мс × Sid. С ростом порядкового номера растет задержка. После этого выполним N запросов с concurrency = S. В результате получим распределение запросов в зависимости от используемого алгоритма. Код примера доступен на Github.

Из результатов видно, что RR распределил все поровну, как и указано в его описании, а также оказался самым долгим по времени выполнения теста. Heap основной трафик направил на первый сервер, так как у него наименьшая задержка. При этом остальные остались не нагруженными, что тоже нехорошо, так как может привести к перегрузке отдельных узлов, вызывать троттлинг, ускоренный износ железа и другие эффекты. 

Остальные имеют примерно одинаковое время работы, различается лишь распределение: в целом оно отражает быстродействие отдельных узлов, но при этом все загружены более равномерно. Однако для Aperture, как видно из случая с 30 хостами, задействована лишь часть. Правда, на времени это почти не сказалось.

В своих конфигурациях мы используем P2C-балансировщик, но смотрим на Aperture, так как число хостов достигло 200 штук. Мы рассмотрели коробочные реализации алгоритмов, но можно реализовать свои, комбинируя с имеющимися.

Service Discovery

Распределять запросы, конечно, хорошо, но как же балансировщик получает список хостов, куда нужно их отправить? Эту задачу выполняет механизм разрешения имен в Finagle. Имена — это то, что мы указываем в аргументе dest при создании сервиса. За процесс отвечают два механизма:

  • Resolver — выполняет преобразование некоторого имени в конечный адрес, причем не обязательно IP-адрес ;)

  • Delegation Tables — выполняет преобразование пути согласно заданным правилам для получения конечного адреса через цепочку Namer => Resolver — но может быть и только Namer.

Какие задачи это позволяет решать:

  1. Динамическое изменение хостов на клиенте без изменения конфигурации, без перезапуска и других операций, которые могут вызвать простой сервиса.

  2. Распределение запросов по нескольким направлениям равномерно, с возможностью fallback на стандартное направление. Это позволяет реализовать canary deployment с быстрым откатом в случае проблем.

  3. Реализация health check на уровне клиента, возможность убирать и добавлять хосты для балансировки. Избыточно, но почему бы и нет.

  4. Перенаправление трафика точечно для тестирования новой функциональности. Тестировщики, проставив дополнительные параметры, могут перенаправить трафик только для своего запроса адреса с релизных или стабильных инстансов на тестовые. Также можно включать фичи и новые версии бэкендов только для части клиентов.

А если мы хотим локализовать трафик в рамках DC, да еще и с fallback на остальные? Или попасть запросом в конкретную ноду? Эти и другие классные штуки рассмотрим ниже.

Addr

Addr — это ADT, которая описывает состояния получения адреса:

  • Bounded — адрес найден, и с ним связаны конечное назначение и метаданные;

  • Pending — в процессе;

  • Failed — произошла ошибка в процессе определения;

  • Neg — адрес не может быть найден.

Конечное назначение — это тоже ADT, и оно представлено тремя состояниями:

  • Inet — тут все понятно. Храним конечный IP-адрес, порт.

  • Failed — что-то пошло не так.

  • ServiceFactory — самое неожиданное состояние. В качестве адреса назначения может выступать любой экземпляр Service.

Для Inet и ServiceFactory дополнительно могут быть указаны метаданные, которые могут быть использованы при балансировке запросов. Один из видов метаданных — вес адреса, который позволяет приоритизировать отдельные хосты. ServiceFactory же можно использовать очень неожиданными способами.

Resolver

Как я уже упомянул, Resolver выполняет преобразование имени в адрес. Имя имеет формат scheme!arg. Например, когда при создании сервиса мы пишем «tinkoff.ru:443», это на самом деле упрощенная запись «inet!tinkoff.ru:443». В этом случае вызывается Resolver, который отвечает за схему inet. Оставшаяся часть передается ему как аргумент. В ответ получаем Var[Addr] — реактивный объект с push-семантикой. Этот объект передается в балансировщик, который подписывается на изменения и перестраивает список доступных для запроса хостов.

Вот и вся логика — довольно просто и прозрачно. Мы можем создать Var, который будет асинхронно обновляться, меняя доступные хосты. Например, встроенный Resolver inet каждые пять секунд опрашивает DNS и получает актуальное состояние записей. Если что-то изменится, наши клиенты сразу же узнают об этом. В то же время есть Resolver fixedinet, который выполняет запрос к DNS лишь один раз.

class SimpleResolver extends Resolver {
  override val scheme: String = "simple"
  override def bind(arg: String): Var[Addr] = {
    val simpleService = Service.mk[Request, Response] { req =>
      println(s"Simple Request [$arg]: ${req.contentString}")
      Future.value(Response(req))
    }
    Var.value(Addr.Bound(Address.ServiceFactory(ServiceFactory.const(simpleService))))
  }
}

Delegation Tables (Dtab)

Если Resolver простой и прямолинейный, то с Dtab не так все просто. Этот компонент предлагает механизм разрешения имен на основе правил, которые применяются последовательно до получения не пустого результата (ошибки или пустоты), если все варианты перепробованы. Да, именно перепробованы: Dtab поддерживает fallback и разделение трафика. Его используют, если имя начинается с «/», то есть минимальная запись может иметь вид /x.

Далее к ней в обратном порядке применяются правила путем простой замены префикса a => b. Например:

/s## => /!/inet
/s# => /s##/prod-fallback.host.tld
/s# => /s##/prod.host.tld
/s => /s#

Тогда при создании сервиса имя вида /s/8080 будет развернуто в /!/inet/prod.host.tld/8080 — что эквивалентно inet!prod.host.tld:8080. Если это не получится сделать из-за отсутствия хоста, клиент попытается пойти на inet!prod-fallback.host.tld:8080.

За разрешение имен также отвечает Namer. Его нужно указать в системном пути. Системный путь имеет формат /" class="formula inline">/namer/path, где namer — это FQN класса, реализующего интерфейс Namer. Его задача — преобразовать path в другой path или вернуть конечный узел с Var[Addr]. Это можно выполнить через Resolver с необходимыми параметрами или сформировать на месте.

Использование

Как же вся эта машинерия используется? Resolver можно вызвать напрямую через метод MyResolver.bind или зарегистрировать его в системе. Тогда им можно будет воспользоваться при создании любого сервиса. 

Если с прямым вызовом все понятно, то у регистрации своя специфика. Finagle использует механизм ServiceLoader через рефлексию: в нем FQN реализаций ищутся в ресурсах в папке META-INF/services. В папке создается файл с именем необходимого интерфейса, и в нем указывается FQN класса реализации. При старте приложение собирает все такие сервисы и регистрирует их. Второй способ более явный, но требует наследования от класса com.twitter.app.App. В этом случае переопределяется переменная loadServiceBindings, в которой можно указать возможные реализации для интерфейсов. Это должно быть выполнено строго до первого обращения к Finagle, иначе мы получим ошибку на старте. Также второй способ, в отличие от первого, позволяет создать реализацию с дополнительными параметрами.

Dtab требует инициализации для работы. Правила делятся на два уровня — глобальные и локальные, и указывать их нужно отдельно. Так, глобальное правило инициализируется путем присваивания в глобальную переменную Dtab.base. Да, не ФП, но как есть.

Dtab.base = Dtab.read(
  """
    |/s => /$/demo.namer/demox/localhost;
    |""".stripMargin)

Эти правила применяются для всех случаев разрешения путей. Локальные правила устанавливаются через Dtab.local: в конце запроса нужно либо заменить, либо обнулить правило. Для этого можно обернуть операции в Dtab.unwind.

Dtab.unwind {
  Dtab.local = Dtab.read(
    """
      |/s => /$/demo.namer/demoy/localhost;
      |""".stripMargin)
  doRequest
}

Локальные правила добавляются в конец списка и имеют приоритет перед глобальными, но глобальные правила все равно доступны и выполняют роль альтернативного пути. Если мы используем Finagle и Twitter future, dtab local достается из заголовка и при запросе проставляется туда же. Таким образом, можно очень гибко маршрутизировать трафик внутри системы, делать A/B-тесты, canary deploy и многое другое. Чтобы не утонуть во всем этом, в инфраструктуре обязательно должен быть трейсинг.

Используя указанные выше механизмы, можно интегрироваться в любую систему Service Discovery, где источником может быть DNS, etcd, zookeeper или даже локальный файл. Понадобится лишь написать немного кода. Примеры конфигурации и работы resolver и dtab можете посмотреть в репозитории.

Cross DC балансировщик

Теперь у нас есть все необходимое для написания умного балансировщика. К нему будет два требования:

  • отправлять запросы в свой дата-центр;

  • если нет доступных хостов, отправлять в остальные.

Для этого нам нужно уметь определять, к какому DC принадлежит каждый хост, и уметь выбирать подходящий.

Для начала рассмотрим, как устроен балансировщик в принципе. Балансировщики унаследованы от trait Balancer[Req, Rep]. Этот trait имеет множество методов: для статистики, ошибок и так далее. Но для нас важны следующие:

private trait Balancer[Req, Rep] extends ServiceFactory[Req, Rep] {
  // Абстрактный тип класса, который будет содержать логику выбора узла и перестроения в случае обновления доступных узлов
  protected type Distributor <: DistributorT[Node] { type This = Distributor }
  
  // Абстрактный тип для узла. Содержит информацию о загруженности и доступности, а также фабрику для создания соединения
  protected type Node <: NodeT[Req, Rep]
  
  // Фабричный метод для создания объектов Node. Используется в Distributor
  protected def newNode(factory: EndpointFactory[Req, Rep]): Node
  
  // Фабричный метод для создания начального экземпляра Distributor
  protected def initDistributor(): Distributor
}

Вся остальная логика уже содержится в Balancer либо во вспомогательных Trait. Так, если нам хочется выбирать менее нагруженный сервер, нужно добавить к итоговому классу балансировщика trait LeastLoaded[Req, Rep], а к реализации Node — trait LeastLoadedNode. Все, теперь наш Balancer поумнел и будет учитывать загруженность. Если у вас есть своя метрика, можно использовать ее и переопределить значение метода load или добавить что-то свое.

Делаем свой балансировщик

Основное мы разобрали, давайте приступим к созданию балансировщика. Мы должны уметь определять, к какому DC принадлежит каждый хост. Эту задачу возложим на Resolver — для этого реализуем свой. Для демонстрации я буду использовать простую логику и воспользуюсь Announcer, а при старте сервера буду регистрировать каждый сервис, передавая, в каком DC он развернут. В реальности мы можем получить эти данные из service discovery либо определять их по имени хоста, если есть внутреннее правило по их именованию, которое учитывает и дата-центр. Информацию об этом будем укладывать в поле metadata у Address. В дальнейшем это поле будет доступно в балансировщике.

Теперь определим необходимые классы. Определим нашу Node. Для этого добавим trait, содержащий информацию о нашем расположении:

trait ZonedNode[Req, Rep] extends NodeT[Req, Rep] {
  def zone: Option[String]
}

Сама Node будет выглядеть следующим образом:

case class Node(factory: EndpointFactory[Req, Rep], zone: Option[String])
    extends ServiceFactoryProxy[Req, Rep](factory) with LeastLoadedNode with ZonedNode[Req, Rep]

В качестве метрики загруженности будем использовать количество запросов (LeastLoadedNode). И определим метод для создания узлов:

def newNode(factory: EndpointFactory[Req, Rep]): Node = {
   val zone = (factory.address match {
    case Address.Inet(_, metadata)           => metadata.get(ZonedBalancers.zoneField)
    case Address.ServiceFactory(_, metadata) => metadata.get(ZonedBalancers.zoneField)
    case Address.Failed(_)                   => None
  }).collect { case name: String =>
    name
  }
  Node(factory, zone)
}

В этом месте нам пригодились метаданные, которые мы определяли в Resolver. Также мы можем навесить фильтры на сами Service[Req, Rep] через factory.map.

Осталось только определить логику выбора узла в классе Distributor. Полную реализацию можете посмотреть в git, здесь же приведу сам метод выбора:

val (inSameDatacenter, inOtherDatacenter) = vector.partition { node =>
    node.zone.contains(currentZone)
  }
  private def isAcceptable(node: Node): Boolean =  node.isAvailable && maxLoad.forall(node.load <= _)
  private[this] val p2cZeroCounter = statsReceiver.counter(
    description = "counts the number of times p2c selects two nodes with a zero load",
    Verbosity.ShortLived,
    "p2c_zoned",
    "zero")
  override def pick(): Node =
    if (vector.isEmpty) failingNode
    else {
      val other = inOtherDatacenter.filter(isAcceptable)
      val same  = inSameDatacenter.filter(isAcceptable)
      val subVector = if (inSameDatacenter.isEmpty && inOtherDatacenter.isEmpty) {
        vector
      } else {
        if (same.nonEmpty) same else other
      }
      P2CPick.pick(subVector, subVector.size, rng, p2cZeroCounter)
    }

Чтобы не реализовывать выбор самому, воспользуемся готовым из P2C, он нам подходит. Для этого будем предварительно фильтровать узлы по доступности и загруженности (если указано), после этого, если есть доступные узлы в текущем дата-центре, выбираем из них, иначе выбираем из оставшихся. А затем, если есть что-то в текущем дата-центре, выбираем из них. Если нет — идем в остальные.

Вот и все — наш балансир готов! Вы можете реализовать балансировку любым удобным для вас способом и использовать любые метрики, до которых сможете дотянуться. Можно не только использовать локальные, но и учитывать загруженность CPU/IO каждого узла, получая данные из Service Discovery. Либо получая их из метрик — тут все зависит от вашей фантазии и потребностей. Полный код примера можно посмотреть на Github. 

Комментарии (0)