Ничто не предвещало беды, просто очередная минорная фича в проекте. Необходимо ограничить количество вызовов определенного функционала внутри сервиса.

Если бы это была классическая задача защиты от DDoS на уровне сервиса/системы в целом, то она решилась бы стандартными средствами на уровне инфраструктуры. Но в нашем случае задача была проще:

  1. В рамках одного сервиса обеспечить вызов конкретного функционала строго не более 100 раз в секунду;

  2. Ограничение должно соблюдаться не в рамках каждой последовательной секунды, а в рамках любого произвольно выбранного секундного интервала;

  3. Неудачники нам не нужны Троттлинг не требуется, запросы которые не успели - отбрасываются;

Второй пункт требует некоторого пояснения, и пояснять его проще графически. Для этого посмотрим на два профиля нагрузки запросов в течение 2-х секундного интервала:

Пример 1. Некорректная работа ограничения нагрузки
Пример 1. Некорректная работа ограничения нагрузки

В данном случае в рамках интервалов A и С ограничение сработало как надо, но с интервалом B проблема, в нем количество запросов превышает допустимое значение.

Пример 2. Корректная работа ограничения нагрузки
Пример 2. Корректная работа ограничения нагрузки

Собственно требования достаточно понятны, и, не откладывая в долгий ящик, разработка фичи началась...

Поиск пути

Что делает нормальный Java разработчик, которому нужна достаточно стандартная функциональность? Правильно - ищет соответствующую аннотацию во фреймворке, на котором написан проект =)

Поверхностный гуглеж показал, что такой функционал "из коробки" не доступен.

Что делает опытный Java разработчик, когда понимает, что одной аннотацией тут не обойдешься? Правильно - ищет подходящую библиотеку. И лучше, чтобы она была интегрирована с вышеупомянутым фреймворком.

Второй этап гуглежа (на самом деле еще в первом все нашлось, но ради логики повествования пусть будет второй) показал, что таких библиотек есть даже несколько. А наиболее популярных 2 штуки:

bucket4j - отличная библиотека, которая позволяет реализовывать в том числе и распределенный Rate Limiter, имеет кучу встроенных интеграций и всяческого дополнительного функционала.

guava RateLimiter - часть библиотеки guava от уважаемого автора. Существенно меньше фич, не умеет в распределенку, но для текущих целей вполне подходит.

Два кандидата это конечно лучше чем ничего, но хуже чем один, ибо надо как-то выбирать.

Выбор пути

Что делает прагматичный Java разработчик когда ему требуется выбрать какую-то реализацию? Правильно - тестирует все варианты и выбирает более производительный, менее затратный по ресурсам. Конечно из тех, которые в принципе изначальным требованиям удовлетворяют.

Итого, нам требуется 2 вида тестов:

  • Тест, который подтвердит корректность ограничения потока запросов в соответствии с нашим изначальным требованием №2;

  • Тест на производительность, чтобы посмотреть, что нам будет стоить использование той или иной библиотеки;

Тестируем корректность

Для этого мы:

  1. Настроим кандидатов на нужный сценарий - в нашем случае ограничение в 100 обращений за секунду;

  2. К каждому будем обращаться каждые ~8 мс так, чтобы точно получить больше 100 обращений в секунду;

  3. Фиксировать метку времени каждый раз, когда алгоритм кандидата будет возвращать true на обращение;

  4. Все зафиксированные метки времени складывать последовательно в список;

  5. Потом пройдемся по списку окном в 1 сек и посмотрим сколько в каждом окне было согласованных обращений.

Для наглядности давайте графически представим процесс валидации потенциального результата работы алгоритма при ограничении не больше 5 запросов в секунду и длительности теста 2 секунды:

Валидация результата
Валидация результата

Окна выделяем так, чтобы концом окна служила метка времени из списка и выбираем максимум доступных окон.

В приведенном примере видно, что 2 из пяти окон не проходят проверку в контексте данного примера.

Первое что сделаем - выделим общий интерфейс, чтобы можно было написать общий алгоритм теста:

public interface TestRateLimiter {
    boolean tryAcquire();
}

Много нам от него не надо, пусть просто в каждый конкретный момент времени говорит, допустима операция или нет.

Реализуем этот интерфейс для наших кандидатов:

bucket4J:

public class Bucket4jTestLimiter implements TestRateLimiter {

    private final Bucket bucket;

    public Bucket4jTestLimiter(int capacity, Duration interval) {
        bucket = Bucket.builder()
                .addLimit(Bandwidth.simple(capacity, interval))
                .build();
    }

    @Override
    public boolean tryAcquire() {
        return bucket.tryConsume(1);
    }
}

guava RateLimiter:

public class GuavaTestLimiter implements TestRateLimiter {

    private final RateLimiter limiter;

    public GuavaTestLimiter(int count, Duration interval){
        limiter = RateLimiter.create(count);
    }

    @Override
    public boolean tryAcquire() {
        return limiter.tryAcquire(1);
    }
}

Фиксировать временные метки будем используя Instant . Исходя из этого напишем метод валидации полученного перечня временных меток:


private void validate(List<Instant> grants) {

    var first = 0;

    for (int i = 0; i < grants.size(); i++) {
        var current = grants.get(i);

        //-- Вычисляем первый элемент в окне для очередной метки
        while (Duration.between(grants.get(first), current).toMillis() >= WINDOW_MILLIS){
            first++;
        }

        var count = i - first + 1;

        if (count > MAX) {
            fail("Second %s - %s has %s operations when max %s".formatted(
                    current.minus(Duration.ofSeconds(1)),
                    current,
                    count,
                    MAX
            ));
        }
    }
}
  • WINDOW_MILLIS - размер нашего окна в милисекундах;

  • MAX - максимально допустимое количество операций в рамках окна;

Внимательный читатель заметил, что в начале алгоритм "растит" окно до секунды и потом его двигает, так что мы начнем с проверки мелких и не целевых окон, но на конечный результат это не влияет, так что в данном случае такое поведение нас вполне устраивает.

Теперь метод с основной логикой теста:

private void testCorrectnessOf(TestRateLimiter limiter) {

    var list = new ArrayList<Instant>();

    for (int i = 0; i < CYCLES; i++) {

        if (limiter.tryAcquire()) {
            list.add(Instant.now());
        }

        sleep(CYCLE_INTERVAL);
    }

    validate(list);
}
  • CYCLES - количество итераций

  • CYCLE_INTERVAL - задержка после каждой итерации в миллисекундах

  • sleep - вспомогательная функция , которая по факту просто оборачивает Thread.sleep для целей лучшей читаемости метода:

private void sleep(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ignored) {
    }
}

Еще нам нужны данные для тестов:

long     WINDOW_MILLIS   = 1000; // Размер окна в миллисекундах
Duration WINDOW          = Duration.ofMillis(INTERVAL_MILLIS); // Размер окна
int      MAX             = 100;  // Максимальное количество операций
int      CYCLES          = 1000; // Количество итераций в цикле теста
long     CYCLE_INTERVAL  = 8;    // Таймаут цикла теста

И, наконец, сами тесты:

@Test
public void bucket4j_correctness_test() {
    testCorrectnessOf(new Bucket4jTestLimiter(MAX, WINDOW));
}

@Test
public void guavaRateLimiter_correctness_test() {
    testCorrectnessOf(new GuavaTestLimiter(MAX, WINDOW));
}

Барабанная дробь в течение 20 секунд работы теста и...

Оба свалились!

То есть каждый допустил больше сработок чем настроено. Работа над фичей перестает быть томной.

Раз такое дело, давайте посмотрим, насколько промахиваются оба варианта. Для этого немного перепишем метод валидации. Мы добавим расчет и распечатку основных метрик для проверяемого списка меток времени, а именно:

  • Количество одобренных операций (Grants);

  • Количество секундных окон, найденных процедурой валидации (Windows);

  • Минимальное количество операций в секунду (Minimum);

  • Среднее количество операций в секунду (Average);

  • Медиана по операциям в секунду (Median);

  • 90 перцентиль операций в секунду (90th percentile);

  • Максимальное количество операций в секунду (Maximum);

Сам метод уберу под спойлер
private static void validate(List<Instant> grants) {

    var first   = 0;
    var windows = new HashMap<Instant, Integer>();

    for (int i = 0; i < grants.size(); i++) {
        var current = grants.get(i);

        while (Duration.between(grants.get(first), current).toMillis() >= Data.WINDOW_MILLIS) {
            first++;
        }

        var start = grants.get(first);
        var count = i - first + 1;

        //-- Привязываем к первому событию в окне
        //   чтобы исключить вложенные окна
        windows.put(start, count);
    }

    //-- Сортируем окна по количеству операций в них
    var sorted = windows.values()
                .stream()
                .sorted()
                .toList();

    //-- Количество окон
    var cnt = sorted.size();

    //-- Минимальное количество операций в секунду
    var min = sorted.getFirst();

    //-- Среднее количество операций в секунду
    var avg = (int) sorted.stream().mapToInt(x -> x).average().getAsDouble();

    //-- Медиана по операциям в секунду
    var p50 = sorted.get(sorted.size() / 2);

    //-- 90-й перцентиль операций в секунду
    var p90 = sorted.get((int) (sorted.size() * 0.9));

    //-- Максимальное количество операций в секунду
    var max = sorted.getLast();

    System.out.println("Grants           : " + grants.size());
    System.out.println("Windows          : " + cnt);
    System.out.println("Minimum          : " + min);
    System.out.println("Average          : " + avg);
    System.out.println("Median           : " + p50);
    System.out.println("90th percentile  : " + p90);
    System.out.println("Maximum          : " + max);

    if (max > Data.MAX) {
        fail("Maximum number of operations exceeded");
    }
}

И получаем следующие результаты:

bucket4j

guava RateLimiter

Grants

997

903

Windows

848

701

Minimum

109

94

Average

111

100

Median

112

101

90th percentile

113

101

Maximum

114

107

Такие цифры мы получили на исключительно синтетическом, причем однопоточном тесте и, скорее всего, при боевой нагрузке в большинстве сценариев все будет работать как надо. Но в нашем случае есть строгое ограничение, и выходить за него нельзя.

Получается, что мы зашли в тупик...

Поиск альтернативного пути

Что делает разочарованный Java разработчик, когда библиотека работает не так, как ожидалось?

Что вы говорите: "Читает документацию к ней"? - ну нет, не та еще стадия отчаяния :) "Ищет другую библиотеку"? - этап поиска уже пройден.

Правильно - пишет свой костыль!

И ведь действительно, нам не нужно общее решение с троттлингом, множественными ограничениями, распределенной работой или возможностью в реальном времени добавить токенов в бакет. Нам нужен простой и надежный Rate Limiter, который удовлетворяет нашим требованиям.

Эта задача как специально создана для собеседований - простой алгоритм, основанный на стандартной структуре данных, а именно на очереди.

Итак, у нас есть очередь, в которую мы будем складывать временные метки допущенных нашим Rate Limiter операций. А сам алгоритм прост как:

  • Получить текущую метку времени;

  • Вычесть из нее размер временного окна алгоритма (в нашем случае секунду);

  • Из начала очереди удалять элементы пока они старше полученной на предыдущем шаге временной метки. Таким образом, в очереди останутся только метки операций, допущенных за последнюю секунду с текущего времени;

  • Если в очереди после удаления "старых" временных меток останется элементов меньше, чем наше предельное значение - добавляем текущую временную метку и возвращаем true, в противном случае false.

Таким образом, мы реализует принцип "скользящего окна" (хорошая статья с обзором различных алгоритмов ограничения частоты запросов). Он простой как палка и в требуемой степени надежен.

Реализация на чистой Java без привлечения сторонних библиотек занимает меньше 30 строчек:

public class QueueLimiter {

    private final Queue<Long> queue = new LinkedList<>();

    private final int  limit;
    private final long windowMs;

    public QueueLimiter(int limit, Duration window) {
        this.limit    = limit;
        this.windowMs = window.toMillis();
    }

    public boolean tryAcquire() {
        var timestamp    = System.currentTimeMillis();
        var minTimestamp = timestamp - windowMs;

        while (!queue.isEmpty() && queue.peek() < minTimestamp) {
            queue.poll();
        }

        if (queue.size() < limit) {
            queue.add(timestamp);
            return true;
        } else {
            return false;
        }
    }
}

Давайте же скорее посмотрим, что нам выдаст тест на корректность. Для этого проводим те же манипуляции, которые мы делали для bucket4j и guava RateLimiter.

Реализация тестового интерфейса:

public class QueueTestLimiter implements TestRateLimiter {
    private final QueueLimiter limiter;

    public QueueTestLimiter(int capacity, Duration interval) {
        limiter = new QueueLimiter(capacity, interval);
    }

    @Override
    public boolean tryAcquire() {
        return limiter.tryAcquire();
    }
}

Сам тест:

@Test
public void queueLimiter_correctness_test() {
    testCorrectnessOf(new QueueTestLimiter(MAX, WINDOW));
}

И долгожданные результаты:

bucket4j

guava RateLimiter

QueueLimiter

Grants

997

903

900

Windows

848

701

755

Minimum

109

94

87

Average

111

100

99

Median

112

101

100

90th percentile

113

101

100

Maximum

114

107

100

В целом оно и понятно - специально заточенный под задачу алгоритм ее и решает.

Но что-то мы отвлеклись, мы же хотели еще проверить производительность кандидатов. И теперь, когда на старт выходит собственноручно написанная реализация, это должно быть еще интереснее.

Для тестирования будем использовать jmh с включенным профилированием GC.

Сам бенчмарк уберу под спойлер
@State(Scope.Benchmark)
@Fork(value = 3, warmups = 3)
@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
public class ThroughputTest {

    interface Data {
        int      LIMIT  = 100;
        Duration WINDOW = Duration.ofSeconds(1);
    }

    private Bucket       bucket4j;
    private QueueLimiter queueLimiter;
    private RateLimiter  guava;

    @Setup(Level.Trial)
    public void setUp() {

        bucket4j = Bucket.builder()
                .addLimit(Bandwidth.simple(LIMIT, WINDOW))
                .build();

        queueLimiter = new QueueLimiter(LIMIT, WINDOW);

        guava = RateLimiter.create(LIMIT, WINDOW);
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(ThroughputTest.class.getSimpleName())
                .addProfiler(GCProfiler.class)
                .build();
        new Runner(opt).run();
    }

    @Benchmark
    public void bucket4j(Blackhole blackhole) {
        blackhole.consume(bucket4j.tryConsume(1));
    }

    @Benchmark
    public void queueLimiter(Blackhole blackhole) {
        blackhole.consume(queueLimiter.tryAcquire());
    }

    @Benchmark
    public void guava(Blackhole blackhole) {
        blackhole.consume(guava.tryAcquire(1));
    }
}

На что мы будем смотреть:

  • Производительность (Throughput) - какое количество операций мы можем выполнить в секунду. Измеряется, соответственно, в операциях в секунду (ops/s);

  • Количество аллокаций в секунду (Alloc/sec) - какое количество мегабайт выделяется в секунду работы теста. Единица измерения - Мегабайты в секунду (MB/sec);

  • Количество аллокаций за операцию (Alloc/op) - количество байт, выделяемых на операцию бенчмарка. Единица измерения - байты за операцию (Bytes/op);

  • Количество событий сборки мусора в процессе бенчмарка (GC count) в штуках;

  • Время, затраченное на GC за время бенчмарка (GC time) в милисекундах (ms);

Спустя ~9 минут ожесточенной работы бенчмарка получаем результат:

bucket4j

guava RateLimiter

QueueLimiter

Throughput (ops/s)

13 126 645

19 610 525

26 444 810

Alloc/sec (MB/sec)

800

0,002

0,007

Alloc/op (Bytes/op)

64

≈ 10⁻⁴

≈ 10⁻⁴

GC count

186

≈ 0

≈ 0

GC time (ms)

225

-

-

Казалось бы Wow и всеобщее ликование:

  • Производительность выше остальных;

  • Потребление памяти на минимуме;

Но! Надо быть честным с самим собой, в нашей реализации есть 2 проблемы:

  1. Она не будет корректно работать в многопоточной среде;

  2. Линейная зависимость по памяти от максимально допустимого количества операций. Сейчас у нас настройка была равна 100, и это значит, что в очереди ожидается максимум 100 элементов. Но что если будет больше?

Поставим еще один эксперимент. Сделаем метод tryAcquire synchronized для целей потокобезопасности и увеличим лимит операций до половины производительности - 10 000 000 операций в секунду:

bucket4j

guava RateLimiter

QueueLimiter

Throughput (ops/s)

12 809 305

16 430 450

4 028 388

Alloc/sec (MB/sec)

781

0,002

184

Alloc/op (Bytes/op)

64

≈ 10⁻⁴

48

GC count

192

≈ 0

114

GC time (ms)

240

-

29 956

И вот теперь совсем не Wow, а вполне себе Oops... На фоне конкурентов, которые хоть и потеряли в производительности немного, по памяти вовсе не изменились, а наш вариант потерял порядок производительности и начал потреблять память.

Можно было бы сказать, что нам все равно, ведь нам нужен сценарий 100 операций в секунду, а он работает отлично. Но дело не в сценарии - задета честь!

Оптимизация найденного пути

Для начала заметим, что наш Rate Limiter фактически состоит из двух частей:

  • Очередь определенной емкости, содержащая временные метки с методами:

    • Подрезать начало до определенной минимальной метки;

    • Попытаться добавить новую метку если емкость позволяет;

  • Собственно алгоритм, который занимается получением временных меток, знает про размер окна и обращается к очереди.

При этом основные проблемы производительности именно из-за очереди. Логично спрятать очередь за подходящим интерфейсом, чтобы не зависеть от реализации и иметь возможность экспериментировать с ней, не меняя при этом логику самого Rate Limiter.

После проведения рефакторинга получаем следующий результат:

public class QueueLimiter {

    private final Queue queue;
    private final long  windowMs;

    public QueueLimiter(Queue queue, Duration window) {

        if (queue == null) {
            throw new IllegalArgumentException("queue cannot be null");
        }

        if (windowMs.isNegative() || windowMs.isZero()) {
            throw new IllegalArgumentException("period mist be greater then zero");
        }

        this.queue    = queue;
        this.windowMs = windowMs.toMillis();
    }

    public synchronized boolean tryAcquire() {
        var timestamp    = System.currentTimeMillis();
        var minTimestamp = timestamp - windowMs;

        queue.trimTo(minTimestamp);

        return queue.tryEnqueue(timestamp);
    }

    public interface Queue {

        void trimTo(long minTimestamp);

        boolean tryEnqueue(long timestamp);
    }

    public interface Now {
        long nowMillis();
    }
}

Теперь наш класс умеет принимать экземпляр интерфейса QueueLimiter.Queue, который скрывает за собой конкретную реализацию очереди. Также была добавлена пара проверок.

Реализуем интерфейс очереди для реализации на базе LinkedList:

public class ListQueue implements QueueLimiter.Queue {

    private final int         capacity;
    private final Queue<Long> queue = new LinkedList<>();

    public ListQueue(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public void trimTo(long minTimestamp) {
        while (!queue.isEmpty() && queue.peek() < minTimestamp) {
            queue.poll();
        }
    }

    @Override
    public boolean tryEnqueue(long timestamp) {
        if (queue.size() < capacity) {
            queue.add(timestamp);
            return true;
        } else {
            return false;
        }
    }
}

Теперь нам открыт путь к оптимизации логики очереди. Мы можем реализовать несколько вариантов и сравнивать их между собой.

Прежде чем бросаться в пучину оптимизации необходимо ответить на вопрос - что в текущей реализации нам мешает больше всего?

А беспокоит нас постоянная модификация коллекции, которая влечет за собой не менее постоянную аллокацию новых обеъектов и их утилизацию.

Также вспомним что у нас очередь всегда имеет предельный размер, который ограничен максимально допустимым количеством операций.

А это еще одна типичная задача на собеседовании. Задача про "циклический буфер". И работает это так:

  • Выделяем массив long-ов на максимальное количество операций;

  • Заводим два целочисленных значения - начало и размер очереди;

  • При добавлении элемента кладем значение в массив по индексу, вычисляемому как начало + размер. Если полученный индекс получается больше размера массива - вычитаем из него размер массива, зацикливая таким образом наш буфер;

  • При удалении элемента из очереди сдвигаем начало на единицу (зацикливая при необходимости) и уменьшаем размер на ту же единицу;

  • Следим чтобы "хвост" очереди не догнал "голову";

Так мы получим константную сложность основных операций и полное отсутствие каких-либо аллокаций в процессе работы.

Собственно реализация:

public class ArrayQueue implements QueueLimiter.Queue {

    private final long[] data;

    private int head = 0;
    private int size = 0;

    public ArrayQueue(int capacity) {

        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be greater then zero");
        }

        this.data = new long[capacity];
    }

    @Override
    public void trimTo(long minTimestamp) {
        while (size > 0 && data[head] < minTimestamp) {
            head = ++head % data.length;
            size--;
        }
    }

    @Override
    public boolean tryEnqueue(long timestamp) {
        if (size < data.length) {
            data[(head + size++) % data.length] = timestamp;
            return true;
        } else {
            return false;
        }
    }
}

Проведем еще пару тестов производительности. Для ограничении 100 операций в секунду и 10 000 000 операций в секунду:

100 операций в секунду:

bucket4j

guava RateLimiter

QueueLimiter

Throughput (ops/s)

14 950 040

18 298 180

19 380 337

Alloc/sec (MB/sec)

912

0,002

0,002

Alloc/op (Bytes/op)

64

≈ 10⁻⁴

≈ 10⁻⁴

GC count

223

≈ 0

≈ 0

GC time (ms)

189

-

-

10 000 000 операций в секунду:

bucket4j

guava RateLimiter

QueueLimiter

Throughput (ops/s)

14 460 229

16 134 173

17 700 445

Alloc/sec (MB/sec)

882

0,002

0,002

Alloc/op (Bytes/op)

64

≈ 10⁻⁴

≈ 10⁻⁴

GC count

248

≈ 0

≈ 0

GC time (ms)

203

-

-

С точки зрения производительности вопросов, кажется, не осталось.

Итог нашего путешествия

Давайте оценим полученный результат. Посмотрим на его плюсы, минусы и возможности развития (а ведь как водится необходимость может возникнуть внезапно в любой момент).

Плюсы:

  • Точное выполнение поставленных требований;

  • Производительность в требуемом сценарии на уровне популярных библиотек;

  • Никаких сторонних зависимостей;

  • Небольшой объем простого кода, поддержка которого не принесет больших проблем;

Минусы:

  • Используется относительно много памяти. Нам все-таки нужен массив на размер ограничения. Ограничение в 1000 операций потребует 8Kb памяти, и эта зависимость линейна. С одной стороны, при текущих размерах сервисов это не так чтобы много, но нужно учитывать необходимое количество экземпляров Rate Limiter.

Возможности для развития:

  • Инфраструктурная часть - то что отличает поделку в рамках проекта от полноценной библиотеки. Чтобы можно было давать лимитам имена, гибко настраивать и т.д.

  • Поддержка нескольких условий на лимит. Например какая-то логика должна срабатывать не чаще 100 раз в секунду и не более 1000 раз в минуту. Для этого необходимо просто выделить массив на 1000 элементов и завести 2 пары счетчиков начала/размера. Проверять оба, сама логика проверки при этом не изменится. В принципе где 2 там и N =)

  • Распределенный лимит. Для этого необходимо уметь сериализовывать состояние лимита и хранить его в базе/кеше с учетом атомарности изменения. И если атомарность хранения и изменения зависит от используемой базы/кеша, то сериализация должна быть на нашей стороне, и она должна быть максимально дешевой. А поскольку основное состояние у нас хранится в массиве, сериализация его тривиальна.

  • Интеграция с популярными фреймворками так же не составляет труда.

На данный момент это именно проектная поделка, поэтому оформлять в отдельную библиотеку не буду. Однако весь код из статьи доступен по ссылке.

Спасибо за уделенное время.

Комментарии (0)