Привет!
Предыстория
Была у меня задача: проверить качество работы API для поиска похожих векторов. Штука довольно стандартная для ML-систем — на вход подаёшь 128-мерный вектор (например, эмбеддинг изображения или текста), API ищет в базе ближайшие вектора и возвращает их.
Тестовый датасет: 10 миллионов векторов. Задача: для каждого вектора проверить, что API правильно находит его же (similarity должна быть 100%).
Коллеги написали API, я написал клиент для тестирования. Что может пойти не так?
Попытка 1: Тупо в лоб:
async def check_vector(vector: list[float]) -> bool:
"""Проверить один вектор."""
response = await client.post(
"http://api.internal/search",
json={"vector": vector, "top_k": 1}
)
result = response.json()
return result["similarity"] >= 0.99 # Это условная величина
async def main():
# Читаем векторы с диска
vectors = read_vectors_from_disk("vectors.bin")
results = []
for vector in vectors:
result = await check_vector(vector)
results.append(result)
accuracy = sum(results) / len(results) * 100
print(f"Accuracy: {accuracy}%")
Запустил. Понятно стало, что это будет работать долго, очень долго. 10 миллионов запросов по очереди — не самая быстрая идея.
Попытка 2: Пробую параллелить (asyncio.gather):
async def main():
vectors = read_vectors_from_disk("vectors.bin") # 10M векторов
tasks = [check_vector(v) for v in vectors]
results = await asyncio.gather(*tasks)
accuracy = sum(results) / len(results) * 100
print(f"Accuracy: {accuracy}%")
Первые минуты всё летало. Потом клиент начал тормозить. Через какое-то время (не помню сколько, но меньше 10 минут) — вообще завис. Смотрю в htop — память ж��ёт, CPU на 100%, файловые дескрипторы кончились.
Перезапускаю. Та же история.
Проблема была очевидна (это сейчас очевидно, на тот момент не очень очевидно было): я запустил 10 миллионов одновременных HTTP запросов. Мой клиент пытался:
Держать в памяти 10M корутин
Читать с диска и отправлять всё одновременно А их API просто не был рассчитан на такую нагрузку. Rate limiter срабатывал, запросы дропались, база не справлялась.
Что будет в статье
Сначала покажу, почему наивные подходы не работают (на примере моего реального факапа с 10М векторов). Потом — как asyncio.Semaphore решает проблему. И в конце — решение с обработкой ошибок, timeout-ами и retry.
В чём была проблема
Задача:
Прочитать 10 миллионов векторов с диска
Для каждого вектора отправить POST запрос на API
Проверить, что API вернул правильный результат
Посчитать accuracy
Попытка №1: Последовательно
import asyncio
import aiohttp
async def check_vector(session: aiohttp.ClientSession, vector: list[float]) -> bool:
"""Проверить один вектор через API."""
async with session.post(
"https://api.internal/search",
json={"vector": vector, "top_k": 1}
) as response:
result = await response.json()
return result["similarity"] >= 0.99
async def main():
vectors = load_vectors("vectors.bin") # 10M векторов
async with aiohttp.ClientSession() as session:
results = []
for vector in vectors:
result = await check_vector(session, vector) # ← Ждём каждый
results.append(result)
accuracy = sum(results) / len(results) * 100
print(f"Accuracy: {accuracy}%")
asyncio.run(main())
Проблема: Каждый запрос ждёт предыдущий. Если один запрос = 50ms, то:
10,000,000 запросов × 50ms = 500,000 секунд = 138 часов = 5.7 дней
Пять дней на тестирование? Не, спасибо
Попытка №2: Всё сразу (катастрофа)
async def main():
vectors = load_vectors("vectors.bin") # 10M векторов
async with aiohttp.ClientSession() as session:
# Создаём 10 миллионов задач СРАЗУ
tasks = [check_vector(session, v) for v in vectors]
results = await asyncio.gather(*tasks)
accuracy = sum(results) / len(results) * 100
print(f"Accuracy: {accuracy}%")
Что произошло:
На стороне клиента:
Python создал 10M корутин в памяти → жрёт 20+ GB RAM
Чтение с диска не успевает → I/O bottleneck
Программа зависла, потом упала с
MemoryError
На стороне API:
10M запросов прилетели за 30 секунд
Connection pool исчерпан (обычно 100-1000 соединений)
Rate limiter включился:
429 Too Many RequestsБаза данных под нагрузкой начала тормозить
Итог: положил и свой клиент, и их API, и базу данных. Тройное комбо!
Решение: Semaphore
После разбора полётов я полез в документацию asyncio искать "как ограничить количество одновременных запросов". Наткнулся на Semaphore.
Семафор — это по сути счётчик с двумя операциями:
acquire() — уменьшить счётчик (или ждать, если он уже 0)
release() — увеличить счётчик обратно
Создаёшь семафор с лимитом:
semaphore = asyncio.Semaphore(100) # Максимум 100 одновременных операций
Теперь у тебя есть "100 слотов". Захватил слот — работаешь. Освободил — следующая задача может начать.
Исправленный код
import asyncio
import aiohttp
from typing import List
async def check_vector(
session: aiohttp.ClientSession,
vector: list[float],
semaphore: asyncio.Semaphore
) -> bool:
"""Проверить вектор с rate limiting."""
async with semaphore:
async with session.post(
"https://api.internal/search",
json={"vector": vector, "top_k": 1}
) as response:
result = await response.json()
return result["similarity"] >= 0.99
async def main():
vectors = load_vectors("vectors.bin")
semaphore = asyncio.Semaphore(100)
async with aiohttp.ClientSession() as session:
# Создаём все задачи (но они будут ждать слота)
tasks = [
check_vector(session, vector, semaphore)
for vector in vectors
]
results = await asyncio.gather(*tasks)
accuracy = sum(results) / len(results) * 100
print(f"Accuracy: {accuracy}%")
asyncio.run(main())
Что изменилось?
Было:
Все 10M задач стартуют одновременно
Память взрывается, API падает
Стало:
Создаются все 10M задач (это дёшево, просто корутины)
Но работают только 100 одновременно
Как только одна завершилась → следующая хватает слот
Результат:
Время выполнения: ~14 часов (вместо 5 дней или краша)
Память клиента: стабильные 500 MB (вместо 20+ GB)
API живой и счастливый
Accuracy: 99.97%
Как это работает внутри
async with semaphore: # Попытка захватить слот
# Если есть свободные слоты (счётчик > 0):
# - Уменьшаем счётчик
# - Входим в блок
# Если слотов нет (счётчик = 0):
# - Ждём в очереди
# После выхода из блока:
# - Увеличиваем счётчик
# - Следующая задача из очереди может войти
await do_work()
Примерная timeline:
t=0s: Задачи 1-100 захватили слоты, начали работу
t=50ms: Задача 1 завершилась → Задача 101 начала работу
t=51ms: Задача 2 завершилась → Задача 102 начала работу
... Всегда 100 активных задач
Подбор лимита
Как выбрать число для Semaphore(N)? Зависит от:
-
Возможности API ��� сколько он может обработать?
У нас API мог ~500 req/sec
При 50ms на запрос: 100 concurrent = ~2000 req/sec
Значит 100 — много, снизил до 50
-
Скорость ответа — сколько времени API отвечает?
Быстрые ответы (10-50ms) → можно больше concurrent
Медленные (500ms+) → меньше concurrent, иначе задачи накапливаются
В итоге я остановился на 50 одновременных запросов:
semaphore = asyncio.Semaphore(50)
API не перегружался, клиент работал стабильно, коллеги были довольны
Production паттерны
После всех приключений я понял: копипастить async with semaphore в каждую функцию — плохая идея. Код превращается в кашу, и когда нужно поменять логику (например, добавить retry), приходится лезть в 20 мест.
Поэтому я завёл себе класс-обёртку. Выглядит избыточно, но когда в проекте 50+ мест с HTTP запросами — это спасение.
Вариант 1: Простая обёртка
from typing import TypeVar, Callable, Awaitable
T = TypeVar('T')
class RateLimitedExecutor:
"""Делает rate limiting за вас."""
def __init__(self, max_concurrent: int = 10):
self._semaphore = asyncio.Semaphore(max_concurrent)
async def execute(
self,
func: Callable[[str], Awaitable[T]],
items: list[str],
) -> list[T]:
"""Запустить функцию для всех items с rate limiting."""
tasks = [
self._run_with_limit(func, item)
for item in items
]
return await asyncio.gather(*tasks)
async def _run_with_limit(self, func, item):
async with self._semaphore:
return await func(item)
# Использование простое:
executor = RateLimitedExecutor(max_concurrent=10)
results = await executor.execute(fetch_data, urls)
Уже лучше! Но есть проблема: если одна задача упадёт, gather() по умолчанию завалит всё остальное.
Вариант 2: С обработкой ошибок
from dataclasses import dataclass
@dataclass(frozen=True)
class Result:
"""Либо value, либо error."""
url: str
value: dict | None = None
error: Exception | None = None
@property
def is_ok(self) -> bool:
return self.error is None
class RateLimitedExecutor:
# ... __init__ тот же ...
async def execute(self, func, items) -> list[Result]:
tasks = [self._run_with_limit(func, item) for item in items]
# return_exceptions=True — не валим всё если одна упала
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
self._wrap_result(item, result)
for item, result in zip(items, results)
]
def _wrap_result(self, url: str, result) -> Result:
if isinstance(result, Exception):
return Result(url=url, error=result)
return Result(url=url, value=result)
# Теперь можно обрабатывать отдельно успехи и фейлы
results = await executor.execute(fetch_data, urls)
successful = [r.value for r in results if r.is_ok]
failed = [r for r in results if not r.is_ok]
print(f"OK: {len(successful)}, Failed: {len(failed)}")
# Посмотреть первые 5 ошибок
for failure in failed[:5]:
print(f"{failure.url}: {failure.error}")
Это реально удобно! Особенно когда обрабатываешь тысячи запросов и 1-2% могут упасть по таймауту или 500-кой — не хочется из-за них терять остальные 98%.
Timeout'ы и retry (потому что всё ломается)
Семафор решает проблему с количеством запросов, но не защищает от другой беды — зависших соединений.
История из жизни: один из наших внешних API иногда просто... виснет. Запрос отправляется, но ответа нет. И нет. И нет. Минута проходит, две, пять... А код терпеливо ждёт. В итоге все 10 слотов семафора заняты зависшими запросами, и новые встают в очередь.
Добавляем timeout
Решение простое — timeout на каждый запрос:
class RateLimitedExecutor:
def __init__(
self,
max_concurrent: int = 10,
timeout_seconds: float = 5.0
):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._timeout = timeout_seconds
async def _run_with_limit(self, func, item):
async with self._semaphore:
try:
async with asyncio.timeout(self._timeout):
return await func(item)
except TimeoutError:
print(f"Timeout for {item}")
raise
Теперь если запрос не ответил за 5 секунд — мы его убиваем и освобождаем слот для следующего.
А что насчёт retry?
Timeout решает проблему зависших запросов, но иногда хочется дать API второй шанс. Например, он может вернуть 503 (service unavailable) просто потому что перегружен на секунду.
Вот как я добавил retry с exponential backoff:
from dataclasses import dataclass
@dataclass
class RetryConfig:
max_retries: int = 3
timeout_seconds: float = 5.0
backoff_factor: float = 2.0
class RateLimitedExecutor:
def __init__(self, max_concurrent: int = 10, retry_config: RetryConfig = None):
self._semaphore = asyncio.Semaphore(max_concurrent)
self._retry_config = retry_config or RetryConfig()
async def _run_with_limit(self, func, item):
async with self._semaphore:
return await self._try_with_retry(func, item)
async def _try_with_retry(self, func, item):
last_error = None
for attempt in range(self._retry_config.max_retries):
try:
async with asyncio.timeout(self._retry_config.timeout_seconds):
result = await func(item)
if attempt > 0:
print(f"Успех со {attempt + 1}-й попытки: {item}")
return result
except (TimeoutError, ConnectionError) as e:
last_error = e
print(f"Попытка {attempt + 1} провалилась: {item}")
# Если это не последняя попытка — ждём
if attempt < self._retry_config.max_retries - 1:
pause = self._retry_config.backoff_factor ** attempt
await asyncio.sleep(pause)
print(f"Все {self._retry_config.max_retries} попытки провалились: {item}")
raise last_error
Exponential backoff — это когда паузы между попытками растут: 1 сек, 2 сек, 4 сек, 8 сек... Идея в том, что если сервис перегружен, мы даём ему время восстановиться, а не долбим с прежней скоростью.
Использование:
executor = RateLimitedExecutor(
max_concurrent=10,
retry_config=RetryConfig(
max_retries=3,
timeout_seconds=5.0,
backoff_factor=2.0,
)
)
results = await executor.execute(fetch_data, urls)
После добавления retry количество успешных запросов у меня выросло процентов на 5-7. Оказалось, многие фейлы были временными.
Bonus: Что ещё можно улучшить
После того как основная задача была решена, я добавил ещё пару улучшений:
1. Progress bar
from tqdm.asyncio import tqdm
# Вместо обычного gather
results = await tqdm.gather(*tasks, desc="Checking vectors")
Теперь я вижу прогресс выполнения в реальном времени. Психологически легче, когда знаешь, что осталось 2 часа, а не "хз сколько".
2. Батчирование чтения с диска
def load_vectors_lazy(filename: str, batch_size: int = 10000):
"""Читать вектора батчами вместо все сразу."""
with open(filename, 'rb') as f:
while batch := read_batch(f, batch_size):
yield batch
# Обрабатываем по 10k векторов за раз
for batch in load_vectors_lazy("vectors.bin", batch_size=10000):
results = await checker.check_all(batch)
save_results(results)
Так не нужно держать все 10M векторов в памяти одновременно.
3. Graceful shutdown
import signal
shutdown_event = asyncio.Event()
def signal_handler(signum, frame):
print("Получен сигнал остановки, завершаю текущие задачи...")
shutdown_event.set()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# В основном цикле
for batch in batches:
if shutdown_event.is_set():
print("Остановка по сигналу")
break
results = await process_batch(batch)
Теперь можно прервать по Ctrl+C без потери данных — все текущие запросы завершатся корректно.
Заключение: что я вынес из этой истории
В итоге, после всех переделок, финальный код выглядел так:
class VectorChecker:
def __init__(self, api_url: str, max_concurrent: int = 50):
self.api_url = api_url
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session: aiohttp.ClientSession | None = None
async def __aenter__(self):
timeout = aiohttp.ClientTimeout(total=30, connect=5)
self.session = aiohttp.ClientSession(timeout=timeout)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def check_vector(self, vector: list[float]) -> Result:
"""Проверить один вектор с retry и timeout."""
async with self.semaphore:
for attempt in range(3):
try:
async with self.session.post(
f"{self.api_url}/search",
json={"vector": vector, "top_k": 1}
) as response:
if response.status == 200:
data = await response.json()
return Result(
success=True,
similarity=data["similarity"]
)
elif response.status == 429:
# Rate limit — подождём
await asyncio.sleep(2 ** attempt)
continue
else:
return Result(success=False, error=f"HTTP {response.status}")
except asyncio.TimeoutError:
if attempt == 2:
return Result(success=False, error="Timeout")
await asyncio.sleep(1)
return Result(success=False, error="Max retries exceeded")
async def check_all(self, vectors: list[list[float]]) -> dict:
"""Проверить все вектора и вернуть статистику."""
tasks = [self.check_vector(v) for v in vectors]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
return {
"total": len(results),
"successful": len(successful),
"failed": len(failed),
"accuracy": len(successful) / len(results) * 100,
"avg_similarity": sum(r.similarity for r in successful) / len(successful) if successful else 0
}
# Использование
async def main():
vectors = load_vectors("vectors.bin") # 10M векторов
async with VectorChecker("https://api.internal", max_concurrent=50) as checker:
stats = await checker.check_all(vectors)
print(f"""
Результаты тестирования:
Всего проверено: {stats['total']:,}
Успешно: {stats['successful']:,}
Ошибок: {stats['failed']:,}
Accuracy: {stats['accuracy']:.2f}%
Средняя схожесть: {stats['avg_similarity']:.4f}
""")
asyncio.run(main())
Финальные метрики
Метрика |
До оптимизации |
После оптимизации |
|---|---|---|
Время выполнения |
∞ (краш) |
~12 часов |
Память клиента |
20+ GB → краш |
500 MB (стабильно) |
Нагрузка на API |
10M req/30s → краш |
50 req/sec (стабильно) |
Accuracy |
N/A |
99.97% |
Успешных запросов |
0% (всё упало) |
99.95% |
Основные выводы
-
Semaphore — это must have для любых массовых I/O операций
Защищает и твой код, и внешние сервисы
Простой в использовании, эффективный
-
Добавляй timeout везде
Один зависший запрос блокирует слот навсегда
asyncio.timeout()илиasyncio.wait_for()
-
Retry решает 90% временных проблем
API иногда отдаёт 429 или 503 — это нормально
Exponential backoff даёт API время восстановиться
-
Result type вместо exceptions
Один упавший запрос не должен ронять всё
Собирай статистику: сколько успешных, сколько failed, какие ошибки
-
Подбирай concurrency лимит экспериментально
Начни с малого (10-50)
Смотри на метрики API и клиента
Увеличивай постепенно
Чек-лист перед production
✅ Используешь async with для Semaphore
✅ Добавлен timeout на каждый I/O вызов
✅ Реализован retry для временных ошибок
✅ Errors обрабатываются через Result type
✅ Есть логирование и метрики
✅ Протестировано на реальной нагрузке
✅ Подобран оптимальный concurrency лимит
P.S. Про память
Кстати, после всех оптимизаций с семафором я заметил, что клиент всё ещё жрёт ~5 GB RAM. Оказалось, это все вектора, загруженные разом через numpy.fromfile():
# Так делал я (плохо):
vectors = np.fromfile("vectors.bin", dtype=np.float32)
vectors = vectors.reshape(-1, 128) # 10M × 128 × 4 bytes = 5.12 GB
tasks = [check_vector(session, v) for v in vectors]
Решение оказалось простым — читать батчами:
def load_vectors_in_batches(filename: str, batch_size: int = 10000):
"""Читать вектора батчами по 10k."""
with open(filename, 'rb') as f:
while True:
chunk = np.fromfile(f, dtype=np.float32, count=batch_size * 128)
if chunk.size == 0:
break
yield chunk.reshape(-1, 128)
async def main():
async with VectorChecker(api_url, max_concurrent=50) as checker:
for batch in load_vectors_in_batches("vectors.bin", batch_size=10000):
stats = await checker.check_all(batch)
print(f"Batch: {stats['successful']}/{len(batch)} OK")
Результат:
Память клиента: с 5 GB → 50 MB
Можно прервать и продолжить с нужного батча
Можно сохранять промежуточные результаты
outlingo
... Господь, жги!
Такие задачи решаются не через семафоры и 10 миллионов корутин, а через воркеры и очередь заданий
Khachatur86 Автор
Согласен! Producer-Consumer — правильное решение (код в P.S. секции). Но статья про семафор как инструмент rate limiting. Показал путь от gather → semaphore → queue, чтобы понятно было зачем каждый шаг. 10M корутин — для демонстрации проблемы