Однажды каждый C# программист получает на работе задачу по разработке интеграции с внешней системой, где ограничена максимальная частота запросов в секунду.
Интернет яростно сопротивлялся предоставить мне инструкцию к написанию такого кода, закидывая туториалами по настройке ограничения RPS на сервере, а не клиенте.
Но теперь на Хабре есть эта статья, которая научит отправлять запросы из
HttpClient
так, чтобы не получать 429 Too Many Requests
.Throttling
Обозначенный процесс взаимодействия с API, где нельзя превышать заданный RPS, в общем случае именуется троттлингом (throttling).
Throttling — это широко используемая техника для увеличения производительности кода, который выполняется повторно с некоторой периодичностью.
Троттлинг функции означает, что функция вызывается не более одного раза в указанный период времени (например, раз в 10 секунд). Другими словами ― троттлинг предотвращает запуск функции, если она уже запускалась недавно. Троттлинг также обеспечивает регулярность выполнения функции с заданной периодичностью.
В экшен-играх приходится нажимать кнопки с высокой частотой для выполнения какого-либо действия (стрельба, удар). Как правило, игроки нажимают кнопки намного чаще, чем это требуется, вероятно, увлекаясь происходящим. Таким образом игрок может нажать на кнопку «удара» 10 раз в течение пяти секунд, но персонаж делает не более одного удара в секунду. В этом случае троттлинг события «удар» позволяет игнорировать повторные нажатия кнопки в течение секунды.
Как реализуется подобное ограничение?
Rate Limiting Алгоритмы
Есть ряд алгоритмов, которые реализуют технику Rate Limiting — контроль количества допущенного трафика к объекту.
Существует множество различных алгоритмов ограничения скорости для управления потоком запросов. .NET 7 представил 4 таких алгоритма.
▍ Concurrency limit
Concurrency лимитер ограничивает количество одновременных запросов, которые могут получить доступ к ресурсу. Если установленный предел равен 10, то 10 запросов могут получить доступ к ресурсу одновременно, а 11-й запрос не будет допущен. Как только запрос завершается, количество разрешённых запросов увеличивается до 1, при завершении двух запросов — до 2 и так далее. Это делается с помощью вызова
Dispose
на экземпляре RateLimitLease
, о которой поговорим позже.▍ Token bucket limit
Token bucket алгоритм получил своё название, исходя из принципа работы. Представьте, что есть ведро, до краёв наполненное токенами. Когда поступает запрос, он забирает токен и хранит его вечно. Через некоторое время кто-то добавляет в ведро заранее определённое количество токенов, никогда не добавляя больше, чем ведро может вместить. Если ведро пустое, то при поступлении запроса ему будет отказано в доступе к ресурсу.
Приведу более конкретный пример. Предположим, что ведро вмещает 10 токенов, и каждую минуту в него добавляется 2 токена. Когда приходит запрос, он забирает 1 токен, так что у нас остаётся 9. Ещё 3 запроса приходят и забирают 3 токена, оставляя в ведре 6 токенов. Через минуту поступает 2 новых токена, что даёт 8 в сумме. 8 запросов приходят и забирают оставшиеся токены, опустошая ведро. Если приходит ещё один запрос после этого, то у него уже не получится взять доступ к ресурсу, пока в ведре не окажется больше токенов. Они в данном примере восполняются каждую минуту. Через 5 минут отсутствия запросов в ведре снова будут все 10 токенов, и в последующие минуты они не будут добавляться, пока новые запросы не начнут забирать токены.
▍ Fixed window limit
Алгоритм с фиксированным окном использует идею окна, которая будет использоваться и в следующем алгоритме. Окно — это промежуток времени, в течение которого действует ограничение, прежде чем произойдёт переход к следующему окну. В случае с фиксированным окном переход к следующему окну означает сброс ограничения обратно в начальную точку.
Представим, что есть кинотеатр с одним залом, вмещающим 100 человек, которые пришли смотреть 2-часовой фильм. Когда фильм начинается, людям разрешается встать в очередь на следующий сеанс, который состоится через 2 часа. В очереди могут стоять до 100 человек, прежде чем им начнут говорить прийти позже. По прошествии двух часов фильм заканчивается, и очередь от 0 до 100 человек может переместиться в кинотеатр, тем самым начиная формирование новой очереди. Всё равно что двигать само окно в алгоритме фиксированного окна.
▍ Sliding window limit
Алгоритм со скользящим окном похож на алгоритм с фиксированным окном. Но туда добавляются отрезки. Отрезок — это, соответственно, часть окна. Если взять 2-часовое окно из предыдущего раздела и разбить его на 4 отрезка, то получится четыре 30-минутных отрезка. Также учитывается индекс текущего отрезка, который будет всегда указывать на самый новый отрезок в окне. Запросы в течение получасового периода попадают в текущий отрезок, и каждые 30 минут окно сдвигается на один отрезок. Если в течение отрезка, мимо которого проскользнуло окно, были запросы, они обновляются, и установленное ограничение увеличивается на эту величину. Если запросов не было, наше ограничение остаётся прежним.
Время кодить
Окей, вот и разобрались немного с теоретической базой. Менеджер не дремлет и всё ещё просит задачу по интеграции, поэтому надо как можно скорее начать писать код.
Вот так, кстати, он выглядит:
Предположим, что есть некий
DataObject
, содержащий некий Content
.Этот
DataObject
можно получить, вызвав какой-нибудь IDataObjectExternalApiService
, под капотом у которого экземпляр HttpClient
делает запрос:record DataObject(string Content);
interface IDataObjectExternalApiService
{
Task<DataObject> GetByIdAsync(int id, CancellationToken ct = default);
}
Есть какой-то набор идентификаторов, исчисляющийся тысячами, и по ним нужно выкачать соответствующие контенты:
interface IDataObjectCollectionProvider
{
Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
IReadOnlyCollection<int> ids,
CancellationToken ct = default);
}
Но вот незадача! Это самое внешнее API допускает максимум только 10 RPS.
▍ SemaphoreSlim
Если допускается, что речь идёт о 10 запросах одновременно, то можно попробовать реализовать Concurrency лимитер с помощью примитива синхронизации
SemaphoreSlim
.SemaphoreSlim
— это облегчённая альтернатива Semaphore
, которая ограничивает количество потоков, имеющих одновременный доступ к ресурсу или пулу ресурсов. При этом SemaphoreSlim
можно использовать в рамках async
/await
.Получится примерно что-то такое:
class DataObjectCollectionProvider : IDataObjectCollectionProvider
{
private readonly IDataObjectExternalApiService _externalApiService;
public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) =>
_externalApiService = externalApiService;
public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
IReadOnlyCollection<int> ids,
CancellationToken ct = default)
{
if (ids.Count == 0)
return [];
var semaphoreSlim = new SemaphoreSlim(
initialCount: 10,
maxCount: 10);
ConcurrentBag<DataObject> dataObjects = [];
var tasks = ids.Select(async id =>
{
await semaphoreSlim.WaitAsync(ct);
try
{
var dataObject = await _externalApiService.GetByIdAsync(id, ct);
dataObjects.Add(dataObject);
}
finally
{
semaphoreSlim.Release();
}
});
await Task.WhenAll(tasks);
return dataObjects;
}
}
На мой взгляд, выглядит достаточно кустарно — надо встраивать использование примитива синхронизации в логику вызова и перебора, перемешивая инфраструктурный код с бизнес-логикой.
А если понадобится другой алгоритм, что делать в таком случае?
▍ System.Threading.RateLimiting
Как уже было сказано, .NET 7 представил новый NuGet-пакет
System.Threading.RateLimiting
, содержащий реализации указанных в статье алгоритмов.Все они являются наследниками абстрактного класса
RateLimiter
:public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
public abstract int GetAvailablePermits();
public abstract TimeSpan? IdleDuration { get; }
public RateLimitLease Acquire(int permitCount = 1);
public ValueTask<RateLimitLease> WaitAsync(int permitCount = 1, CancellationToken cancellationToken = default);
public void Dispose();
public ValueTask DisposeAsync();
}
Наследники этого класса принимают в качестве параметра специальные конфигурационные настройки для регулирования поведения алгоритма.
Соответственно, тут уже выбор какой-то появляется. Например, указанная задача, по моему скромному мнению, решается интуитивнее с помощью Fixed Window.
Тогда и переписать код можно следующим образом:
class DataObjectCollectionProvider : IDataObjectCollectionProvider
{
private readonly IDataObjectExternalApiService _externalApiService;
public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) =>
_externalApiService = externalApiService;
public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
IReadOnlyCollection<int> ids,
CancellationToken ct = default)
{
if (ids.Count == 0)
return [];
var limiter = new FixedWindowRateLimiter(
new FixedWindowRateLimiterOptions
{
Window = TimeSpan.FromSeconds(1),
PermitLimit = 10,
QueueLimit = 10
});
ConcurrentBag<DataObject> dataObjects = [];
var tasks = ids.Select(async id =>
{
using var lease = await limiter.AcquireAsync(cancellationToken: ct);
if (lease.IsAcquired)
{
var dataObject = await _externalApiService.GetByIdAsync(id, ct);
dataObjects.Add(dataObject);
}
});
await Task.WhenAll(tasks);
return dataObjects;
}
}
Уже лучше, поскольку алгоритм заменим, а опции можно внедрить через DI.
Кстати, обратите внимание, что создаётся лимитер с параметрами
PermitLimit = 10
и QueueLimit = 10
. Это означает, что в окно размером в секунду пустят не больше 10 запросов, и разрешается ставить в очередь вызовы WaitAsync
с общим количеством запросов на разрешение не более 10.Однако что делать, если разрешение не удастся выбить, и лимитер не пустит наш запрос? Как построить логику обработки ошибок? И всё это сделать так, чтобы можно было написать unit-тесты?
▍ Polly.RateLimiting
Тут на помощь приходит библиотека
Polly
, которая построила обёртку над System.Threading.RateLimiting в виде пакета Polly.RateLimiting
.Вообще, инструмент сильно изменился, добавив концепцию пайплайнов. Пайплайн оборачивает необходимый вызов, но перед этим в него накручиваются все свистелки с перделками. Подробности вот в этом разделе документации.
Так вот лимитеры тоже можно добавлять в качестве этапа пайплайна, а сам пайплайн протаскивать через DI до конкретного потребителя.
Внедрение происходит с помощью специального провайдера, который достаёт пайплайн по заданному ключу. Для простоты в качестве ключа буду использовать имя типа потребителя.
Получается, что можно задекорировать внешний сервис, обернув вызов, а затем использовать его как угодно:
class DataObjectServiceRateLimiterDecorator : IDataObjectExternalApiService
{
private readonly IDataObjectExternalApiService _decorated;
private readonly ResiliencePipeline<DataObject> _pipeline;
public DataObjectServiceRateLimiterDecorator(
IDataObjectExternalApiService decorated,
ResiliencePipelineProvider<string> pipelineProvider)
{
_decorated = decorated;
_pipeline = pipelineProvider.GetPipeline<DataObject>(
key: nameof(DataObjectServiceRateLimiterDecorator));
}
public async Task<DataObject> GetByIdAsync(int id, CancellationToken ct = default) =>
await _pipeline.ExecuteAsync(async token => await _decorated.GetByIdAsync(id, token), ct);
}
class DataObjectCollectionProvider : IDataObjectCollectionProvider
{
private readonly IDataObjectExternalApiService _externalApiService;
public DataObjectCollectionProvider(IDataObjectExternalApiService externalApiService) =>
_externalApiService = externalApiService;
public async Task<IReadOnlyCollection<DataObject>> GetByIdsAsync(
IReadOnlyCollection<int> ids,
CancellationToken ct = default)
{
if (ids.Count == 0)
return [];
var tasks = ids.Select(id => _externalApiService.GetByIdAsync(id, ct));
return await Task.WhenAll(tasks);
}
}
Допустим, RPS задаётся некими опциями, а вместе с лимитером хочется настроить некоторую политику повторов. Тогда конфигурация будет примерно такая:
services.AddResiliencePipeline<string, DataObject>(
nameof(DataObjectServiceRateLimiterDecorator),
(builder, pollyContext) =>
{
var allowedRps = pollyContext.ServiceProvider
.GetRequiredService<IOptions<IDataObjectApiOptions>>()
.Value.RequestsPerSecond;
builder
.ConfigureTelemetry(NullLoggerFactory.Instance)
.AddRetry(
new RetryStrategyOptions<DataObject>
{
Delay = TimeSpan.FromSeconds(1),
MaxRetryAttempts = 5
})
.AddRateLimiter(
new FixedWindowRateLimiter(
new FixedWindowRateLimiterOptions
{
Window = TimeSpan.FromSeconds(1),
PermitLimit = allowedRps,
QueueLimit = allowedRps / 3 + 10,
}));
});
В конце концов, можно и unit-тесты написать, проверив поведение пайплайна в некоторых ситуациях. Например, вот такие:
public class DataObjectServiceRateLimiterDecoratorTests
{
[Theory]
[InlineData(1)]
[InlineData(2)]
[InlineData(3)]
[InlineData(4)]
[InlineData(5)]
public async Task GetByIdAsync_ApiReturnedError_RetryCallHappened(int retryCount)
{
// arrange
var response = new DataObject(Content: Guid.NewGuid().ToString());
var apiService = new Mock<IDataObjectExternalApiService>();
var sequentialResult = apiService.SetupSequence(
x => x.GetByIdAsync(
It.IsAny<int>(),
It.IsAny<CancellationToken>()));
for (var i = 0; i < retryCount - 1; i++)
sequentialResult = sequentialResult.ThrowsAsync(new Exception());
sequentialResult.ReturnsAsync(response);
var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>();
pipelineProvider
.Setup(
x => x.GetPipeline<DataObject>(
nameof(DataObjectServiceRateLimiterDecorator)))
.Returns(
new ResiliencePipelineBuilder<DataObject>()
.AddRetry(
new RetryStrategyOptions<DataObject>
{
MaxRetryAttempts = retryCount,
Delay = TimeSpan.FromMilliseconds(1)
})
.Build());
var decorator = new DataObjectServiceRateLimiterDecorator(
apiService.Object, pipelineProvider.Object);
// act
var dataObject = await decorator.GetByIdAsync(id: default, ct: default);
// assert
dataObject.Should().BeEquivalentTo(response);
apiService
.Verify(
x => x.GetByIdAsync(
It.IsAny<int>(),
It.IsAny<CancellationToken>()),
Times.Exactly(retryCount));
}
[Theory]
[InlineData(5, 1)]
[InlineData(100, 50)]
[InlineData(60, 60)]
[InlineData(10, 11)]
[InlineData(20, 40)]
[InlineData(30, 100)]
public async Task GetByIdAsync_IfRpsRateLimitExceeded_ThenExceptionIsThrown(int rps, int amount)
{
// arrange
var apiService = new Mock<IDataObjectExternalApiService>();
apiService
.Setup(
x => x.GetByIdAsync(
It.IsAny<int>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(new DataObject(Content: string.Empty));
var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>();
pipelineProvider
.Setup(
x => x.GetPipeline<DataObject>(
nameof(DataObjectServiceRateLimiterDecorator)))
.Returns(
new ResiliencePipelineBuilder<DataObject>()
.AddRateLimiter(
new FixedWindowRateLimiter(
new FixedWindowRateLimiterOptions
{
PermitLimit = rps,
Window = TimeSpan.FromSeconds(1)
}))
.Build());
var decorator = new DataObjectServiceRateLimiterDecorator(
apiService.Object, pipelineProvider.Object);
// act
var tasks = Enumerable.Range(0, amount)
.Select(id => decorator.GetByIdAsync(id, ct: default));
var ex = await Record.ExceptionAsync(() => Task.WhenAll(tasks));
// assert
if (amount > rps)
ex.Should().BeOfType<RateLimiterRejectedException>();
else
ex.Should().BeNull();
}
[Fact]
public async Task GetByIdAsync_HappyPath()
{
// arrange
var apiService = new Mock<IDataObjectExternalApiService>();
apiService
.Setup(
x => x.GetByIdAsync(
It.IsAny<int>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(new DataObject(Content: Guid.NewGuid().ToString()));
var pipelineProvider = new Mock<ResiliencePipelineProvider<string>>();
pipelineProvider
.Setup(
x => x.GetPipeline<DataObject>(
nameof(DataObjectServiceRateLimiterDecorator)))
.Returns(
new ResiliencePipelineBuilder<DataObject>()
.AddRetry(
new RetryStrategyOptions<DataObject>
{
MaxRetryAttempts = 5,
Delay = TimeSpan.FromSeconds(1)
})
.AddRateLimiter(
new FixedWindowRateLimiter(
new FixedWindowRateLimiterOptions
{
PermitLimit = 10,
Window = TimeSpan.FromSeconds(1)
}))
.Build());
var decorator = new DataObjectServiceRateLimiterDecorator(
apiService.Object, pipelineProvider.Object);
// act
var tasks = Enumerable.Range(0, 100)
.Select(id => decorator.GetByIdAsync(id, ct: default));
var result = await Task.WhenAll(tasks);
// assert
result.Length.Should().Be(100);
}
}
Заключение
Сегодня вы узнали, как потреблять внешнее API с ограничением по RPS несколькими способами. Итеративно было выявлено, что удобнее и гибче всего работать через
Polly
. Был предоставлен подробный пример.Ещё я веду Telegram-канал StepOne, куда выкладываю много интересного контента про коммерческую разработку на C#, даю карьерные советы, рассказываю истории из личного опыта и раскрываю все тайны IT-индустрии.
Telegram-канал со скидками, розыгрышами призов и новостями IT ?
Комментарии (6)
Rast1234
02.05.2024 12:30+1Жаль только это все не помогает, когда надо ограничивать несколько реплик. Пришлось token bucket запихивать в постгрес, и писать обертки, чтобы прозрачно встроиться в httpclient
maksim_bronnikov
02.05.2024 12:30+1Есть целый набор пакетов под названием DistibutedLock. С помощью них можно просто реализовать распределеный троттлинг через Redis, например.
Rast1234
02.05.2024 12:30круто, спасибо. почему-то не нашел именно эту либу, когда надо было. смотрю, для постгреса там advisory lock. его и юзаем, но такая абстракция была бы полезной конечно
а redis, увы, плохо кластеризуется в мульти-цод среде, поэтому его не используем для синхронизаций вообще
dbashinsky
02.05.2024 12:30А как клиенту узнать за какой период можно выполнить лимит запросов?
У вас в примере в 1 секунду можно выполнить 10 запросов, а что если на сервере изменят настройку и поставят за одну минуту можно выполнить 100 запросов.
Stefanio Автор
02.05.2024 12:30Немного странный вопрос)
Обычно эту информацию предоставляет внешняя система в документации.
Например, https://dadata.ru/api/find-party/#restrictions
mvv-rus
Вы не любите
кошексамописные реализации интерфейсов? Да вы просто не умеете их готовить! ;-)Зачем встраивать? Добавьте сервис для
IDataObjectCollectionProvider с реализацией в виде этого вашего DataObjectCollectionProvider на семафоре в контейнер сервисов (он же - DI-контейнер) - и будет вам щастье: можете теперь этот интерфейс внедрять через DI, или вытаскивать из контейнера через Service Locator, и ничего перемешивать не надо. А реализация интерфейса такая - это для начала, потом поменяете ее, если надо будет, и никакой другой код даже трогать не придется (в идеале, конечно). Ну, а дополнительные параметры для настройки в эту вашу реализацию передать можно передать, используя Options pattern.
Только предварительно сервис для IDataObjectCollectionProvider еще надо в контейнер добавить (если вдруг его раньше никто не добавил), чтобы контейнер смог реализацию этого интерфейса в конструктор DataObjectCollectionProvider передать.