Привет!

Предыстория

Была у меня задача: проверить качество работы API для поиска похожих векторов. Штука довольно стандартная для ML-систем — на вход подаёшь 128-мерный вектор (например, эмбеддинг изображения или текста), API ищет в базе ближайшие вектора и возвращает их.
Тестовый датасет: 10 миллионов векторов. Задача: для каждого вектора проверить, что API правильно находит его же (similarity должна быть 100%).

Коллеги написали API, я написал клиент для тестирования. Что может пойти не так?

Попытка 1: Тупо в лоб:

async def check_vector(vector: list[float]) -> bool:
    """Проверить один вектор."""
    response = await client.post(
        "http://api.internal/search",
        json={"vector": vector, "top_k": 1}
    )
    result = response.json()
    return result["similarity"] >= 0.99 # Это условная величина

async def main():
    # Читаем векторы с диска
    vectors = read_vectors_from_disk("vectors.bin")
    
    results = []
    for vector in vectors:
        result = await check_vector(vector)
        results.append(result)
    
    accuracy = sum(results) / len(results) * 100
    print(f"Accuracy: {accuracy}%")

Запустил. Понятно стало, что это будет работать долго, очень долго. 10 миллионов запросов по очереди — не самая быстрая идея.

Попытка 2: Пробую параллелить (asyncio.gather):

async def main():
    vectors = read_vectors_from_disk("vectors.bin")  # 10M векторов
    
    tasks = [check_vector(v) for v in vectors]
    results = await asyncio.gather(*tasks)
    
    accuracy = sum(results) / len(results) * 100
    print(f"Accuracy: {accuracy}%")

Первые минуты всё летало. Потом клиент начал тормозить. Через какое-то время (не помню сколько, но меньше 10 минут) — вообще завис. Смотрю в htop — память ж��ёт, CPU на 100%, файловые дескрипторы кончились.
Перезапускаю. Та же история.

Проблема была очевидна (это сейчас очевидно, на тот момент не очень очевидно было): я запустил 10 миллионов одновременных HTTP запросов. Мой клиент пытался:

  • Держать в памяти 10M корутин

  • Читать с диска и отправлять всё одновременно А их API просто не был рассчитан на такую нагрузку. Rate limiter срабатывал, запросы дропались, база не справлялась.

Что будет в статье

Сначала покажу, почему наивные подходы не работают (на примере моего реального факапа с 10М векторов). Потом — как asyncio.Semaphore решает проблему. И в конце — решение с обработкой ошибок, timeout-ами и retry.


В чём была проблема

Задача:

  1. Прочитать 10 миллионов векторов с диска

  2. Для каждого вектора отправить POST запрос на API

  3. Проверить, что API вернул правильный результат

  4. Посчитать accuracy

Попытка №1: Последовательно

import asyncio
import aiohttp

async def check_vector(session: aiohttp.ClientSession, vector: list[float]) -> bool:
    """Проверить один вектор через API."""
    async with session.post(
        "https://api.internal/search",
        json={"vector": vector, "top_k": 1}
    ) as response:
        result = await response.json()
        return result["similarity"] >= 0.99

async def main():
    vectors = load_vectors("vectors.bin")  # 10M векторов
    
    async with aiohttp.ClientSession() as session:
        results = []
        for vector in vectors:
            result = await check_vector(session, vector)  # ← Ждём каждый
            results.append(result)
    
    accuracy = sum(results) / len(results) * 100
    print(f"Accuracy: {accuracy}%")

asyncio.run(main())

Проблема: Каждый запрос ждёт предыдущий. Если один запрос = 50ms, то:

10,000,000 запросов × 50ms = 500,000 секунд = 138 часов = 5.7 дней

Пять дней на тестирование? Не, спасибо

Попытка №2: Всё сразу (катастрофа)

async def main():
    vectors = load_vectors("vectors.bin")  # 10M векторов
    
    async with aiohttp.ClientSession() as session:
        # Создаём 10 миллионов задач СРАЗУ
        tasks = [check_vector(session, v) for v in vectors]
        results = await asyncio.gather(*tasks)
    
    accuracy = sum(results) / len(results) * 100
    print(f"Accuracy: {accuracy}%")

Что произошло:

На стороне клиента:

  • Python создал 10M корутин в памяти → жрёт 20+ GB RAM

  • Чтение с диска не успевает → I/O bottleneck

  • Программа зависла, потом упала с MemoryError

На стороне API:

  • 10M запросов прилетели за 30 секунд

  • Connection pool исчерпан (обычно 100-1000 соединений)

  • Rate limiter включился: 429 Too Many Requests

  • База данных под нагрузкой начала тормозить

Итог: положил и свой клиент, и их API, и базу данных. Тройное комбо!


Решение: Semaphore

После разбора полётов я полез в документацию asyncio искать "как ограничить количество одновременных запросов". Наткнулся на Semaphore.

Семафор — это по сути счётчик с двумя операциями:

  • acquire() — уменьшить счётчик (или ждать, если он уже 0)

  • release() — увеличить счётчик обратно

Создаёшь семафор с лимитом:

semaphore = asyncio.Semaphore(100)  # Максимум 100 одновременных операций

Теперь у тебя есть "100 слотов". Захватил слот — работаешь. Освободил — следующая задача может начать.

Исправленный код

import asyncio
import aiohttp
from typing import List

async def check_vector(
    session: aiohttp.ClientSession,
    vector: list[float],
    semaphore: asyncio.Semaphore
) -> bool:
    """Проверить вектор с rate limiting."""
    async with semaphore: 
        async with session.post(
            "https://api.internal/search",
            json={"vector": vector, "top_k": 1}
        ) as response:
            result = await response.json()
            return result["similarity"] >= 0.99

async def main():
    vectors = load_vectors("vectors.bin") 
    
    semaphore = asyncio.Semaphore(100)
    
    async with aiohttp.ClientSession() as session:
        # Создаём все задачи (но они будут ждать слота)
        tasks = [
            check_vector(session, vector, semaphore)
            for vector in vectors
        ]
        results = await asyncio.gather(*tasks)
    
    accuracy = sum(results) / len(results) * 100
    print(f"Accuracy: {accuracy}%")

asyncio.run(main())

Что изменилось?

Было:

  • Все 10M задач стартуют одновременно

  • Память взрывается, API падает

Стало:

  • Создаются все 10M задач (это дёшево, просто корутины)

  • Но работают только 100 одновременно

  • Как только одна завершилась → следующая хватает слот

Результат:

  • Время выполнения: ~14 часов (вместо 5 дней или краша)

  • Память клиента: стабильные 500 MB (вместо 20+ GB)

  • API живой и счастливый

  • Accuracy: 99.97%

Как это работает внутри

async with semaphore:  # Попытка захватить слот
    # Если есть свободные слоты (счётчик > 0):
    #   - Уменьшаем счётчик
    #   - Входим в блок
    # Если слотов нет (счётчик = 0):
    #   - Ждём в очереди
    # После выхода из блока:
    #   - Увеличиваем счётчик
    #   - Следующая задача из очереди может войти
    await do_work()

Примерная timeline:

t=0s:   Задачи 1-100 захватили слоты, начали работу
t=50ms: Задача 1 завершилась → Задача 101 начала работу
t=51ms: Задача 2 завершилась → Задача 102 начала работу
...     Всегда 100 активных задач

Подбор лимита

Как выбрать число для Semaphore(N)? Зависит от:

  1. Возможности API ��� сколько он может обработать?

    • У нас API мог ~500 req/sec

    • При 50ms на запрос: 100 concurrent = ~2000 req/sec

    • Значит 100 — много, снизил до 50

  2. Скорость ответа — сколько времени API отвечает?

    • Быстрые ответы (10-50ms) → можно больше concurrent

    • Медленные (500ms+) → меньше concurrent, иначе задачи накапливаются

В итоге я остановился на 50 одновременных запросов:

semaphore = asyncio.Semaphore(50)

API не перегружался, клиент работал стабильно, коллеги были довольны


Production паттерны

После всех приключений я понял: копипастить async with semaphore в каждую функцию — плохая идея. Код превращается в кашу, и когда нужно поменять логику (например, добавить retry), приходится лезть в 20 мест.

Поэтому я завёл себе класс-обёртку. Выглядит избыточно, но когда в проекте 50+ мест с HTTP запросами — это спасение.

Вариант 1: Простая обёртка

from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')

class RateLimitedExecutor:
    """Делает rate limiting за вас."""
    
    def __init__(self, max_concurrent: int = 10):
        self._semaphore = asyncio.Semaphore(max_concurrent)
    
    async def execute(
        self,
        func: Callable[[str], Awaitable[T]],
        items: list[str],
    ) -> list[T]:
        """Запустить функцию для всех items с rate limiting."""
        tasks = [
            self._run_with_limit(func, item) 
            for item in items
        ]
        return await asyncio.gather(*tasks)
    
    async def _run_with_limit(self, func, item):
        async with self._semaphore:
            return await func(item)

# Использование простое:
executor = RateLimitedExecutor(max_concurrent=10)
results = await executor.execute(fetch_data, urls)

Уже лучше! Но есть проблема: если одна задача упадёт, gather() по умолчанию завалит всё остальное.

Вариант 2: С обработкой ошибок

from dataclasses import dataclass

@dataclass(frozen=True)
class Result:
    """Либо value, либо error."""
    url: str
    value: dict | None = None
    error: Exception | None = None
    
    @property
    def is_ok(self) -> bool:
        return self.error is None


class RateLimitedExecutor:
    # ... __init__ тот же ...
    
    async def execute(self, func, items) -> list[Result]:
        tasks = [self._run_with_limit(func, item) for item in items]
        
        # return_exceptions=True — не валим всё если одна упала
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return [
            self._wrap_result(item, result)
            for item, result in zip(items, results)
        ]
    
    def _wrap_result(self, url: str, result) -> Result:
        if isinstance(result, Exception):
            return Result(url=url, error=result)
        return Result(url=url, value=result)


# Теперь можно обрабатывать отдельно успехи и фейлы
results = await executor.execute(fetch_data, urls)

successful = [r.value for r in results if r.is_ok]
failed = [r for r in results if not r.is_ok]

print(f"OK: {len(successful)}, Failed: {len(failed)}")

# Посмотреть первые 5 ошибок
for failure in failed[:5]:
    print(f"{failure.url}: {failure.error}")

Это реально удобно! Особенно когда обрабатываешь тысячи запросов и 1-2% могут упасть по таймауту или 500-кой — не хочется из-за них терять остальные 98%.

Timeout'ы и retry (потому что всё ломается)

Семафор решает проблему с количеством запросов, но не защищает от другой беды — зависших соединений.

История из жизни: один из наших внешних API иногда просто... виснет. Запрос отправляется, но ответа нет. И нет. И нет. Минута проходит, две, пять... А код терпеливо ждёт. В итоге все 10 слотов семафора заняты зависшими запросами, и новые встают в очередь.

Добавляем timeout

Решение простое — timeout на каждый запрос:

class RateLimitedExecutor:
    def __init__(
        self, 
        max_concurrent: int = 10,
        timeout_seconds: float = 5.0
    ):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._timeout = timeout_seconds
    
    async def _run_with_limit(self, func, item):
        async with self._semaphore:
            try:
                async with asyncio.timeout(self._timeout):
                    return await func(item)
            except TimeoutError:
                print(f"Timeout for {item}")
                raise

Теперь если запрос не ответил за 5 секунд — мы его убиваем и освобождаем слот для следующего.

А что насчёт retry?

Timeout решает проблему зависших запросов, но иногда хочется дать API второй шанс. Например, он может вернуть 503 (service unavailable) просто потому что перегружен на секунду.

Вот как я добавил retry с exponential backoff:

from dataclasses import dataclass

@dataclass
class RetryConfig:
    max_retries: int = 3
    timeout_seconds: float = 5.0
    backoff_factor: float = 2.0


class RateLimitedExecutor:
    def __init__(self, max_concurrent: int = 10, retry_config: RetryConfig = None):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._retry_config = retry_config or RetryConfig()
    
    async def _run_with_limit(self, func, item):
        async with self._semaphore:
            return await self._try_with_retry(func, item)
    
    async def _try_with_retry(self, func, item):
        last_error = None
        
        for attempt in range(self._retry_config.max_retries):
            try:
                async with asyncio.timeout(self._retry_config.timeout_seconds):
                    result = await func(item)
                    
                    if attempt > 0:
                        print(f"Успех со {attempt + 1}-й попытки: {item}")
                    
                    return result
                    
            except (TimeoutError, ConnectionError) as e:
                last_error = e
                print(f"Попытка {attempt + 1} провалилась: {item}")
                
                # Если это не последняя попытка — ждём
                if attempt < self._retry_config.max_retries - 1:
                    pause = self._retry_config.backoff_factor ** attempt
                    await asyncio.sleep(pause)
        
        print(f"Все {self._retry_config.max_retries} попытки провалились: {item}")
        raise last_error

Exponential backoff — это когда паузы между попытками растут: 1 сек, 2 сек, 4 сек, 8 сек... Идея в том, что если сервис перегружен, мы даём ему время восстановиться, а не долбим с прежней скоростью.

Использование:

executor = RateLimitedExecutor(
    max_concurrent=10,
    retry_config=RetryConfig(
        max_retries=3,
        timeout_seconds=5.0,
        backoff_factor=2.0,
    )
)

results = await executor.execute(fetch_data, urls)

После добавления retry количество успешных запросов у меня выросло процентов на 5-7. Оказалось, многие фейлы были временными.

Bonus: Что ещё можно улучшить

После того как основная задача была решена, я добавил ещё пару улучшений:

1. Progress bar

from tqdm.asyncio import tqdm

# Вместо обычного gather
results = await tqdm.gather(*tasks, desc="Checking vectors")

Теперь я вижу прогресс выполнения в реальном времени. Психологически легче, когда знаешь, что осталось 2 часа, а не "хз сколько".

2. Батчирование чтения с диска

def load_vectors_lazy(filename: str, batch_size: int = 10000):
    """Читать вектора батчами вместо все сразу."""
    with open(filename, 'rb') as f:
        while batch := read_batch(f, batch_size):
            yield batch

# Обрабатываем по 10k векторов за раз
for batch in load_vectors_lazy("vectors.bin", batch_size=10000):
    results = await checker.check_all(batch)
    save_results(results)

Так не нужно держать все 10M векторов в памяти одновременно.

3. Graceful shutdown

import signal

shutdown_event = asyncio.Event()

def signal_handler(signum, frame):
    print("Получен сигнал остановки, завершаю текущие задачи...")
    shutdown_event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# В основном цикле
for batch in batches:
    if shutdown_event.is_set():
        print("Остановка по сигналу")
        break
    results = await process_batch(batch)

Теперь можно прервать по Ctrl+C без потери данных — все текущие запросы завершатся корректно.


Заключение: что я вынес из этой истории

В итоге, после всех переделок, финальный код выглядел так:

class VectorChecker:
    def __init__(self, api_url: str, max_concurrent: int = 50):
        self.api_url = api_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: aiohttp.ClientSession | None = None
    
    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        self.session = aiohttp.ClientSession(timeout=timeout)
        return self
    
    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()
    
    async def check_vector(self, vector: list[float]) -> Result:
        """Проверить один вектор с retry и timeout."""
        async with self.semaphore:
            for attempt in range(3):
                try:
                    async with self.session.post(
                        f"{self.api_url}/search",
                        json={"vector": vector, "top_k": 1}
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            return Result(
                                success=True,
                                similarity=data["similarity"]
                            )
                        elif response.status == 429:
                            # Rate limit — подождём
                            await asyncio.sleep(2 ** attempt)
                            continue
                        else:
                            return Result(success=False, error=f"HTTP {response.status}")
                
                except asyncio.TimeoutError:
                    if attempt == 2:
                        return Result(success=False, error="Timeout")
                    await asyncio.sleep(1)
        
        return Result(success=False, error="Max retries exceeded")
    
    async def check_all(self, vectors: list[list[float]]) -> dict:
        """Проверить все вектора и вернуть статистику."""
        tasks = [self.check_vector(v) for v in vectors]
        results = await asyncio.gather(*tasks)
        
        successful = [r for r in results if r.success]
        failed = [r for r in results if not r.success]
        
        return {
            "total": len(results),
            "successful": len(successful),
            "failed": len(failed),
            "accuracy": len(successful) / len(results) * 100,
            "avg_similarity": sum(r.similarity for r in successful) / len(successful) if successful else 0
        }


# Использование
async def main():
    vectors = load_vectors("vectors.bin")  # 10M векторов
    
    async with VectorChecker("https://api.internal", max_concurrent=50) as checker:
        stats = await checker.check_all(vectors)
    
    print(f"""
    Результаты тестирования:
    Всего проверено: {stats['total']:,}
    Успешно: {stats['successful']:,}
    Ошибок: {stats['failed']:,}
    Accuracy: {stats['accuracy']:.2f}%
    Средняя схожесть: {stats['avg_similarity']:.4f}
    """)

asyncio.run(main())

Финальные метрики

Метрика

До оптимизации

После оптимизации

Время выполнения

∞ (краш)

~12 часов

Память клиента

20+ GB → краш

500 MB (стабильно)

Нагрузка на API

10M req/30s → краш

50 req/sec (стабильно)

Accuracy

N/A

99.97%

Успешных запросов

0% (всё упало)

99.95%

Основные выводы

  1. Semaphore — это must have для любых массовых I/O операций

    • Защищает и твой код, и внешние сервисы

    • Простой в использовании, эффективный

  2. Добавляй timeout везде

    • Один зависший запрос блокирует слот навсегда

    • asyncio.timeout() или asyncio.wait_for()

  3. Retry решает 90% временных проблем

    • API иногда отдаёт 429 или 503 — это нормально

    • Exponential backoff даёт API время восстановиться

  4. Result type вместо exceptions

    • Один упавший запрос не должен ронять всё

    • Собирай статистику: сколько успешных, сколько failed, какие ошибки

  5. Подбирай concurrency лимит экспериментально

    • Начни с малого (10-50)

    • Смотри на метрики API и клиента

    • Увеличивай постепенно

Чек-лист перед production

✅ Используешь async with для Semaphore
✅ Добавлен timeout на каждый I/O вызов
✅ Реализован retry для временных ошибок
✅ Errors обрабатываются через Result type
✅ Есть логирование и метрики
✅ Протестировано на реальной нагрузке
✅ Подобран оптимальный concurrency лимит

P.S. Про память

Кстати, после всех оптимизаций с семафором я заметил, что клиент всё ещё жрёт ~5 GB RAM. Оказалось, это все вектора, загруженные разом через numpy.fromfile():

# Так делал я (плохо):
vectors = np.fromfile("vectors.bin", dtype=np.float32)
vectors = vectors.reshape(-1, 128)  # 10M × 128 × 4 bytes = 5.12 GB

tasks = [check_vector(session, v) for v in vectors] 

Решение оказалось простым — читать батчами:

def load_vectors_in_batches(filename: str, batch_size: int = 10000):
    """Читать вектора батчами по 10k."""
    with open(filename, 'rb') as f:
        while True:
            chunk = np.fromfile(f, dtype=np.float32, count=batch_size * 128)
            if chunk.size == 0:
                break
            yield chunk.reshape(-1, 128)


async def main():
    async with VectorChecker(api_url, max_concurrent=50) as checker:
        for batch in load_vectors_in_batches("vectors.bin", batch_size=10000):
            stats = await checker.check_all(batch)
            print(f"Batch: {stats['successful']}/{len(batch)} OK")

Результат:

  • Память клиента: с 5 GB → 50 MB

  • Можно прервать и продолжить с нужного батча

  • Можно сохранять промежуточные результаты

Для ещё более продвинутого подхода можно использовать Producer-Consumer паттерн с asyncio.Queue — producer читает с диска, consumers обрабатывают через API, очередь автоматически балансирует нагрузку (backpressure). Но это уже тема для отдельной статьи!

Полезные ссылки

Комментарии (2)


  1. outlingo
    06.11.2025 21:02

    Создаются все 10M задач (это дёшево, просто корутины)

    ... Господь, жги!

    Такие задачи решаются не через семафоры и 10 миллионов корутин, а через воркеры и очередь заданий


    1. Khachatur86 Автор
      06.11.2025 21:02

      Согласен! Producer-Consumer — правильное решение (код в P.S. секции). Но статья про семафор как инструмент rate limiting. Показал путь от gather → semaphore → queue, чтобы понятно было зачем каждый шаг. 10M корутин — для демонстрации проблемы