Всем привет! Это Сергей Качеев, старший разработчик в отделе сетевой инфраструктуры Yandex Infrastructure. Наша команда создаёт технологии, на которых работают сервисы Яндекса. В прошлый раз я рассказал целый сетевой детектив о том, как мы искали баг, который убивал DNS‑сервер Unbound. И сегодня я расскажу не менее интересную историю.

Мне на развитие попала XDP eBPF‑программа, которая защищает DNS‑серверы от выхода из строя под слишком большой нагрузкой (другими словами, от DDoS). На ядре 5.4 алгоритм защиты был основан на EWMA‑статистике с вероятностными дропами, которые постоянно контролировались из Control Plane. Это делало eBPF‑программу неавтономной. К тому же если Control Plane падал, то сервер оставался в состоянии последнего удачного обновления eBPF. Это нужно было исправлять — было решено заменить это всё на Token Bucket. Этот момент и будем считать отправной точкой в нашей истории.

С чего всё началось

Изначально задача защиты DNS‑серверов появилась из конкретного случая, когда один из потребителей прислал нам миллионы запросов на сервер в пике. На старте, после обсуждения всех условий и ограничений, в этой программе был реализован простой способ лимитирования.

В автоматическом режиме для каждого нового клиента в eBPF заводился новый Token Bucket (далее — TB) с каким‑то лимитом по умолчанию. Фактически TB заменяли стейты EWMA‑статистики. Таких TB могло накопиться много, но реально активных обычно было очень мало. В теории всего таких клиентов может быть 2^32, то есть столько же, сколько IPv4-адресов, но на практике ожидалось, что одновременно не может существовать больше 100 тыс. клиентов.

Также в конфигурации была возможность описать каких‑то конкретных клиентов, для которых можно поменять лимит по умолчанию.

И такая схема работала, но имела свои недостатки. Лимит по умолчанию назывался limit_any. И условно всё выглядело так:

limit_any = 160k
[ wk1 | wk2 | wk3 |  unk1 | unk2 | unk3 ... ]
wk (well known) лимиты определены для конкретных потребителей и описаны в конфигурации
unk (unknown) лимиты появляются на каждого нового потребителя, ему выдаётся лимит limit_any

Это значит, если несколько клиентов одновременно начнут потреблять всю свою квоту, они всё равно смогут перегрузить сервер. Другими словами, в такой схеме мы были защищены от пика запросов от одного клиента, но не от множества.

Подумав над этими ограничениями, мы решили сделать Hierarchical Token Bucket (далее — HTB). Но так как в XDP нет очередей, придётся его делать как счётчик (HTB as a meter).

Среда: что было на старте

Как вы уже поняли, сегодня мы поговорим о eBPF и XDP. Вообще eBPF — это довольно просто и для старта достаточно сделать три простых шага:

  1. Написать программу на C (Data Plane), которая будет работать в контексте ядра.

  2. Пройти успешную верификацию нашей программы у eBPF verifier.

  3. Написать программу (Control Plane), которая загрузит eBPF‑программу в ядро и будет управлять ей.

Всё это заслуживает отдельных статей. Я надеюсь, что читатель знаком с eBPF, хотя это и не обязательно.

В этой статье речь пойдёт про разработку алгоритма лимитирования запросов к серверу. Будем считать, что один запрос — это один сетевой пакет. Для нас это подходит, потому что лимитировать будем DNS‑запросы. Алгоритм будет на основе TB.

Наша eBPF‑программа работает в XDP, что накладывает на нас следующие ограничения:

  • Нет возможности поставить пакет в очередь: очередей нет, есть пакет в текущем моменте, и нужно решить, что с ним делать.

  • Нет локов: даже bpf_spin_lock недоступен, есть только атомики.

  • Нет тредов: нельзя запустить фоновый процесс, который будет что‑то контролировать.

  • Нужна автономность eBPF‑программы, чтобы падения Control Plane не нарушали работу лимитера (поэтому какой‑то контроллер в Control Plane не рассматриваем вообще).

  • TB может быть много, поэтому важно сделать стейт бакета как можно меньше.

  • Нам важна скорость, поэтому нужно стараться оптимизировать всё, что получится (например, во время работы всплыл факт, что деление — это дорого и желательно его избежать).

Первая реализация была осложнена тем, что я пытался её делать на ядре 5.4. Там очень сильные ограничения BPF verifier и к тому же нет атомиков, которые возвращают данные. Была идея использовать bpf_spin_lock, но в результате исследований стало ясно, что в XDP спинлоки недоступны. А атомики, возвращающие значение, появились только в версии 5.12.

Начиная с версии 5.15 уже чувствуется некоторая свобода в действиях. Verifier стал умнее, лимиты расслабили, добавили новых, нужных нам возможностей и сильно улучшили сообщения verifier об ошибках.

Несколько полезных ссылок

А почему не YARL?

В Яндексе есть несколько реализаций лимитеров, и одной из последних является YARL. Возникает резонный вопрос: почему не взять его?

Тут всё просто. Мы думали использовать его как верхнеуровневый оркестратор, но быстро поняли, что нам это просто не нужно. YARL необходим там, где у вас за кучей машинок фронтенда или бэкенда есть база данных и нужно защитить её от перегрузки. То есть производительность базы данных является неким потолком производительности вашего сервиса.

В случае наших DNS‑серверов каждый сервер — это отдельный юнит. Исчерпание его вычислительных ресурсов никак не сказывается на соседнем сервере. Поэтому нам нужен локальный механизм для сервера. К тому же на данный момент у YARL нет модуля для eBPF.

TB для XDP

Учитывая все ограничения среды, я стал придумывать какой‑то рабочий вариант. Начнём с простого: все алгоритмы TB описывают какой‑то bucket, где копятся токены.

Для начала покажу вспомогательный код (весь код примеров можно найти в репозитории симулятора, о котором расскажу позже).

#define ONE_SEC_NS (u64)1e9
#define MIN(X, Y) ((X) < (Y) ? (X) : (Y))
#define MAX(X, Y) ((X) > (Y) ? (X) : (Y))

typedef enum {
    DG_DROP,
    DG_PASS,
} E_DG_Verdict;

// Сокращение DG — от dnsguard

Заведём структуру с одним полем s64 (signed 64 bit или int64_t из stdint.h) и будем использовать его как хранилище токенов.

// Если нет особых причин не делать typedef, всегда делайте typedef. Это упрощает понимание вашего кода.
typedef struct {
    s64 tokens;
} TBucket;

Логично обозначить токены как количество наносекунд, которые должны пройти между пакетами. Так мы обеспечим оговоренный лимит по количеству пакетов, проходящих в секунду. Стоимость одного пакета не может превышать количества наносекунд в секунде, то есть не больше одного миллиарда единиц (1e9).

Например, для лимита в 3 RPS (три запроса в секунду) мы получаем примерно вот такую картинку.

+------------------------------- Одна секунда --------------------------------+
[       333e6 ns        |         333e6 ns         |          333e6 ns        ]
+-----------------------------------------------------------------------------+
         `- стоимость одного пакета в наносекундах

Дальше нам нужно придумать, как наполнять ведро с токенами.

Пока идёт системное время, в наш бакет накапливаются токены, поэтому логично, что нам нужно запомнить время последнего наполнения ведра. Ещё нам нужно хранить объём ведра — придётся добавить ещё одно поле для capacity. Дальше мы можем смотреть на разницу now_timestamp — last_timestamp и, если это значение больше нуля, то добавлять его к TBucket→tokens.

typedef struct {
    s64 tokens;
    s64 last_timestamp;
    s64 packet_cost;
    s64 capacity;
} TBucket;

void refill_bucket(TBucket *tb, u64 now_timestamp) {
    // Если refill_bucket вызывается слишком часто, игнорируем эти вызовы
    if (tb->last_timestamp >= now_timestamp) {
        return;
    }
    // Получаем разницу от прошлого времени наполнения бакета
    u64 elapsed = now_timestamp - tb->last_timestamp;
    // Заполняем наш бакет, важно хранить не больше чем capacity (объём ведра) токенов.
    tb->tokens = MAX(tb->tokens + elapsed, tb->capacity);
    ts->last_timestamp = now_timestamp;
}

Как говорится, всё гениальное просто. Но дьявол кроется в деталях.

Так как программа работает в eBPF, у нас просто нет возможности запустить фоновый тред, поэтому наполнение ведра должно происходить тут же при обработке запроса. При таком режиме множество тредов ядра могут попытаться обновлять один и тот же бакет. А это приведёт к гонке на данных стейта TB, то есть к непредсказуемым результатам при заполнении бакета.

Для нормальной работы алгоритма нам нужно атомарно обновлять два значения: s64 tokens и last_timestamp. Очевидным было бы завести лок и выполнить что‑то вроде…

spin_lock(tb->lock)
refill_bucket(&tb->tokens)
spin_unlock(tb->lock)

Но как я писал выше, bpf_spin_lock нам недоступен. Более того, это очень неоптимальный механизм: все запросы конкурировали бы за доступ в эту критическую секцию и блокировались, тем самым сильно ухудшая производительность. В итоге у нас ничего не остаётся, кроме атомиков.

Я долго обдумывал различные техники, как можно без гонки обновить два атомика и сохранить работоспособность алгоритма. Часто приходили идеи, которые на практике не работают.

Первая реализация TB

Вот так выглядела неудачная попытка реализации TB № 1.

E_DG_Verdict dg_tb_consume_or_refill(Bucket* tb, s64 now_ns) {
    // Доступ без атомарного чтения не проблема, потому что в конечном итоге
    // всё синхронизируется через атомарные операции ниже по коду.
    if (tb->tokens < tb->packet_cost) {
        s64 last_ts = tb->last_timestamp;
        s64 old_tokens = tb->tokens;

        if (last_ts >= now_ns) {
            return DG_DROP;
        }

        s64 elapsed = now_ns - last_ts;
        s64 tokens = old_tokens + elapsed;
        if (tokens >= tb->packet_cost) {
            tokens -= tb->packet_cost;
        }
        if (__sync_bool_compare_and_swap(&tb->last_timestamp, last_ts, now_ns)) {
            if (__sync_bool_compare_and_swap(&tb->tokens, old_tokens, tokens)) {
                if (old_tokens + elapsed >= tb->packet_cost) {
                    return DG_PASS;
                }
            }
        }
    }
    if (__sync_add_and_fetch(&tb->tokens, -tb->packet_cost) > tb->packet_cost) {
        return DG_PASS;
    }
    return DG_DROP;
}

Иногда удавалось что‑то сделать с этими вариантами так, что казалось, что они заработали нормально. Но нет. Вот неудачная попытка починить неудачную реализацию № 1.

E_DG_Verdict dg_tb_consume_or_refill(Bucket* tb, s64 now_ns) {
    // Доступ без атомарного чтения не проблема, потому что в конечном итоге
    // всё синхронизируется через последующие атомарные операции.
    if (tb->tokens < tb->packet_cost) {
        s64 last_ts = tb->last_timestamp;
        s64 old_tokens = tb->tokens;

        if (last_ts >= now_ns) {
            return DG_DROP;
        }

        s64 elapsed = now_ns - last_ts;
        s64 tokens = old_tokens + elapsed;
        if (tokens >= tb->packet_cost) {
            tokens -= tb->packet_cost;
        }
        if (__sync_bool_compare_and_swap(&tb->last_timestamp, last_ts, now_ns)) {
            if (__sync_bool_compare_and_swap(&tb->tokens, old_tokens, tokens)) {
                if (old_tokens + elapsed >= tb->packet_cost) {
                    return DG_PASS;
+                } else {
+                    return DG_DROP;
                }
            }
        }
    }
-    if (__sync_add_and_fetch(&tb->tokens, -tb->packet_cost) > tb->packet_cost) {
+    if (tb->tokens >= tb->packet_cost && __sync_add_and_fetch(&tb->tokens, -tb->packet_cost) > tb->packet_cost) {
        return DG_PASS;
    }
    return DG_DROP;
}

В такие моменты у меня почти всегда было чувство, что это не заработает. Тщательное тестирование подтверждало это. Проблема с этим кодом была в том, что мы в разных местах обновляем либо время, либо ведро с токенами. И всегда будет случаться так, что рейт запросов, проходящих через лимитер, будет не соответствовать установленному.

Тестовая среда

Тут стоит сделать отступление, чтобы подчеркнуть важность быстрой итерации в цикле разработки любого эксперимента (Proof of Concept, или сокращённо PoC).

Всё состоит из трёх этапов: идея, код, тестирование. Если проверка идеи требует коммита в ветку, выкатки в тестинг и только потом тестирования, такой вариант не годится для мозгового штурма. В такой среде придётся долго и тщательно продумать своё решение и только потом пробовать.

Поэтому очень важно локальное тестовое окружение. Желательно, чтобы оно было автоматизированное с воспроизводимыми результатами. Именно поэтому впоследствии я потратил немного времени и написал на коленке симулятор, которым я проверял мои попытки запустить Token Bucket в XDP.

У симулятора есть два режима:

  • простенький REPL, реализованный через прекрасную утилиту rlwrap;

  • запуск сценариев тестирования TB.

Он выводит метрики потребления TB в stdout. Через Makefile я сделал вывод в pipe, в другом окне. Этот pipe читается другой утилитой tlook.

Итого мой цикл разработки сводился к следующим действиям:

  1. Описываю интересующий меня сценарий тестирования TB.

  2. Пишу новый вариант кода.

  3. Запускаю симулятор в одной консольке в режиме сценария.

  4. Запускаю tlook и смотрю результат прогона скрипта.

  5. Если нужно, то провожу дополнительный тест с режимом REPL и руками пытаюсь найти сценарий, в котором код сломается.

Но до симулятора я разрабатывал долго и упорно продумывал своё решение (неизбежно допуская ошибки и упуская хорошие альтернативные варианты). Симулятор появится позже, после v1.

Изобретение рабочей версии — v1

Вернёмся к первой версии. В тот момент я не придумал ничего нормального и решил пойти по пути уменьшения данных, которые нужно обновлять атомарно. Другими словами я решил отказаться от попытки обновить два атомика и уложиться в один u64.

64 бита — это целых два 32-битных числа. Можно в одном атомике хранить сразу и предыдущий timestamp наполнения ведра, и количество доступных токенов.

            (u32) now ts                |          (u32) tokens
u64   00000000_00000000_00000000_00000000_00000000_00000000_00000000_00000000

Но для наносекунд u32 — это всего 4 секунды. И с этим нужно что-то делать.

Я решил поступить следующим образом: если поделить текущий timestamp в наносекундах (now_timestamp) на количество наносекунд в 10 миллисекундах (NS_TO_SSEC_DIV == 10e6), то мы получим timestamp с точностью до сотых секунды. Кажется, это можно назвать сантисекунды — по аналогии с сантиметрами в метре.

Но число всё ещё слишком большое. Его можно просто обрезать сверху (сделать приведение к u32).

// 10e6(10_000_000): Nanoseconds to santiseconds divider
#define NS_TO_SSEC_DIV (u32)10e6
u32 now_santi_sec = now_timestamp / NS_TO_SSEC_DIV;

В таком случае в u32 помещается примерно 1,3 года сантисекунд. С этим можно работать.

То же самое с токенами: мы не можем хранить их в наносекундах по причинам, описанным выше (хватит всего на 4 секунды токенов). Поэтому также поделим на NS_TO_SSEC_DIV максимальную стоимость пакета (1 секунду наносекунд), превратив токены в сантисекунды. Стоимость пакета теперь не может превышать 100 единиц.

И это сразу становится проблемой: в 100 единицах не получится сделать приемлемый механизм управления полосой от 0 до каких‑нибудь 2 миллионов запросов в секунду. Чтобы преодолеть это, я решил инвертировать механизм вычитания стоимости пакета из ведра с токенами. Вместо деления секунды (в нашем случае 100 единиц) на количество запросов для получения реальной стоимости пакета, мы будем считать, что все пакеты стоят одинаково — 100 единиц. То есть при проходе любого пакета из ведра всегда вычитается 100 единиц.

С другой стороны, при накапливании токенов мы будем добавлять сантисекунды, умноженные на количество запросов в секунду для конкретного TB (RPS или рейт). То есть одна секунда будет прибавлять 100 * RPS токенов. Несложная математика подсказывает, что так мы сможем задавать разрешённый лимит до 40М RPS.

u32::MAX / 100 ~= 40Mrps

Этого более чем достаточно.

Может показаться, что упаковка двух u32 в один u64 — это какая‑то сложная магическая операция с битами. Да, она может быть такой:

#define DG_TB_PACK_TIME_AND_LIMIT(time, limit) ((time << 32) | DG_TB_PACKET_COST * limit)

Но на самом деле всё намного проще:

// Значение одного бакета, которое будет упаковано в u64
struct dg_bucket {
    s32 tokens;
    u32 last_time;
};

u64 dg_tb_pack_time_and_limit(s32 time, u32 limit) {
    u64 packed = 0;
    // Представляем u64 как структуру из двух 32-битных чисел
    struct dg_bucket* b = (struct dg_bucket*)&packed;  // Вся МАГИЯ в этой строке.
    b->tokens = DG_TB_PACKET_COST * limit;
    b->last_time = time;
    return packed;
}

В результате я пришёл к следующему алгоритму.

Код алгоритма
// аргумент u32 now — это подготовленный ранее now_santi_sec (то самое медленное деление).
// max_time_slice — это неудачное название. На самом деле это максимальная capacity бакета.
static __always_inline s32 dg_tb_consume_or_refill(u64* tb, u32 now, u32 rate, u32 burst, u32 max_time_slice) {
    struct dg_bucket* b = (struct dg_bucket*)tb;
    s32 tokens = b->tokens;
    // Попытка ослабить конкуренцию на чтение атомика и снизить скорость уменьшения b->tokens,
    // чтобы уменьшить нагрузку на CPU и случайно не переполнить s32 при вычитании.
    if (tokens >= DG_TB_PACKET_COST) {
        tokens = __sync_add_and_fetch(&b->tokens, -DG_TB_PACKET_COST);
        if (tokens >= DG_TB_PACKET_COST) {
            return tokens;
        }
    }
    u64 packed = *tb;
    // Атомарная операция Compare and Swap требует предоставить старое значение числа,
    // чтобы убедиться, что мы обновляем самое последнее актуальное значение.
    // Если атомик уже обновлён конкурентно из другого потока исполнения,
    // вторая фаза Swap не запустится, потому что Compare увидит, 
    // что у нас невалидное значение в old_packed
    u64 old_packed = packed;
    b = (struct dg_bucket*)&packed;
    if (now > b->last_time) {
        s32 elapsed = now - b->last_time;
        // Добавляем токены в размере 
        // <количество сантисекунд с момента последнего заполнения> * limit RPS
        s32 add = MIN(elapsed, max_time_slice) * rate;
        b->last_time = now;
        b->tokens = MIN(MAX(b->tokens, 0) + add, burst);
    } else if (b->last_time - now > 1<<31) {
        // Прошло больше ~1,3 лет с момента загрузки сервера, время в last_time переполнилось.
        // Нужно рассчитать elapsed немного иначе либо просто сбросить бакет в начальное состояние:
        b->last_time = now;
        b->tokens = DG_TB_PACKET_COST * rate;
    }
    if (__sync_bool_compare_and_swap(tb, old_packed, packed)) {
        return b->tokens;
    }
    return 0;
}
// Списание токенов происходит следующим образом:
bool dg_consume(void* bucket, u64 now_ns) {
    struct dg_tb_value* tb = bucket;
    s32 tokens = dg_tb_consume_or_refill(
        &tb->time_and_tokens,
        (u32)(now_ns/DG_TO_SANTI_DIV),
        tb->rate,
        tb->burst,
        tb->max_time_slice
    );
    return tokens >= DG_TB_PACKET_COST;
}

Отлично! У нас есть рабочая реализация, которая выдержала нагрузочное тестирование.

Инструменты нагрузочного тестирования

Отдельно стоит отметить то, чем я тестировал. На самом деле, когда я создавал самую первую реализацию описанного выше TB, у меня не было хорошего инструмента для теста в реальных условиях. Да и как я упомянул ранее, симулятора тогда тоже не существовало. Что‑то из этого я доработал, что‑то написал с нуля, когда уже занимался созданием HTB.

У нас есть инструмент под названием bang, которым можно выпустить в сторону тестируемого сервера очень большой RPS в разных режимах. Один из них — это DNS. В этом режиме нагрузочного тестирования важно получать ответы от тестируемого сервера и измерять время ответов.

Bang способен без особых проблем отправить и принять в режиме DNS до 9 миллионов запросов в секунду с одного сервера. Если быть точным, то с одного IP‑адреса удаётся отправить 600–800 тыс. запросов без локальных дропов при отправлении. Чтобы выжать больше, нужно добавить IP‑адреса на сервер‑источник нагрузки.

Так как у нас IPv6 only среда, это не проблема: добавляем 20 адресов и получаем те самые 9 миллионов запросов. Источник для запросов — pcap, собранный для нужного типа нагрузки (UDP/DNS/HTTP).

В другой своей статье я упоминал Pandora c плагином pandora‑dns как основной инструмент для нагрузочного тестирования DNS‑серверов в Яндексе. Однако у неё есть проблемы: Pandora выдаёт максимум(!) 80 000 запросов в секунду на сервер‑источник, что очень сильно затрудняет тестирование XDP‑программ. В этом случае нужно поднимать по несколько десятков серверов и подавать нагрузку с кластера, а это долго и неудобно.

Как мне удалось научить bang отправлять 9 миллионов запросов и правильно считать тайминги и таймауты — это отдельная история. Если кратко, то мне пришлось переиспользовать ID DNS‑пакета для замера таймингов и для маркировки групп запросов, чтобы посчитать тайм‑ауты. Также стоит отметить, что bang написан на Rust и использует Tokio как основной рантайм.

Но это ещё не всё. Для реально крутых проектов типа YANET bang умеет выполнять нагрузочное тестирование в режиме DPDK и выдавать по 40 миллионов запросов любыми pcap.

Использование атомик флага — v2

Пока я тестировал и отлаживал первый вариант, мне пришла идея использовать локи на основе атомиков. Для этого даже есть специальные встроенные функции: __sync_lock_test_and_set и __sync_lock_release. В ядре 5.15 в XDP‑программах eBPF они доступны.

Лок не блокирует поток исполнения, а вместо этого показывает, можно ли зайти в ветку кода обновления стейта (критическую секцию) TB. Не уверен, можно ли назвать этот флаг локом, но в имени этих функций и в документации вторая функция упоминает его как lock.

void __sync_lock_release (type *ptr, ...)

This builtin releases the lock acquired by __sync_lock_test_and_set. Normally this means writing the constant 0 to *ptr.
This builtin is not a full barrier, but rather a release barrier.

Взяв этот лок, код может работать с критической секцией кода. Обычно такое происходит, когда в бакете закончились токены. Если тред не получил нужное количество токенов и не смог зайти в критическую секцию, то пакет сбросится (DROP).

Первоначальный вариант (который я не стал отлаживать) выглядел как набор багов. Я взялся за отладку только, когда наступила необходимость реализовать HTB. Но как и в первой итерации, во время отладки второго варианта я изобрёл третий, и он получился настолько интересным, что вторая реализация так и не поехала в продакшен.

Код реализации покажу ниже. Можно заметить, что стейт для второго варианта TB достаточно большой, что уже не подходило под требование первой реализации.

Код реализации
typedef struct {
    u32 guard;
    s32 packet_cost;
    u64 ts;
    u64 cap;
    s64 value;
    s64 stage;
} Bucket;

static __always_inline E_DG_Verdict dg_tb_consume_or_refill(Bucket* tb, u64 now) {
    if (tb->value >= tb->packet_cost) {
         if (__sync_add_and_fetch(&tb->value, -tb->packet_cost) >= 0) {
             return DG_PASS;
         }
    }
    // critical section START
    if (__sync_lock_test_and_set(&tb->guard, 1) == 0) { // acquire the barrier
        if (now > tb->ts) {
            s64 add = MIN(now - tb->ts, tb->cap);
            tb->ts = now;

            add += tb->stage;

            if (tb->value < 0) {
                add += tb->packet_cost;
            } else if (tb->value > 0) {
                add += tb->value;
                tb->value = 0;
            }

            if (add >= tb->packet_cost) {
                tb->stage = 0;

                add = MIN(add, tb->cap) - tb->packet_cost;
                __sync_lock_test_and_set(&tb->value, add);
                __sync_lock_release(&tb->guard); // release
                // critical section END
                return DG_PASS;
            } else {
                // Эти дополнительные пляски ~~с бубном~~ со stage нужны,
                // чтобы не потерять токены, которых не хватает на один пакет, но которые
                // уже насчитались после обновления tb->ts. Если их прибавить сразу к
                // tb->value, то они спишутся в никуда следующим пакетом (их не хватит на DC_PASS).
                tb->stage = add;
            }
        }
        __sync_lock_release(&tb->guard); // release
        // critical section END
    }
    return DG_DROP;
}

// Списание токенов очень простое:
bool dg_consume(void* bucket, u64 now_ns) {
    Bucket* tb = bucket;
    E_DG_Verdict verdict = dg_tb_consume_or_refill(tb, now_ns);
    return verdict == DG_PASS;
}

Основной плюс второй версии в том, что в ней нет деления на каждый запрос (вообще нет деления). Это увеличило производительность почти в два раза.

Hierarchical Token Bucket (HTB)

Вот и настал момент реализации HTB в XDP. Именно тогда появился симулятор и был отлажен TB‑v2.

Я начал думать над тем, как же реализовать HTB в среде с такими жёсткими ограничениями. Одна из неявных целей, которую я себе поставил, — реализовать HTB, не уступающий текущему решению limit_any по производительности. Это было что‑то вроде личного вызова, который давал мне больше мотивации.

В первую очередь, нужно было избавиться от инструкции деления, которая присутствовала в v1 и выполнялась на каждый запрос. Это было обязательное условие, потому что в HTB на каждый запрос нужно было проверять минимум два TB и, соответственно, производительность должна была снизиться в два раза.

Всё началось с отладки TB‑v2 и попытки реализовать идею HTB с переливкой квоты. Честно говоря, я долго и безуспешно искал реализацию HTB на счётчиках в интернете, параллельно читал литературу про TB и устройство HTB.

Требования к реализации HTB

Напомню ещё раз, что в XDP нет возможности завести очередь пакетов. Также есть сложности с локами, которые я, кажется, победил ранее. И профилирование решения v1 ставит нас перед фактом, что даже такая базовая операция, как деление, должна быть исключена из реализации.

С другой стороны, все реализации HTB, которые я нашёл, используют глобальные локи, много деления и, что самое главное, очереди!

Обсуждая требования к решению, мы с командой пришли к следующему. У нас будет простая двухуровневая иерархия.

#================== global =================#
#[ wk1 | wk2    | wk3 |  other             ]#
#===========================================#

На картинке ниже отражено, как будет происходить взаимодействие внутри HTB между разными квотами.

  • глобальный лимит (Global) отмечен пунктирной линией;

  • лимит для всех остальных — Other;

  • выделенные лимиты (WK, или Well Known) — A, B, C;

  • гарантированная квота (limit) отмечена заливкой цветом;

  • Burst, ограниченные по времени и по количеству запросов (на шаге 4 он отмечен особым образом);

  • Burst не влияют на квоту других бакетов.

Что происходит на картинке:

  1. Квоты A, B, C используют свои лимиты не полностью, и поэтому есть свободные ресурсы, отмеченные белым цветом.

  2. Использование квоты А постепенно растёт, но не превышает limit этой квоты.

  3. В какой‑то момент квота A начинает использоваться сверх limit.

  4. В этот момент квота использует свой накопленный Burst. Он ограничен capacity бакета и использованием других квот.

  5. Если Burst ещё не использован, но квоты B и C начали использоваться на полную, то Burst в квоте A прекращается. В этот момент все квоты могут использовать только свой limit.

Замечание по шагу 5: так как квота Other действует так же, как и все остальные, она тоже может отдать свои свободные ресурсы под Burst других квот.

Глобальный лимит на машинку — это первый уровень. Он определяет доступный ресурс этой машинки. Глобальный лимит нельзя пробить — у этого TB нет видимого Burst, по сути это Leaky Bucket As a Meter.

Выделенные квоты и квота для всех остальных — это второй уровень, но позже внутри Other появился ещё один уровень. Фактически Other стал аналогом реализации limit_any — того, что у нас было до внедрения HTB.

Лимиты на количество запросов в каждой квоте (каждом TB) жёсткие, если не учитывать Burst. То есть их пробить нельзя и нет шаринга квоты (увы, он противоречит тому, что мы хотим).

Поверх лимитов для каждой квоты есть стандартная фишка TB — это Burst capacity (или по‑простому размер бакета, в котором находятся токены). Другими словами, если TB недоиспользует свой лимит, у него накапливаются свободные токены. В какой‑то момент TB может потратить все эти токены, даже пробивая свой лимит RPS. При этом TB не может помешать другим использовать их лимиты, то есть использование Burst выполняется только за счёт свободной квоты соседних TB. Это своего рода Soft Burst.

Внутри Other потребители конкурируют как придётся. Там нет каких‑то определённых правил — кто первый захватил токен, тот и прав. Но внутри Other есть лимит для каждого, который не позволяет одному потребителю «съесть» всю квоту этого TB.

Замечу, что доработка Other для внедрения limit_any произошла уже после переноса реализации HTB в код eBPF‑программы, поэтому тут этого кода не будет.

И, надеюсь, это сумбурное описание строит целостную картинку того, что нам придётся реализовать.

Идея с переливкой квоты

На первом этапе я стал думать про идею реализации HTB, где недоиспользованная квота в TB переливается в выделенный TB для шаринга квоты. Важно заметить, что на верхнем уровне есть глобальный TB, который должен был обеспечить гарантированный global limit без Burst.

Я не хотел делать нелимитированный шаринг, поэтому эта идея не была окончательным вариантом. Основная мысль тут в том, что в случае нехватки квоты в каком‑то TB, он может попытаться взять её из выделенного TB избытка. Сам избыток при этом тратится. Если избытков нет, то шаринга не происходит.

Избытки накапливаются только тогда, когда есть TB, квота которого недоиспользуется. То есть поток запросов, проходящий через этот бакет, недостаточен, чтобы своевременно использовать токены из ведра. Чтобы лимитировать потребление при шаринге, я хотел использовать Burst TB как лимит для использования избытков.

Если забыть про TB избытков, то набор TB будет составлять какую‑то квоту, и сверху неё будет Burst, который без лимитирования пробьёт глобальный лимит. Если разрешить использовать Burst только в рамках TB с избытками, кажется, что лимит пробить невозможно.

На бумаге идея выглядит хорошо, но на практике я забыл о важном моменте TB As a Meter. В каждый момент токены в TB избытков инвалидируются в плане справедливости использования этих токенов. В итоге накопленный TB избытков будет приводить к тем же самым пробитиям квоты, как и без специально выделенного TB.

Пытаясь отладить реализацию HTB с переливкой, я окончательно убедился, что без симулятора мне не обойтись. Так я приступил к его реализации.

Симулятор

Симулятор получился довольно простой, в стиле Single File Library. Написан он на C, потому что код BPF — это подмножество языка C. Так что особого выбора не было.

Симулятор состоит из нескольких частей:

  • REPL — пульт управления симуляцией;

  • тред сбора статистики TB;

  • тред тюнинга параметров TB;

  • множество тредов потребителей, которые эмулируют реальную нагрузку на TB.

make
$(IMPL): Build the simulator for the $(IMPL) token bucket implementation
help: Optional vars IMPL=<tb implementation> OUTPUT=<pipe for metrics output> SCRIPT=<scenario script>
run: Run the REPL simulator (default IMPL=tb-old)
script: Run the simulator with a script (default SCRIPT=scripts/example.sh IMPL=tb-old)

Для проверки TB нужно реализовать интерфейс, который предоставляет симулятор, и написать сценарий тестирования. Или запустить REPL и интерактивно проверить, как ведёт себя реализация TB под различной нагрузкой.

Сверху расположено окно tlook. Три окна снизу: левое с текстом тестового скрипта, среднее с выводом лога симулятора и правое с запущенным htop для отслеживания потребления CPU симулятором
Сверху расположено окно tlook. Три окна снизу: левое с текстом тестового скрипта, среднее с выводом лога симулятора и правое с запущенным htop для отслеживания потребления CPU симулятором

Изобретение на основе идеи EDT — v3

В v2 мне не нравились две вещи:

  • Слишком большой стейт TB и слишком много кода в критической секции. Интуиция подсказывала мне, что должно быть более простое решение!

  • Конкуренция на лок при DROP. Это значит, что в момент, когда алгоритм дропает больше, нагрузка на CPU увеличивается из‑за конкуренции на доступ к критической секции (постоянные попытки взять лок).

Второй момент было видно по потреблению CPU при нагрузочном тестировании и в симуляторе. Kernel‑потоки ksoftirqd (которые занимаются обработкой eBPF‑программ) уходят в 100% загрузку ядер намного быстрее, чем ожидалось.

Ранее я слышал, что в сетевой подсистеме Linux внедрили какой‑то интересный алгоритм. Он позволил отказаться от глобального лока при лимитировании исходящих пакетов. Речь о EDT (Earliest Departure Time). У него есть интересная концепция «Колеса времени» (нет, это не про то, о котором писал Кастанеда).

Ещё я случайно (честно) вспомнил про видео с netdev 0×14, которое также по интересной случайности смотрел за полгода до событий, описанных в этой статье. В этом видео рассказывается, как в Google заменили HTB на EDT. Этот материал весьма релевантен за исключением того, что мне в XDP нужно принять решение о судьбе пакета ещё до захода в сетевой стек ядра. Поэтому у меня просто нет очередей, EDT‑меток и sk_buff‑структур — то есть нет ничего, что использовалось в указанном видео.

Уперевшись в тот факт, что реализация с переливкой не работает, я бесконечно думал: где же выход из этой ситуации? Где та часть, которую я упустил и которая позволит мне сложить весь пазл в рабочую программу?

У меня обычно случается так, что если я не вижу явного решения и не могу с ходу решить проблему, я перехожу в стадию решения её на «подсознательном уровне». Мне нужно поставить перед собой проблему, подумать над её решением, насколько хватит сил, и прекратить явное обдумывание на некоторое время. И тут в дело вступит «подсознание», которое недоступно нам для управления на сознательном уровне, но может предоставить ключевую часть для решения пазла. Обычно это выглядит как: «Ух ты! Меня осенило!» В это время важно заняться чем‑то, не связанным с рабочей задачей.

И в таком режиме мне пришла в голову бесконечно простая и красивая идея. Зачем я пытаюсь копить токены в каком‑то ведре, ведь временная метка может выполнять одновременно функцию ведра!

Now — текущий timestamp хранит за меня система, а я могу просто отражать факт использования токенов, прибавляя стоимость пакета к временной метке.

Остаётся только проверить, что временная метка не вышла за горизонт событий. Шутка! За горизонт, который будет являться одной из границ capacity бакета. Вторая граница — это now. 

В итоге я упаковал три параметра в один, три 64-битных числа из v2-реализации — в одно. Как говорится, лучше один раз увидеть, чем сто раз прочитать, поэтому вот картинка, наглядно показывающая всю магию. Весь изменяемый стейт, который нужно синхронизировать между тредами, хранится в одном 64-битном числе EDT!

Описание алгоритма

Видео, наглядно показывающее работу алгоритма

Временная линия — это бесконечная прямая, простирающаяся в обе стороны. Граница time now и horizon всегда движется по временной линии в сторону будущего со скоростью 1e9 наносекунд в секунду (другими словами, со скоростью времени).

Каждый новый пакет толкает edt вперёд, заставляет его догонять horizon. Если при попытке пропустить новый пакет окажется, что EDT будет пересекать horizon, пакеты начинают отбрасываться и EDT перестаёт толкаться вперёд.

Получается, если EDT никто не толкает, то time now очень быстро его догоняет и потом оставляет EDT в прошлом — в отрицательном диапазоне usage. Если TB находит себя в таком состоянии — это индикатор очень низкого RPS‑потребления, и TB просто двигает EDT прямо на time now.

Так как в коде первый раз usage рассчитывается без атомарного чтения, он всегда будет немного устаревать. Но это не проблема, потому что пока EDT не пушится слишком быстро и usage отрицательный, можно быть уверенным, что использование квоты сверх лимита невозможно. Если usage уже больше capacity, просто дропаем пакет. В следующий раз (для следующего пакета) time now приблизится к EDT и usage окажется в диапазоне [0-capacity). В этом случае сработает пуш EDT и автоматически атомарная перепроверка usage. Далее на основе этого значения принимается точный вердикт о судьбе пакета.

Время за нас наполняет ведро токенами (двигает EDT влево), а мы их используем для пропуска пакетов. Лишние токены переливаются через край (в коде ветка if (usage < 0)).

Итоговая реализация получилась настолько простой, что я даже потрудился добавить туда комментариев :) 

typedef struct Bucket {
    s64 edt;
    u64 packet_cost;
    u64 cap;
} Bucket;

static __always_inline E_DG_Verdict dg_tb_consume_or_refill(Bucket* tb, u64 elapsed) {
    s64 usage = tb->edt - (s64)elapsed;
    if (usage < 0) {
        // If the current rate is too low, we need to actualize tb->edt
        // in order to meet the burst constraints.
        __sync_bool_compare_and_swap(&tb->edt, tb->edt, elapsed + tb->packet_cost);
        return DG_PASS;
    } else if (usage > (tb->cap - tb->packet_cost)) {
        // If the current rate is too high just drop the packet without moving tb->edt
        return DG_DROP;
    }
    // If there is a good chance that the packet will be passed, we need to
    // advance tb->edt and recheck whether the current rate is within the limit.
    usage = __sync_fetch_and_add(&tb->edt, tb->packet_cost) - (s64)elapsed;
    if (usage > tb->cap) {
        return DG_DROP;
    }
    return DG_PASS;
}

bool dg_consume(void* bucket, u64 now_ns) {
    Bucket* tb = bucket;
    E_DG_Verdict verdict = dg_tb_consume_or_refill(tb, now_ns);
    return verdict == DG_PASS;
}

Очень приятное открытие, что эта реализация стала ещё быстрее, чем v2.

Запуская пиковые значения нагрузки в симуляторе, я убедился, что теперь граница, после которой CPU уходит в 100% загрузку, намного более предсказуемая и совпадает с моими ощущениями, как это должно работать. Эта реализация уже не светится в perf атомик‑операциями со значимым перевесом по проценту исполнения на CPU. И самое приятное то, что эта реализация при росте нагрузки будет иметь тенденцию к меньшему использованию атомик‑операций, а значит к более простому и быстрому принятию решения о дропе пакета.

Конечно, не может быть идеального решения без какого‑то НО, и всё зависит от размера этого НО.

В данном случае ограничение для использования этого алгоритма незначительное. Оно заключается в том, что в TB всегда должен быть какой‑то capacity (tb→cap) обязательно больше, чем стоимость одного пакета. В противном случае алгоритм будет работать неправильно и начинать дропать сильно раньше заданного capacity. Так как обычно мы не хотим использовать TB c Burst размером в стоимость одного пакета (использовать TB как Leaky Bucket), можно считать, что это ограничение не пересекается с допустимыми режимами работы TB.

Для расчёта минимального capacity для нормальной работы TB‑v3 я эмпирически вывел следующую формулу методом экспоненциальной аппроксимации.

// код на go из control plane
int32(3 * math.Exp(x: math.Log10(x: float64(limit))))

Изобретение tlook и интеграция с симулятором

Отлаживая v3-реализацию в составе HTB, я понял, что без визуализации всего этого дела в графики работать просто невозможно. На самом деле я и до этого пытался строить графики через такие программы, как Ttyplot и Sampler. Последний я пропатчил для своих нужд самым бесчеловечным образом, поэтому предоставление этого патча в upstream даже не рассматривалось.

Но оба эти инструмента либо сильно ограничены, либо паттерн их использования совершенно не подходит мне. Поэтому было решено написать что‑то своё. Так появился tlook.

Идея такой программы витала в воздухе очень давно, и никак не находился случай, достаточно весомый для её реализации. По основным функциональным возможностям, которых я нигде больше не видел, могу отметить следующее:

  • простая передача входных данных для визуализации (в tlook достаточно периодически на stdin программы писать key=value);

  • возможность увеличивать или уменьшать окно просмотра (диапазон отображаемых данных) и размер истории;

  • постановка на паузу и возможность двигать окно просмотра по истории;

  • курсор, которым можно просматривать значения за определённый отрезок времени;

  • различные режимы отображения данных (scale) как в Grafana (в tlook есть два — исходные данные и asinh);

  • скрытие легенды и осей координат.

Чтобы интегрировать tlook с симулятором, не пришлось изобретать что‑то неординарное — хватило старого доброго named pipe. Его создание я добавил в Makefile симулятора, поэтому использование связки tlook + simulator стало просто как запуск двух программ в двух разных терминалах.

Выше пример использования связки simulator + tlook. В одном окне запущен симулятор в режиме REPL, в другом tlook читает вывод статистики на pipe и красиво отрисовывает графики. Я могу видеть в интерактивном режиме, как изменения разных параметров бакетов влияют на их поведение. Также я могу менять нагрузку и количество тредов, подающих нагрузку.

Какой HTB получился в итоге

Идея с переливкой квоты не сработала, и я решил взять запасной вариант, который существовал с самого начала этой задачи. Берём глобальный TB (далее — Global), по два бакета (пары) для каждой выделенной квоты: один будем называть Limit, а второй — BB (Burst Bucket). Такую же пару для квоты Other.

Один из пары бакетов будет контролировать заданный лимит, а второй — мягкий Burst. Мягким я его называю, потому что у него не будет возможности пробить Global-лимит или повлиять на квоту других пар. В такой реализации пара для Other действует так же, как выделенная пара, поэтому в симуляторе у Other не было никакой особенной логики. Уже после переноса в eBPF я дорабатывал обработку пары Other, но, как я упоминал ранее, там логика из одного if и просто копирует старое поведение limit_any.

Остаётся правильно организовать списание токенов в каждом бакете, и всё готово. 

Конечно, при переносе в eBPF этот код претерпел некоторую трансформацию, но основная идея осталась та же.

Итоговый код
typedef struct Bucket {
    s64 edt;
    u64 packet_cost;
    u64 cap;
    struct Bucket* bb;
} Bucket;

Bucket* global = NULL;

static __always_inline E_DG_Verdict dg_tb_consume_or_refill(u64 elapsed, s64* edt, s64 capacity, s64 packet_cost, bool force_take) {
    // Реализация этой функции есть выше. Изменился только способ передачи аргументов, код реализации тот же самый.
}

// После нахождения пары для конкретного запроса в каком-то внешнем хранилище выполняем функцию проверки HTB 
static __always_inline E_DG_Verdict dg_htb_consume_or_refill(Bucket* tb, u64 elapsed) {
    // Проверяем бакет для квоты, если Limit разрешает пропустить пакет,
    E_DG_Verdict my_v = dg_tb_consume_or_refill(elapsed, &tb->edt, tb->cap, tb->packet_cost, false);
    if (my_v == DG_PASS) { //… то пропускаем,
        // но перед этим ЯВНО списав токены с Global и с BB.
        // ЯВНО — это значит, что списание контролируется доп. параметром и в случае явного списания 
        // мы всегда добавляем стоимость пакета к `tb->edt`, даже если результат получится DG_DROP
        dg_tb_consume_or_refill(elapsed, &global->edt, global->cap, global->packet_cost, true);
        dg_tb_consume_or_refill(elapsed, &tb->bb->edt, tb->bb->cap, tb->bb->packet_cost, true);
        return DG_PASS;
    }

    // Если Limit не разрешает пропустить, то пробуем использовать Burst через BB.
    // Для этого проверяем, пропускает ли BB пакет:

    E_DG_Verdict burst_v = dg_tb_consume_or_refill(elapsed, &tb->bb->edt, tb->bb->cap, tb->bb->packet_cost, false);
    if (burst_v == DG_PASS) {
        // Если BB пропускает пакет, проверяем Global Bucket:
        E_DG_Verdict global_v = dg_tb_consume_or_refill(elapsed, &global->edt, global->cap, global->packet_cost, false);
        if (global_v == DG_DROP) {
            // В случае дропа со стороны Global нам нужно вернуть стоимость пакета BB,
            // чтобы Burst не сжигался дропами.

            // Return the burst token if the global limit doesn't allow bursting.
            __sync_fetch_and_add(&tb->bb->edt, -tb->packet_cost);
        }
        // Если и Global пропустил, то пропускаем пакет.
        return global_v;
    }
    // Если пакет не пропускается, то дропаем.
    return DG_DROP;
}

В таком режиме Burst не будет отъедать квоту у Limit‑бакета в других парах, потому что если бакет Limit пропускает пакет, его судьба уже решена и остальной код никак на это повлиять не может. Кажется, что из‑за этого может быть пробитие Global‑лимита, но это не так. Global всегда списывается в случае потребления Limit и тем самым не позволяет BB потребить квоту сверх нормы. Такую конструкцию можно достаточно просто расширить на большее количество уровней.

Я считаю, что реализация HTB получилась не сильно сложнее реализации одного TB‑v2 без иерархии. И по финальным тестам производительности она работает не хуже реализации limit_any с TB‑v1!

Комментарии (0)