На днях мне прилетела задача, в которой нужно было вычерпывать данные по HTTP с такими вводными:
Есть ограничение по количеству запросов в минуту
Объём данных - миллионы записей
Один запрос выполняется долго (возвращает много данных)
Нужен асинхронный механизм выгрузки
Не включая мозг, я начал накидывать решение...
Грабли №1: async, который работает синхронно
async def fetch_all_pages():
...
while True:
response = await fetch( # ← ошибка
f"/resource?page={page}"
)
...
page += 1
...
Формально:
async
await
Фактически:
один запрос за раз
каждый следующий запрос ждёт предыдущий
Async-код не становится параллельным автоматически. Если каждый запрос ждёт завершения предыдущего - это синхронное исполнение в async-синтаксисе.
Грабли №2: «давайте просто gather всё»
Окей, давайте делать «настоящий async».
async def fetch_all():
tasks = [fetch(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks) # ← ошибка
...
Работает. Быстро. Очень быстро. Но мы только что создали монстра.
Что пошло не так:
сoroutine storm - резкий рост потребления RAM
закономерные
429 Too Many Requests, а дальше - блокировки и баныgatherждёт все задачи, одна зависла → зависло всёневозможно стримить данные (а это критично в моём кейсе)
asyncio.gatherбез ограничений - это не параллелизм, а хаос.
Грабли №3: «давайте думать, подсказывайте»
Интегрируем семафор - классическое решение из тренажёров и собеседований.
from asyncio import Semaphore
semaphore = Semaphore(10)
async def fetch_page(page):
async with semaphore:
return await fetch(f"/resource?page={page}")
async def main():
tasks = [fetch_page(page) for page in range(1, 1000)]
results = await asyncio.gather(*tasks)
...
Интуитивное ожидание: «Ну значит 10 запросов в секунду»
Реальность:
latency API *плавает** (в моём случае от 8 до 30 секунд), из-за этого:
лимит используется лишь на ~30%
выгрузка растягивается во времени
если API внезапно ускорится (кэш, CDN, оптимизация) - получаем гарантированное кратное увеличение rps - привет, бан.
Для наглядности:
Долгие ответы:
10 / 8 сек ≈ 75 req/minДеградация API:
10 / 30 сек ≈ 20 req/minУскорение API:
10 / 0.5 сек = 1200 req/min
Семафор связывает пропускную способность системы с самым нестабильным параметром - latency внешнего API.
Semaphore регулирует ширину трубы. Но не регулирует поток воды.
Грабли №4: почти работает (но не совсем)
Чтобы RPS не зависел от latency, нужен механизм лимитирования по времени. Я остановился на алгоритме leaky bucket и его практической реализации - пакете aiolimiter.
Leaky bucket - алгоритм, который контролирует скорость обработки запросов, независимо от того, как быстро они обрабатываются.
Аналогия: кран с фиксированным напором - сколько ни лей, вытекает стабильно.
from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(100, 60) # 100 запросов в минуту
semaphore = Semaphore(10) # контроль параллелизма
async def fetch_page(page):
async with semaphore:
await limiter.acquire()
return await fetch(f"/resource?page={page}")
Это действительно решило проблему зависимости от latency.
Но…
корутины всё ещё создаются пачкой
нет стриминга
сложно контролировать ретраи и ошибки
память по-прежнему под нагрузкой
Работает - да. Продакшен-реди - нет.
Антиграбли: золотая середина
Я вышел в интрернет с данным вопросом... и нашел несколько интересных подходов с producer/consumer/worker архитектурой, логика следующая:
Очередь задач → воркеры обрабатывают параллельно → AsyncLimiter контролирует скорость → результаты обрабатываются потоком.
from aiolimiter import AsyncLimiter
async def worker(client, queue, results):
while True:
offset = await queue.get()
if offset is None:
break
await limiter.acquire()
response = await client.get(API_URL, params={"offset": offset})
await results.put(response.json()["results"])
queue.task_done()
async def main():
queue = asyncio.Queue()
results = asyncio.Queue()
for offset in range(0, 1000, 50):
await queue.put(offset)
workers = [
asyncio.create_task(worker(...))
for _ in range(5)
]
while ...:
batch = await results.get()
# обработка данных
Что мы получили:
Экономия памяти - задачи не создаются одномоментно
Стриминг результатов
Контроль перегрузки через количество воркеров
Гибкая работа с ретраями и ошибками
Предсказуемая и стабильная пропускная способность системы
Подводим итоги
Решение не серебрянная пуля, но для моего кейса - устойчивая и продакшен-реди золотая середина.
Комментарии (6)

Adgh
05.01.2026 18:01Хорошая обзорная статья - всё сжато и по делу. Вот только без обработки исключений / ошибок, access / refresh токенов, ретрая, журналирования, расчета IO-метрик... до production ready всё равно ещё больше половины пути. Что удивительно - сценарий частый / проблема общая, каждый пишет свой велосипед, но так никто и не взялся за универсальный "комбайн"

mityukov
05.01.2026 18:01Самое стремное, что когда гуглишь проблему - основная масса ответов про rate limiting входящих запросов. Но там лишние просто отбрасываются, а надо чтобы ждали. А ещё хочется сохранить очередность (FIFO)

dyadyaSerezha
05.01.2026 18:01Все хорошо, только почему время обработки запроса вы называете latency? Это не задержка ни разу. Тщательнее)
naquad
С очередью - это оно кмк. Всё остальное - слишком. Как вариант: небольшой класс, который сразу отдаёт future клиентскому коду, а внутри его worker обрабатывает запрос. AsyncioLimiter можно и на банальный sleep заменить, довесить condition, чтобы если один worker получил 429, то флаг + sleep и продолжаем. Но это уже для пуристов, кто зависимости не любит.