Переход от однохостового процесса к распределённой системе
10 июля 2019 года. Тейлор Свифт впервые исполняла свою песню “You Need To Calm Down” в нью-йоркском зале Hammerstein Ballroom. Это была прямая трансляция для всех 200 миллионов пользователей Amazon Prime по всему миру. Пока концерт шёл без сбоев, я был в восторге. Тейлор Свифт, конечно, очаровательна; но главная причина моего восторга была в том, что технология, которую я создал во время работы в Amazon, использовалась для подтверждения того, что мы можем обслуживать миллионы довольных Свифт — и эта технология работала безупречно. За кулисами я несколько месяцев сотрудничал с потрясающими инженерами из Prime Video, чтобы сделать это.
Я уже писал историю о том, как создал и развил инфраструктуру, которую Amazon использует для проведения нагрузочного тестирования и тестирования производительности (TPSGenerator). Но я ещё не писал о том, как я её масштабировал.
Первая версия была довольно убогой и предназначалась только для одного хоста. Я запускал её как инструмент командной строки, указывал, сколько тысяч транзакций (“TPS”) в секунду мне нужно — и TPSGenerator порождал потенциально тысячи потоков, радостно потребляя все аппаратные ресурсы, которыми вы были готовы пожертвовать. В тщетной попытке избежать проблем с распределёнными системами я профилировал каждую строчку кода и оптимизировал её до последней степени. На обычной машине он мог генерировать несколько сотен тысяч TPS, что было достаточно для многих сервисов.
Но приближался день больших распродаж «Киберпонедельник» — в этот день совершалось самое большое количество покупок в году. По мере приближения даты всё больше команд стали спрашивать меня, как генерировать большую пропускную способность, чем та, на которую способна одна машина — чтобы убедиться, что их сервисы справятся с пиком.
Поскольку культура в Amazon была несколько агрессивной и прагматичной, мои первые ответы были в духе: «Хм, масштабируй вертикально, возьми машину побольше» или «Ну, ты можешь подключиться по ssh к нескольким машинам и запустить TPSGenerator на каждой из них». Эти решения были, безусловно, убогими и неудовлетворительными (для меня и для моих клиентов). Я знал, что нужно что-то получше.
Метод «ssh на N машин и запуск процесса на каждой из них» не был хорошим, но в успешном сценарии он работал хорошо. Представьте, что вы хотите генерировать 40 000 TPS, а каждая машина может генерировать только 10 000 TPS. Вы можете зайти по ssh на 4 машины и запустить процесс TPSGenerator на каждой из них. Тестируемая система будет получать 40 000 TPS синтезированного трафика с этих машин.
Это ломалось, когда конкретная машина выходила из строя или становилась неавторизованной, потому что другие машины не знали о проблеме. В итоге вы получали меньшую нагрузку, чем планировали:
Или когда конкретная машина не могла генерировать столько же нагрузки, сколько остальные, — либо из-за различий в аппаратных характеристиках, либо из-за повреждённой сетевой карты, либо из-за какого-то случайного процесса, запущенного в фоновом режиме:
Возникло множество дополнительных проблем: как узнать, сколько хостов мне нужно? Как аварийно остановить процесс, запущенный удалённо на потенциально сотнях или тысячах машин? Что делать, если они не отвечают? А запуск и мониторинг всего теста требовали огромных усилий.
По мере того как я всё глубже задумывался над этой проблемой, выяснилось, что основная сложность оптимизации заключалась в том, что TPSGenerator должен был выполнять произвольный клиентский код с высокой пропускной способностью. Не все транзакции одинаковы. Некоторые ограничены процессором, другие — пропускной способностью сети. Некоторые могут выполняться быстро (за миллисекунды), некоторые медленно (за секунды). Как оптимизировать распределённую систему, если вы не знаете шаблонов данных, под которые оптимизируете?
Мне всё стало ясно в тот момент, когда я осознал, что могу применить к этой проблеме одну из старейших техник информатики: «Разделяй и властвуй» (Divide-And-Conquer):
Я могу декомпозировать задачу «выполнить 40 000 TPS в течение 20 минут» на 400 более мелких задач наподобие «выполнить 100 TPS в течение 20 минут», выполняемых параллельно. А ещё могу разложить задачу «выполнить 100 TPS за 20 минут» на 20 более мелких задач вроде «выполнить 100 TPS за 1 минуту», выполняемых последовательно.
Таким образом, вместо того чтобы думать о большой задаче «запустить 40 000 TPS в течение 20 минут», мне нужно было думать о том, как организовать 8000 маленьких задач «запустить 100 TPS в течение 1 минуты».
Архитектура “Controller → Queue → Worker” оказалась как нельзя кстати.
Вы попросили контроллер организовать выполнение 40 000 TPS в течение 20 минут. Контроллер разложит его на более мелкие задачи и будет отвечать за размещение сообщений в очереди с той скоростью, с которой они должны выполняться. Группа Worker-ов будет прослушивать очередь, отменять и выполнять задания. Именно Worker-ы создавали синтезированную тестовую нагрузку и воздействовали на тестируемую систему.
Это было очень просто, но чрезвычайно мощно.
Отделение контроллера от Worker-а давало массу преимуществ. Контроллеру не нужно было знать или заботиться о том, кто на самом деле выполняет работу. Он просто знал, что помещает задачу в очередь, и, если всё было в порядке, то «что-то» отменяло эту задачу, как только она была поставлена в очередь. Размер очереди был критически важной информацией: если размер очереди увеличивался, контроллер знал, что Worker-ов недостаточно или существует фатальная проблема — поэтому он мог выполнить экстренную остановку, удалив очередь. Без очереди, указывающей, что делать, Worker-ы немедленно останавливались.
Что делать, если у нас недостаточно Worker-ов? Мы можем просто добавить больше Worker-хостов в любое время. В следующей версии я сделал контроллер умнее, заставив его общаться с группой автомасштабирования EC2, чтобы реактивно увеличивать количество хостов, если он видел, что размер очереди растёт. В следующей версии я попробовал сделать это проактивно: если контроллер знал, что через минуту ему нужно будет увеличить нагрузку, он мог сделать этот запрос проактивно сейчас, чтобы дополнительное оборудование было готово к работе, когда оно понадобится.
Что произошло, если Worker умер? Ничего страшного. Другие Worker-ы просто возьмут на себя нагрузку. На самом деле, в Amazon SQS есть фича, которая обеспечивает мне дополнительную устойчивость. Когда вы удаляете сообщение из SQS, он на самом деле не удаляет его, а просто устанавливает флаг видимости в false, чтобы никто больше не мог снять его с очереди. При нормальной работе, когда Worker успешно завершает обработку задания, он снова обращается к SQS, чтобы удалить его. Если Worker умирает и не вызывает SQS для удаления сообщения, то после тайм-аута SQS снова устанавливает флаг видимости в true — и это сообщение будет прочитано и выполнено другим Worker-ом.
Как эта архитектура справлялась с проблемой разнородных парков? То есть, как быть в случае, когда Worker не может генерировать столько же нагрузки, сколько все остальные Worker-ы?
На самом деле я сделал три итерации. В первой версии я просто отказался от этой идеи, потому что у меня было всего 2-3 месяца на то, чтобы создать и запустить хоть что-то — поэтому мне пришлось пойти на компромиссы, учитывая сжатые сроки. Запускалось столько Worker-ов, сколько ядер в машине, так что всё было довольно просто. Но стало ясно, что мне нужно, чтобы Worker-ы саморегулировались; и они должны были умнее относиться к тому, сколько сообщений они могут обрабатывать одновременно.
Поэтому я написал небольшой «светофор» как часть кода Worker-а, который считывал данные из очереди. Он отвечал за то, чтобы Worker приостанавливал и возобновлял чтение из очереди. Было несколько фоновых потоков, которые отслеживали такие вещи, как использование процессора, памяти, пропускной способности сети, использование диска и так далее. Когда любой из этих аппаратных ресурсов опускался ниже настраиваемого порога (например, 80 % CPU), Worker приостанавливал чтение из очереди, пока использование CPU снова не снизится [AWS AutoScaling также имеет возможность масштабировать количество хостов вверх или вниз на основе аппаратных ресурсов, и я использовал её; но мне также нужна была возможность более тонкого масштабирования количества потоков на каждом хосте].
Этот второй раунд сработал на удивление хорошо и позволил Worker-ам быть довольно умными в плане постоянной самонастройки количества сообщений, которые они обрабатывают в данный момент времени.
Я не сразу понял, что не все узкие места связаны с аппаратным обеспечением. Например, большая часть Amazon в то время использовала Apache Commons MultiThreadedHttpConnectionManager, у которого DEFAULT_MAX_TOTAL_CONNECTIONS равнялось 20. Сервисный фреймворк Amazon, Coral, использовал его под капотом для всех автоматически генерируемых клиентов, так что это значение по умолчанию было повсюду в компании. И большинство инженеров даже не подозревали, что оно у них есть. Они просили TPSGenerator сгенерировать тысячи TPS, но сами не знали, что в их собственном коде есть произвольное ограничение в 20 одновременных соединений, поэтому последующие соединения просто вставали в очередь. Это расстраивало: инженеры, устраняющие неполадки, видели, что машина не использует много аппаратных ресурсов, но и не генерирует много трафика.
В третьей версии самонастраивающегося кода меня посетило озарение, что на самом деле здесь можно использовать эвристику «поймай всё». В успешном случае количество завершённых транзакций имело точно такой же наклон, как и количество начатых транзакций. Но в случае узкого места, не связанного с аппаратным ресурсом, я знал, что наклон завершенных транзакций будет значительно меньше наклона начатых транзакций, и это давало мне подсказку, что система в беде.
К тому моменту я построил систему, которая обладала огромной масштабируемостью и могла динамически саморегулироваться, определяя, сколько хостов использовать и сколько потоков должно быть запущено на каждом из этих хостов.
Но не всё было гладко. По пути я столкнулся с тысячей мелких проблем.
Одним из основных решений, которое я принял, было пожертвовать некоторой точностью ради достижения крупного масштаба. Проектирование распределённых систем — это поиск компромиссов с учётом конкурирующих нефункциональных приоритетов. В однохостовом режиме TPSGenerator мог быть на 100% точным. Вы хотели сгенерировать 23 476,76 TPS в течение 15 секунд? Вы получите ровно 23 476,76 TPS за 15 секунд. Но в многохостовом режиме возможность достижения нагрузки в миллионы TPS была важнее, чем такая точность. Многие механизмы саморегуляции имели задержку в несколько секунд, так что вы не могли работать на 4 000 007 TPS в течение 20 секунд, а затем переключиться на 4 000 009 TPS в течение 10 секунд. Он был оптимизирован для увеличения времени работы и большой пропускной способности за счёт некоторой точности. Компромиссы в распределённых системах!
Изначально для простоты я жёстко задал значение TPS, запрашиваемого на одно сообщение очереди, равным 100. Но если бы я захотел создать нагрузку в 40 млн TPS, это привело бы к тому, что контроллер писал бы 400 тыс. сообщений в SQS каждую минуту, или почти 7000 записей в секунду. Поэтому контроллер рассчитывал TPS-на-сообщение как функцию от общего желаемого TPS, чтобы скорость записи не была такой агрессивной.
Мне нужно было следить за тем, чтобы Worker-ы не перегружали SQS, поэтому я переключил их на длинный опрос. Я также добавил небольшой случайный джиттер при запуске, чтобы избежать проблемы громогласного стада, когда тысячи Worker-ов одновременно попадают в очередь.
И наконец, про «слона в комнате». Вся архитектура имела единственную точку отказа: контроллер. Я поиграл со вторым контроллером, по сути, в теневом режиме, отслеживая состояние активного контроллера с помощью сердцебиений. Это заставило меня заняться выборами лидера, протоколами сплетен и алгоритмами консенсуса, такими как Raft и Paxos. В конце концов, я не был удовлетворен тем количеством сложности, которое это привнесло — учитывая, как редко это было актуальной проблемой. Иногда решение проблемы приносит больше проблем, чем та, которую оно устраняет.
Основной проблемой были метрики. В однохостовом режиме TPSGenerator отслеживал такие вещи, как транзакции, выполняемые каждую секунду, их количество, сколько из них прошли и нет; а также показатели задержки в минуту в процентах. Он мог принять решение о прерывании теста, если количество ошибок было слишком велико или задержка значительно превышала ожидания. Но в распределённом мире у контроллера не было метрик, они были у Worker-ов. Как контроллер мог получить доступ к этим метрикам, чтобы принимать такие решения?
Я вкратце рассмотрел механизм push или pull, в котором контроллер мог бы запрашивать метрики у Worker-ов, а Worker-ы могли бы отправлять свои метрики контроллеру:
Мне это не нравилось по двум простым причинам. Во-первых, это было повторное соединение компонентов, которые я специально развязал. Но самое главное, это могло превратить контроллер в узкое место, если он получал миллионы метрик в минуту. Контроллер уже был единственной точкой отказа, поэтому я не хотел испытывать судьбу.
И снова проектирование распределённых систем — это компромисс между конкурирующими требованиями. Я выбрал архитектуру, в которой Worker-ы отправляли свои метрики в AWS Cloudwatch, а контроллер получал их оттуда. Недостатком была задержка: если бы Worker-ы отправляли свои метрики напрямую контроллеру, он мог бы принимать решения о состоянии мира практически в реальном времени. Но при использовании Cloudwatch в качестве посредника возникала потенциальная многоминутная задержка, учитывая потенциально поздно прибывающие метрики с чужих хостов. Это был компромисс, на который я нехотя согласился, чтобы избежать гарантированного узкого места.
В течение многих лет мы добавляли в алгоритм и продукт миллион дополнительных мелочей и эвристик. Но суть осталась прежней. Я ушёл из Amazon в 2020 году, но мне приятно знать, что этот алгоритм работает на тысячах машин в дата-центрах Amazon и сегодня, с миллионными TPS.
В заключение расскажу об открытых уроках по нагрузочному тестированию, которые пройдут скоро в Otus:
— 6 июня: Первый нагрузочный тест в Apache Jmeter. Создадим и отладим HTTPS-скрипт, проведем итерацию нагрузочного тестирования и проанализируем полученные результаты. Научимся генерить информативные html-отчеты. Записаться можно по ссылке
— 18 июня: Настройка связки Gatling с Grafana, VictoriaMetrics. Посмотрим, как настроить отправку метрик в набирающую популярность VictoriaMetrics и как визуализировать эти метрики в Grafana. Запись по ссылке