Однажды каждый C# программист получает на работе задачу по разработке интеграции с внешней системой, где ограничена максимальная частота запросов в секунду.

Интернет яростно сопротивлялся предоставить мне инструкцию к написанию такого кода, закидывая туториалами по настройке ограничения RPS на сервере, а не клиенте.

Но теперь на Хабре есть эта статья, которая научит отправлять запросы из HttpClient так, чтобы не получать 429 Too Many Requests.

Throttling


Обозначенный процесс взаимодействия с API, где нельзя превышать заданный RPS, в общем случае именуется троттлингом (throttling).

Throttling — это широко используемая техника для увеличения производительности кода, который выполняется повторно с некоторой периодичностью.

Троттлинг функции означает, что функция вызывается не более одного раза в указанный период времени (например, раз в 10 секунд). Другими словами ― троттлинг предотвращает запуск функции, если она уже запускалась недавно. Троттлинг также обеспечивает регулярность выполнения функции с заданной периодичностью.



В экшен-играх приходится нажимать кнопки с высокой частотой для выполнения какого-либо действия (стрельба, удар). Как правило, игроки нажимают кнопки намного чаще, чем это требуется, вероятно, увлекаясь происходящим. Таким образом игрок может нажать на кнопку «удара» 10 раз в течение пяти секунд, но персонаж делает не более одного удара в секунду. В этом случае троттлинг события «удар» позволяет игнорировать повторные нажатия кнопки в течение секунды.

Как реализуется подобное ограничение?

Rate Limiting Алгоритмы


Есть ряд алгоритмов, которые реализуют технику Rate Limiting — контроль количества допущенного трафика к объекту.



Существует множество различных алгоритмов ограничения скорости для управления потоком запросов. .NET 7 представил 4 таких алгоритма.

▍ Concurrency limit


Concurrency лимитер ограничивает количество одновременных запросов, которые могут получить доступ к ресурсу. Если установленный предел равен 10, то 10 запросов могут получить доступ к ресурсу одновременно, а 11-й запрос не будет допущен. Как только запрос завершается, количество разрешённых запросов увеличивается до 1, при завершении двух запросов — до 2 и так далее. Это делается с помощью вызова Dispose на экземпляре RateLimitLease, о которой поговорим позже.

▍ Token bucket limit


Token bucket алгоритм получил своё название, исходя из принципа работы. Представьте, что есть ведро, до краёв наполненное токенами. Когда поступает запрос, он забирает токен и хранит его вечно. Через некоторое время кто-то добавляет в ведро заранее определённое количество токенов, никогда не добавляя больше, чем ведро может вместить. Если ведро пустое, то при поступлении запроса ему будет отказано в доступе к ресурсу.

Приведу более конкретный пример. Предположим, что ведро вмещает 10 токенов, и каждую минуту в него добавляется 2 токена. Когда приходит запрос, он забирает 1 токен, так что у нас остаётся 9. Ещё 3 запроса приходят и забирают 3 токена, оставляя в ведре 6 токенов. Через минуту поступает 2 новых токена, что даёт 8 в сумме. 8 запросов приходят и забирают оставшиеся токены, опустошая ведро. Если приходит ещё один запрос после этого, то у него уже не получится взять доступ к ресурсу, пока в ведре не окажется больше токенов. Они в данном примере восполняются каждую минуту. Через 5 минут отсутствия запросов в ведре снова будут все 10 токенов, и в последующие минуты они не будут добавляться, пока новые запросы не начнут забирать токены.

▍ Fixed window limit


Алгоритм с фиксированным окном использует идею окна, которая будет использоваться и в следующем алгоритме. Окно — это промежуток времени, в течение которого действует ограничение, прежде чем произойдёт переход к следующему окну. В случае с фиксированным окном переход к следующему окну означает сброс ограничения обратно в начальную точку.

Представим, что есть кинотеатр с одним залом, вмещающим 100 человек, которые пришли смотреть 2-часовой фильм. Когда фильм начинается, людям разрешается встать в очередь на следующий сеанс, который состоится через 2 часа. В очереди могут стоять до 100 человек, прежде чем им начнут говорить прийти позже. По прошествии двух часов фильм заканчивается, и очередь от 0 до 100 человек может переместиться в кинотеатр, тем самым начиная формирование новой очереди. Всё равно что двигать само окно в алгоритме фиксированного окна.

▍ Sliding window limit


Алгоритм со скользящим окном похож на алгоритм с фиксированным окном. Но туда добавляются отрезки. Отрезок — это, соответственно, часть окна. Если взять 2-часовое окно из предыдущего раздела и разбить его на 4 отрезка, то получится четыре 30-минутных отрезка. Также учитывается индекс текущего отрезка, который будет всегда указывать на самый новый отрезок в окне. Запросы в течение получасового периода попадают в текущий отрезок, и каждые 30 минут окно сдвигается на один отрезок. Если в течение отрезка, мимо которого проскользнуло окно, были запросы, они обновляются, и установленное ограничение увеличивается на эту величину. Если запросов не было, наше ограничение остаётся прежним.

Время кодить


Окей, вот и разобрались немного с теоретической базой. Менеджер не дремлет и всё ещё просит задачу по интеграции, поэтому надо как можно скорее начать писать код.

Вот так, кстати, он выглядит:


Предположим, что есть некий DataObject, содержащий некий Content.

Этот DataObject можно получить, вызвав какой-нибудь IDataObjectExternalApiService, под капотом у которого экземпляр HttpClient делает запрос:

record DataObject(string Content);

interface IDataObjectExternalApiService
{
    Task<DataObject> GetByIdAsync(int id, CancellationToken ct = default);
}

Есть какой-то набор идентификаторов, исчисляющийся тысячами, и по ним нужно выкачать соответствующие контенты:

interface IDataObjectCollectionProvider
{
    Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
        IReadOnlyCollection<int> ids,
        CancellationToken ct = default);
}

Но вот незадача! Это самое внешнее API допускает максимум только 10 RPS.

▍ SemaphoreSlim


Если допускается, что речь идёт о 10 запросах одновременно, то можно попробовать реализовать Concurrency лимитер с помощью примитива синхронизации SemaphoreSlim.

SemaphoreSlim — это облегчённая альтернатива Semaphore, которая ограничивает количество потоков, имеющих одновременный доступ к ресурсу или пулу ресурсов. При этом SemaphoreSlim можно использовать в рамках async/await.

Получится примерно что-то такое:

class DataObjectCollectionProvider : IDataObjectCollectionProvider
{
    private readonly IDataObjectExternalApiService _externalApiService;

    public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) =>
        _externalApiService = externalApiService;

    public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
        IReadOnlyCollection<int> ids,
        CancellationToken ct = default)
    {
        if (ids.Count == 0)
            return [];

        var semaphoreSlim = new SemaphoreSlim(
            initialCount: 10,
            maxCount: 10);
        ConcurrentBag<DataObject> dataObjects = [];

        var tasks = ids.Select(async id =>
        {
            await semaphoreSlim.WaitAsync(ct);
            try
            {
                var dataObject = await _externalApiService.GetByIdAsync(id, ct);
                dataObjects.Add(dataObject);
            }
            finally
            {
                semaphoreSlim.Release();
            }
        });

        await Task.WhenAll(tasks);
        return dataObjects;
    }
}

На мой взгляд, выглядит достаточно кустарно — надо встраивать использование примитива синхронизации в логику вызова и перебора, перемешивая инфраструктурный код с бизнес-логикой.

А если понадобится другой алгоритм, что делать в таком случае?

▍ System.Threading.RateLimiting


Как уже было сказано, .NET 7 представил новый NuGet-пакет System.Threading.RateLimiting, содержащий реализации указанных в статье алгоритмов.

Все они являются наследниками абстрактного класса RateLimiter:

public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
    public abstract int GetAvailablePermits();
    public abstract TimeSpan? IdleDuration { get; }

    public RateLimitLease Acquire(int permitCount = 1);
    public ValueTask<RateLimitLease> WaitAsync(int permitCount = 1, CancellationToken cancellationToken = default);

    public void Dispose();
    public ValueTask DisposeAsync();
}

Наследники этого класса принимают в качестве параметра специальные конфигурационные настройки для регулирования поведения алгоритма.

Соответственно, тут уже выбор какой-то появляется. Например, указанная задача, по моему скромному мнению, решается интуитивнее с помощью Fixed Window.

Тогда и переписать код можно следующим образом:

class DataObjectCollectionProvider : IDataObjectCollectionProvider
{
    private readonly IDataObjectExternalApiService _externalApiService;

    public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) =>
        _externalApiService = externalApiService;

    public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
        IReadOnlyCollection<int> ids,
        CancellationToken ct = default)
    {
        if (ids.Count == 0)
            return [];

        var limiter = new FixedWindowRateLimiter(
            new FixedWindowRateLimiterOptions
            {
                Window = TimeSpan.FromSeconds(1),
                PermitLimit = 10,
                QueueLimit = 10
            });
        ConcurrentBag<DataObject> dataObjects = [];

        var tasks = ids.Select(async id =>
        {
            using var lease = await limiter.AcquireAsync(cancellationToken: ct);
            if (lease.IsAcquired)
            {
                var dataObject = await _externalApiService.GetByIdAsync(id, ct);
                dataObjects.Add(dataObject);
            }
        });

        await Task.WhenAll(tasks);
        return dataObjects;
    }
}


Уже лучше, поскольку алгоритм заменим, а опции можно внедрить через DI.

Кстати, обратите внимание, что создаётся лимитер с параметрами PermitLimit = 10 и QueueLimit = 10. Это означает, что в окно размером в секунду пустят не больше 10 запросов, и разрешается ставить в очередь вызовы WaitAsync с общим количеством запросов на разрешение не более 10.

Однако что делать, если разрешение не удастся выбить, и лимитер не пустит наш запрос? Как построить логику обработки ошибок? И всё это сделать так, чтобы можно было написать unit-тесты?

▍ Polly.RateLimiting


Тут на помощь приходит библиотека Polly, которая построила обёртку над System.Threading.RateLimiting в виде пакета Polly.RateLimiting.

Вообще, инструмент сильно изменился, добавив концепцию пайплайнов. Пайплайн оборачивает необходимый вызов, но перед этим в него накручиваются все свистелки с перделками. Подробности вот в этом разделе документации.

Так вот лимитеры тоже можно добавлять в качестве этапа пайплайна, а сам пайплайн протаскивать через DI до конкретного потребителя.

Внедрение происходит с помощью специального провайдера, который достаёт пайплайн по заданному ключу. Для простоты в качестве ключа буду использовать имя типа потребителя.

Получается, что можно задекорировать внешний сервис, обернув вызов, а затем использовать его как угодно:

class DataObjectServiceRateLimiterDecorator : IDataObjectExternalApiService
{
    private readonly IDataObjectExternalApiService _decorated;
    private readonly ResiliencePipeline<DataObject> _pipeline;

    public DataObjectServiceRateLimiterDecorator(
        IDataObjectExternalApiService decorated,
        ResiliencePipelineProvider<string> pipelineProvider)
    {
        _decorated = decorated;
        _pipeline = pipelineProvider.GetPipeline<DataObject>(
            key: nameof(DataObjectServiceRateLimiterDecorator));
    }

    public async Task<DataObject> GetByIdAsync(int id, CancellationToken ct = default) =>
        await _pipeline.ExecuteAsync(async token => await _decorated.GetByIdAsync(id, token), ct);
}

class DataObjectCollectionProvider : IDataObjectCollectionProvider
{
    private readonly IDataObjectExternalApiService _externalApiService;

    public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) =>
        _externalApiService = externalApiService;

    public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
        IReadOnlyCollection<int> ids,
        CancellationToken ct = default)
    {
        if (ids.Count == 0)
            return [];

        var tasks = ids.Select(id => _externalApiService.GetByIdAsync(id, ct));
        return await Task.WhenAll(tasks);
    }
}

Допустим, RPS задаётся некими опциями, а вместе с лимитером хочется настроить некоторую политику повторов. Тогда конфигурация будет примерно такая:

services.AddResiliencePipeline<string, DataObject>(
    nameof(DataObjectServiceRateLimiterDecorator),
    (builder, pollyContext) =>
    {
        var allowedRps = pollyContext.ServiceProvider
            .GetRequiredService<IOptions<IDataObjectApiOptions>>()
            .Value.RequestsPerSecond;
        builder
            .ConfigureTelemetry(NullLoggerFactory.Instance)
            .AddRetry(
                new RetryStrategyOptions<DataObject>
                {
                    Delay = TimeSpan.FromSeconds(1),
                    MaxRetryAttempts = 5
                })
            .AddRateLimiter(
                new FixedWindowRateLimiter(
                    new FixedWindowRateLimiterOptions
                    {
                        Window = TimeSpan.FromSeconds(1),
                        PermitLimit = allowedRps,
                        QueueLimit = allowedRps / 3 + 10,
                    }));
    });

В конце концов, можно и unit-тесты написать, проверив поведение пайплайна в некоторых ситуациях. Например, вот такие:

public class DataObjectServiceRateLimiterDecoratorTests
{
    [Theory]
    [InlineData(1)]
    [InlineData(2)]
    [InlineData(3)]
    [InlineData(4)]
    [InlineData(5)]
    public async Task GetByIdAsync_ApiReturnedError_RetryCallHappened(int retryCount)
    {
        // arrange
        var response = new DataObject(Content: Guid.NewGuid().ToString());
        var apiService = new Mock<IDataObjectExternalApiService>();
        var sequentialResult = apiService.SetupSequence(
            x => x.GetByIdAsync(
                It.IsAny<int>(),
                It.IsAny<CancellationToken>()));
        for (var i = 0; i < retryCount - 1; i++)
            sequentialResult = sequentialResult.ThrowsAsync(new Exception());
        sequentialResult.ReturnsAsync(response);

        var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>();
        pipelineProvider
            .Setup(
                x => x.GetPipeline<DataObject>(
                    nameof(DataObjectServiceRateLimiterDecorator)))
            .Returns(
                new ResiliencePipelineBuilder<DataObject>()
                    .AddRetry(
                        new RetryStrategyOptions<DataObject>
                        {
                            MaxRetryAttempts = retryCount,
                            Delay = TimeSpan.FromMilliseconds(1)
                        })
                    .Build());

        var decorator = new DataObjectServiceRateLimiterDecorator(
            apiService.Object, pipelineProvider.Object);

        // act
        var dataObject = await decorator.GetByIdAsync(id: default, ct: default);

        // assert
        dataObject.Should().BeEquivalentTo(response);
        apiService
            .Verify(
                x => x.GetByIdAsync(
                    It.IsAny<int>(),
                    It.IsAny<CancellationToken>()),
                Times.Exactly(retryCount));
    }

    [Theory]
    [InlineData(5, 1)]
    [InlineData(100, 50)]
    [InlineData(60, 60)]
    [InlineData(10, 11)]
    [InlineData(20, 40)]
    [InlineData(30, 100)]
    public async Task GetByIdAsync_IfRpsRateLimitExceeded_ThenExceptionIsThrown(int rps, int amount)
    {
        // arrange
        var apiService = new Mock<IDataObjectExternalApiService>();
        apiService
            .Setup(
                x => x.GetByIdAsync(
                    It.IsAny<int>(),
                    It.IsAny<CancellationToken>()))
            .ReturnsAsync(new DataObject(Content: string.Empty));

        var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>();
        pipelineProvider
            .Setup(
                x => x.GetPipeline<DataObject>(
                    nameof(DataObjectServiceRateLimiterDecorator)))
            .Returns(
                new ResiliencePipelineBuilder<DataObject>()
                    .AddRateLimiter(
                        new FixedWindowRateLimiter(
                            new FixedWindowRateLimiterOptions
                            {
                                PermitLimit = rps,
                                Window = TimeSpan.FromSeconds(1)
                            }))
                    .Build());

        var decorator = new DataObjectServiceRateLimiterDecorator(
            apiService.Object, pipelineProvider.Object);

        // act
        var tasks = Enumerable.Range(0, amount)
            .Select(id => decorator.GetByIdAsync(id, ct: default));
        var ex = await Record.ExceptionAsync(() => Task.WhenAll(tasks));

        // assert
        if (amount > rps)
            ex.Should().BeOfType<RateLimiterRejectedException>();
        else
            ex.Should().BeNull();
    }

    [Fact]
    public async Task GetByIdAsync_HappyPath()
    {
        // arrange
        var apiService = new Mock<IDataObjectExternalApiService>();
        apiService
            .Setup(
                x => x.GetByIdAsync(
                    It.IsAny<int>(),
                    It.IsAny<CancellationToken>()))
            .ReturnsAsync(new DataObject(Content: Guid.NewGuid().ToString()));

        var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>();
        pipelineProvider
            .Setup(
                x => x.GetPipeline<DataObject>(
                    nameof(DataObjectServiceRateLimiterDecorator)))
            .Returns(
                new ResiliencePipelineBuilder<DataObject>()
                    .AddRetry(
                        new RetryStrategyOptions<DataObject>
                        {
                            MaxRetryAttempts = 5,
                            Delay = TimeSpan.FromSeconds(1)
                        })
                    .AddRateLimiter(
                        new FixedWindowRateLimiter(
                            new FixedWindowRateLimiterOptions
                            {
                                PermitLimit = 10,
                                Window = TimeSpan.FromSeconds(1)
                            }))
                    .Build());

        var decorator = new DataObjectServiceRateLimiterDecorator(
            apiService.Object, pipelineProvider.Object);

        // act
        var tasks = Enumerable.Range(0, 100)
            .Select(id => decorator.GetByIdAsync(id, ct: default));
        var result = await Task.WhenAll(tasks);

        // assert
        result.Length.Should().Be(100);
    }
}

Заключение


Сегодня вы узнали, как потреблять внешнее API с ограничением по RPS несколькими способами. Итеративно было выявлено, что удобнее и гибче всего работать через Polly. Был предоставлен подробный пример.

Ещё я веду Telegram-канал StepOne, куда выкладываю много интересного контента про коммерческую разработку на C#, даю карьерные советы, рассказываю истории из личного опыта и раскрываю все тайны IT-индустрии.

Telegram-канал со скидками, розыгрышами призов и новостями IT ?

Комментарии (6)


  1. mvv-rus
    02.05.2024 12:30

    На мой взгляд, выглядит достаточно кустарно — надо встраивать использование примитива синхронизации в логику вызова и перебора, перемешивая инфраструктурный код с бизнес-логикой.

    Вы не любите кошексамописные реализации интерфейсов? Да вы просто не умеете их готовить! ;-)

    Зачем встраивать? Добавьте сервис для IDataObjectCollectionProvider с реализацией в виде этого вашего DataObjectCollectionProvider на семафоре в контейнер сервисов (он же - DI-контейнер) - и будет вам щастье: можете теперь этот интерфейс внедрять через DI, или вытаскивать из контейнера через Service Locator, и ничего перемешивать не надо. А реализация интерфейса такая - это для начала, потом поменяете ее, если надо будет, и никакой другой код даже трогать не придется (в идеале, конечно). Ну, а дополнительные параметры для настройки в эту вашу реализацию передать можно передать, используя Options pattern.
    Только предварительно сервис для IDataObjectCollectionProvider еще надо в контейнер добавить (если вдруг его раньше никто не добавил), чтобы контейнер смог реализацию этого интерфейса в конструктор DataObjectCollectionProvider передать.


  1. Rast1234
    02.05.2024 12:30
    +1

    Жаль только это все не помогает, когда надо ограничивать несколько реплик. Пришлось token bucket запихивать в постгрес, и писать обертки, чтобы прозрачно встроиться в httpclient


    1. maksim_bronnikov
      02.05.2024 12:30
      +1

      Есть целый набор пакетов под названием DistibutedLock. С помощью них можно просто реализовать распределеный троттлинг через Redis, например.


      1. Rast1234
        02.05.2024 12:30

        круто, спасибо. почему-то не нашел именно эту либу, когда надо было. смотрю, для постгреса там advisory lock. его и юзаем, но такая абстракция была бы полезной конечно

        а redis, увы, плохо кластеризуется в мульти-цод среде, поэтому его не используем для синхронизаций вообще


  1. dbashinsky
    02.05.2024 12:30

    А как клиенту узнать за какой период можно выполнить лимит запросов?

    У вас в примере в 1 секунду можно выполнить 10 запросов, а что если на сервере изменят настройку и поставят за одну минуту можно выполнить 100 запросов.


    1. Stefanio Автор
      02.05.2024 12:30

      Немного странный вопрос)

      Обычно эту информацию предоставляет внешняя система в документации.
      Например, https://dadata.ru/api/find-party/#restrictions