Привет! Я Андрей Сташок, бэкенд-разработчик в KTS. В этой статье я расскажу о запуске параллельных тестов через pytest-xdist.

Почему это важно?

Объясню на нашем примере. При разработке продуктов мы постоянно выполняем юнит-тестирование. Раньше мы проверяли все последовательно, и с расширением тестовой базы время проведения испытаний заметно возрастало. Распараллеливание через pytest-xdist помогло нам сильно ускориться, и сегодня я хочу поделиться этим трюком с вами.

Я расскажу, как запускать параллельные тесты для реляционной БД PostgreSQL (с драйверами asyncpg и psycopg2) и key-value БД Redis. Для подключения к реляционной БД мы будем использовать SQLAlchemy, а для Redis — библиотеку redis. Кроме того, я рассмотрю, как автоматизировать выполнение миграций при каждом запуске тестов с использованием alembic.

Оглавление

Введение

Для начала стоит оговориться, что и на Хабре, и за его пределами уже существуют похожие статьи. Например:

Однако эти материалы либо не касаются Python, либо не рассматривают работу с базой данных. В своей статье я собрал все, что удалось найти на эту тему, и изложил в прикладном формате. Репозиторий с примером кода доступен по ссылке.

Итак, давайте разберемся, для чего мы вообще пишем тесты и зачем нам их параллельный запуск.

Главная причина написания юнит-тестов — повышение качества разработки и ее ускорение в долгосрочной перспективе за счет быстрой обратной связи. Соответственно, при написании кода мы часто стараемся по максимуму обложить его автотестами, ведь чем больше кейсов протестировано, тем меньше вероятность выкатить баг.

Но с ростом проекта тестовая база растет, а желание прогонять тесты для каждого небольшого изменения пропорционально падает. И что же делать, когда ваши тесты гонятся уже 20 минут? Именно с такой проблемой мы и столкнулись.

Вот пример с реального проекта:

Запуск тестов последовательно
Запуск тестов последовательно

Кстати, время — это не единственная проблема. Миграции в БД тоже заметно сбавляли наш энтузиазм к запуску тестов. Нам хотелось, чтобы тесты запускались одной командой, а переключение между ветками не приводило к необходимости перенакатывать миграции вручную.

Нам удалось решить обе проблемы, и в этой статье я поделюсь инструкцией, основанной на нашем решении.

Инструкция исходит из того, что у нас уже есть проект с большой тестовой базой, и для тестирования в нем используется pytest. Наша задача — минимальными усилиями ускорить тесты и настроить автоматическое приведение базы в соответствие с текущими миграциями. Работать мы будем с репозиторием. Весь код из статьи можно найти там.

Автоматизация миграций

Начнем с автоматизации миграций, так как это наиболее простая задача, и ее решение пригодится нам в дальнейшем.

Первым делом стоит откатить все миграции, имеющиеся в базе, очистить базу, если там что-то есть, и накатить актуальное состояние. После окончания тестов откатить миграции снова очистив базу.

Последовательность работы с миграциями
Последовательность работы с миграциями

Таким образом база всегда будет оставаться пустой, и при переходе между ветками у нас не возникнет проблем с неактуальностью ее состояния.

Поскольку для тестирования мы используем pytest, логично будет выполнять запуск миграций из фикстуры с scope=”session”. Это значит, что фикстура запустится один раз при запуске тестов с флагом autouse=True, который заставит pytest автоматически запускать фикстуру перед тестами без явного ее указания. Ниже представлена такая фикстура:

@pytest.fixture(autouse=True, scope="session")
def _migrations():
    import alembic.config
    
    alembic.config.main(["downgrade", "base"])
    alembic.config.main(["upgrade", "head"])
    
    yield
    alembic.config.main(["downgrade", "base"])

На всякий случай напомню, что pytest выполняет код фикстуры до yield, затем передает управление в тест, а после завершения теста продолжает выполнение фикстуры до return.

Запустим тесты pytest -s (флаг s нужен для вывода в stdout). Видим, что наши миграции успешно накатились и откатились (рис 4).

Запуск тестов с автоматическими миграциями
Запуск тестов с автоматическими миграциями

Запуск pytest-xdist

Вторая проблема, которую нужно решить — долгое выполнение последовательных тестов. Очевидно, решить ее можно путем параллельного запуска.

И тут нам на помощь приходит pytest-xdist. На данный момент это единственная поддерживаемая библиотека для параллельного запуска тестов. Она позволяет запустить тесты в отдельных процессах; таким образом, с ее помощью мы можем ускорить тесты в n раз, где n — количество доступных ядер на машине.

Не буду пересказывать документацию, а просто выделю несколько важных для понимания моментов о pytest-xdist:

  • pytest-xdist расширяет возможности pytest. С помощью опции -n можно указать, в скольких процессах следует запустить тесты;

  • мастер-процесс порождает процессы обработчиков, которым распределяет тесты;

  • фикстура worker_id возвращает идентификатор текущего рабочего процесса (gw0, gw1 и т. д.) или master. Если тесты не распределены (например, пройден -n0 или вообще не передан -n), то также возвращается master;

  • фикстуры с пометкой scope="session" запускаются один раз для каждого воркера, поднимаемого xdist.

Установим pytest-xdist:

pip install pytest-xdist

Казалось бы, все готово. Но попытка запустить тесты в лоб с большой вероятностью приведет к ошибкам, как на скриншоте ниже.

Запуск тестов без подготовки
Запуск тестов без подготовки

Откуда берутся эти ошибки? Представьте, что у вас есть два теста, тестирующие одну и ту же функцию. Оба теста запускаются параллельно и оба вызывают функцию add_value_to_db.

Пример тестируемой функции:

async def add_value_to_db(value: str) -> ExampleModel:
    async with global_db.sessionmaker() as session:
        stmt = sa.insert(ExampleModel).values(my_value=value).returning(ExampleModel)
        result = await session.scalar(stmt)
        await session.commit()
    return result

После ее выполнения оба теста проверяют содержимое базы и обнаруживают, что в базе лежит 2 записи, хотя функция должна была создать одну. А если поле my_value уникальное, то функция add_value_to_db в одном из тестов вообще упадет с ошибкой. Таких примеров можно придумать множество.

Если тесты используют базу, то попытка запустить их в разных процессах может вызвать конфликт доступа к данным. Таким образом, у нас нарушается изоляция тестов: одни тесты становятся зависимыми от поведения других.

Способы поддержания изоляции тестов для реляционной БД

Как же нам избежать состояния гонки и соблюсти изоляцию тестов? Существует несколько способов это сделать:

  • использовать моки на запросы в базу;

  • использовать транзакции и не коммитить изменения в базу;

  • использовать in-memory db для каждого процесса;

  • использовать разные базы в разных процессах;

  • использовать разные схемы БД в разных процессах. 

Преимущества и недостатки всех этих вариантов уже рассмотрены в другой статье, поэтому не буду повторяться. Остановлюсь чуть подробнее только на варианте с разными схемами БД в разных процессах, поскольку его мы и будем использовать.

Стоит заметить, что схемы есть не во всех базах данных: например, MySQL их не поддерживает. Однако мы работаем с PostgreSQL.

Коротко о схемах: база данных содержит одну или несколько именованных схем, которые, в свою очередь, содержат таблицы. Схемы также содержат именованные объекты других видов, включая типы данных, функции и операторы. Одно и то же имя объекта можно свободно использовать в разных схемах: например, и schema1, и myschema могут содержать таблицы с именем mytable. Подробнее о схемах в PostgreSQL можно почитать в документации.

У создания схем для каждого процесса есть следующие преимущества:

  • код в тестах будет взаимодействовать с базой идентично тому, как он будет делать это в работающем приложении, чего нельзя достичь в случае с моками или in-memory db;

  • мы можем дописать несколько фикстур для создания и использования схемы, и все заработает. Не нужно будет вносить изменения в код и тесты.
    К слову, такого результата не получится достичь путем использования транзакций. Особую проблему при работе с транзакциями вызывает работа с несколькими сессиями или подключениями;

  • схемы удобно агрегируются внутри одной БД и сами по себе являются более легковесными сущностями, чем БД. Таким образом, схемами мы не засоряем пространство баз данных.

Запуск тестов в процессах с разными схемами

Попробуем запустить тесты в разных схемах для разных БД. Ниже представлен жизненный цикл воркера, т.е. последовательность действий, которую воркер выполняет при работе.

Изображение выглядит как текст, диаграмма, линия, снимок экрана  Контент, сгенерированный ИИ, может содержать ошибки.
Жизненный цикл воркера

Создание схемы в БД

Напишем фикстуру, которая будет возвращать имя текущей схемы и записывать его в переменную окружения. При этом мы хотим сохранить возможность синхронного запуска тестов, поэтому для master укажем схему public.

Фикстура имени схемы:

@pytest.fixture(autouse=True, scope="session")
def schema_name(worker_id) -> str:
    """Возвращает уникальное имя схемы для каждого процесса pytest."""
    if worker_id == "master":
        schema_name = "public"
    else:
        schema_name = f"test_schema_{worker_id}"
    os.environ["SCHEMA_NAME"] = schema_name
    return schema_name

где autouse=True, scope="session” — автоматический запуск фикстуры при запуске воркера.

Обратите внимание: мы сохраняем переменную окружения SCHEMA_NAME. Нам это пригодится, когда мы будем накатывать миграции.

Далее напишем фикстуру, которая создаст схемы в БД и после завершения всех тестов удалит схему.

Фикстура создания схемы:

@pytest.fixture(autouse=True, scope="session")
async def addschema(schema_name: str):
    if schema_name == "public":
        yield
        return
    engine = create_async_engine(f"postgresql+asyncpg://{os.getenv("DB_URL")}",)
    async with engine.begin() as conn:
        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name};"))
    yield
    async with engine.begin() as conn:
        await conn.execute(text(f"DROP SCHEMA {schema_name} CASCADE;"))

Накатка миграций

Немного перепишем функцию, которую мы ранее писали для миграций. Основное отличие заключается в том, что теперь мы не будем откатывать миграции, т. к. смысла в этом нет: мы получаем новую схему, которую все равно удалим в конце.

Фикстура вызова alembic:

@pytest.fixture(autouse=True, scope="session")
def migrations(schemaname: str, addschema):
    import alembic.config
    if schema_name == "public":
        alembic.config.main(["downgrade", "base"])
    alembic.config.main(["upgrade", "head"])
    yield
    if schema_name == "public":
        alembic.config.main(["downgrade", "base"])

Также нам нужно изменить код в файле env.py, который alembic генерирует при инициализации. Сам файл отвечает за подключение к БД при накатке миграций.

В сниппете ниже показан измененный код функции из env.py. В движок мы передаем дополнительный аргумент connect_args={"options": f"-c search_path={os.getenv("SCHEMA_NAME", "public")}"}. Это ключевой момент: благодаря этому параметру движок сможет создавать подключение сразу к нужной схеме.

Заметьте, как раз тут мы и используем переменную окружения SCHEMA_NAME. Мы не можем передать имя схемы в файл env.py напрямую из фикстуры, поэтому пользуемся другим способом — складываем имя схемы в переменную окружения. 

Альтернативный способ задать схему для подключения — выполнить SQL-команду SET search_path = my_schema сразу после установления соединения.

Важно оговориться, что переданные аргументы применимы для psycopg2, для других драйверов они могут отличаться. Далее в этой статье я также приведу аргументы для asyncpg.

Функция выполнения миграций:

config.set_main_option("sqlalchemy.url", f"postgresql+psycopg2://{os.getenv("DB_URL")}")

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
        connect_args={
            "options": f"-c search_path={os.getenv("SCHEMA_NAME", "public")}"
        },
    )
    
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        
        with context.begin_transaction():
            context.run_migrations()

Еще момент: если вы используете какие-то расширения (extension), то при разделении БД на схемы может возникнуть проблема — PostgreSQL поставит расширение только в одну из схем. Если вам важно учитывать расширения в тестах, я советую выделить отдельную схему в БД и установить расширения туда, а затем через запятую указывать ее в search_path. Подробнее о проблеме можно почитать тут.

Подключение к нужной схеме

Чтобы заменить стандартное подключение при тестах на подключение к нужной схеме, нам нужно понимать, где в нашем приложении происходит подключение к БД. Как вариант, это можно сделать путем подмены конфигов, здесь все зависит от конкретной реализации.

Напишем фикстуру, которая подменяет движок подключений к БД. Тут мы тоже передаем в движок дополнительный параметр connect_args={"server_settings": {"search_path": schema_name}}, только на этот раз мы используем драйвер asyncpg.

Фикстура подмены движка:

@pytest.fixture(autouse=True)
async def _pg_connect_to_schema(schema_name: str, _add_schema):
    global_db.engine = create_async_engine(
        f"postgresql+asyncpg://{os.getenv("DB_URL")}",
        connect_args={
            "server_settings": {"search_path": schema_name}
        },  # Указываем схему к которой хотим подключиться
    )
    global_db.sessionmaker = async_sessionmaker(
        global_db.engine, expire_on_commit=False
    )
    yield
    await global_db.engine.dispose()

Здесь мы передаем в фикстуру аргумент _add_schema: это необходимо для того, чтобы _pg_connect_to_schema выполнилась строго после _add_schema.

Очистка базы

После каждого теста база данных очищается — из всех таблиц удаляются данные с помощью команды truncate. Фикстура, которая это делает, запускается после каждого отдельного теста, потому что мы не указываем параметр scope.

Очистка базы:

@pytest.fixture(autouse=True)
async def truncateall(
    schema_name: str, pg_sessionmaker: async_sessionmaker[AsyncSession]
):
    yield
    async with pg_sessionmaker() as pg_session:
        except_tables = ("alembic_version",)
        res = list(
            await pg_session.scalars(
                text(
                    "select tablename from pg_catalog.pg_tables where schemaname = '{}' {};".format(
                        schema_name,
                        "".join(
                            [f" and tablename != '{table}'" for table in except_tables]
                        ),
                    )
                )
            )
        )
        if not res:
            return
        await pg_session.execute(
            text(
                "truncate table {}".format(", ".join(f"{schema_name}.{r}" for r in res))
            )
        )
        await pg_session.commit()

Теперь попробуем запустить тесты pytest -n auto:

Запуск тестов в разных схемах
Запуск тестов в разных схемах

Обратите внимание: мы не обязаны запускать тесты параллельно. Мы можем запустить их и последовательно, и нам за это ничего не будет. Только выполняться они будут дольше.

Последовательный запуск тестов
Последовательный запуск тестов

Способы поддержания изоляции тестов для Redis

Теперь попробуем сделать то же самое, но уже для Redis. Для этой БД есть два способа сохранить изоляцию тестов:

  • Использовать уникальные префиксы для каждого процесса тестов, т. е. записывать ключи не key, а f”{worker_id}_{key}”.

    Важно учитывать, что из коробки библиотека Redis не позволяет указывать префиксы для ключей. Чтобы использовать этот способ, придется писать собственную обертку над базовой библиотекой.

  • Использовать разные базы данных для разных процессов. Redis позволяет это сделать внутри одного экземпляра.

    У такого способа тоже есть ограничение: по умолчанию Redis имеет 16 баз, то есть дефолтная конфигурация не позволит запустить более 16 процессов с тестами. Чтобы запустить больше процессов, нужно отредактировать поле databases в файле redis.conf и перезапустить Redis с новым конфигом. Для Docker Compose это будет выглядеть так:

    entrypoint: ["/bin/sh", "-c", "echo 'databases 16' > /tmp/redis.conf && redis-server /tmp/redis.conf"]

Я рассмотрю второй способ, т.к. его можно подключить к любому готовому коду, не внося изменений в логику приложения.

Жизненный цикл воркера при работе с Redis выглядит следующим образом:

Изображение выглядит как текст, диаграмма, Шрифт, снимок экрана  Контент, сгенерированный ИИ, может содержать ошибки.
Жизненный цикл воркера для Redis

Подключение к нужной базе

Как мы обсудили ранее, фикстура worker_id возвращает id воркера (gw0, gw1, и т.д.) или master. Сейчас нам понадобится фикстура, которая на основании worker_id будет возвращать номер базы данных в Redis. Для мастера это будет нулевая база, т.к. если запустился только мастер, то тесты работают синхронно.

Фисктура, возвращающая номер базы:

@pytest.fixture()
def redis_db_id(worker_id) -> int:
    if worker_id == "master":
        return 0
    else:
        return int(worker_id.split("gw")[1])

Далее напишем фикстуру, подменяющую подключение к Redis на подключение к нужной БД:

@pytest.fixture(autouse=True)
async def redisconnect_to_db(redis_db_id: int):
    global_regis.client = aioredis.Redis.from_url(
        url=f"{os.getenv("REDIS_URL", "redis://localhost:6379")}/{redis_db_id}",
        decode_responses=True,
    )

Очистка базы

Напишем фикстуру, очищающую базу, к которой мы подключены:

@pytest.fixture(autouse=True)
async def flushdb(redisclient: aioredis.Redis):
    yield
    await redis_client.flushdb()

И теперь запустим и убедимся, что все работает:

Параллельный запуск тестов в Redis
Параллельный запуск тестов в Redis

Здесь стоит рассказать про одну опциональную доработку. Как мы помним, Redis поддерживает ограниченное количество БД, и динамически создавать их нельзя. Из-за этого попытка запустить больше воркеров, чем есть БД в наличии, приведет к следующему результату:

Превышение количества существующих баз данных
Превышение количества существующих баз данных

Чтобы избежать такого результата, можно дописать pytest-хук, который будет бросать нам ошибку, если мы случайно запустим слишком много воркеров. Он берет из переменной окружения указанное количество баз данных и сравнивает его с количеством воркеров.

Проверочный pytest-хук:

def pytest_configure(config):
    if not hasattr(config, "workerinput"):
        redis_dbs = int(os.getenv("REDIS_DATABASES", "16"))
        xdist_worker_count = config.option.numprocesses
        if xdist_worker_count and xdist_worker_count > redis_dbs:
            raise pytest.UsageError(
                f"Использование более {redis_dbs} процессов запрещено (запрошено {xdist_worker_count}).\n"
                "Уменьшите значение -n/--numprocesses или увеличьте количество баз данных Redis.\n"
            )

Конечно, когда вы сами контролируете, сколько баз у вас есть и сколько воркеров вы запускаете, такая проверка может показаться избыточной. Однако pytest-xdist позволяет запускать тесты c параметром -n auto, и при его использовании будет не до конца очевидно, сколько воркеров поднимется.

Попробуем запустить тесты pytest -n 100:

Пример ошибки хука
Пример ошибки хука

Результаты и заключение 

Итак, самое интересное. Давайте сравним, на какое время нам удалось ускорить тесты:

Запуск тестов параллельно (30 с)
Запуск тестов параллельно (30 с)
Запуск тестов последовательно (142 с)
Запуск тестов последовательно (142 с)

Таким образом, в «лабораторных» условиях распараллеливание помогло ускорить тестирование в 4.7 раза.

Если же говорить про реальный проект, который я упоминал в начале, то его пайплайн получилось ускорить в 2,5 раза.

Параллельный запуск тестов на реальном проекте
Параллельный запуск тестов на реальном проекте

Так что рекомендую взять этот способ на вооружение. На практике нам он помог запускать тесты в разных процессах, избегая конкурентного доступа к базе данных и связанных с этим проблем. При этом для его использования не нужно вносить изменения в уже написанные тесты или сам проект — достаточно добавить некоторые фикстуры.

Итого: использование такого подхода позволит ускорить тестирование во столько раз, сколько ядер процессора доступно на машине.

Спасибо, что дочитали статью. Также рекомендую познакомиться с материалами моих коллег про бэкенд-разработку и про тестирование:

Комментарии (3)


  1. piton_nsk
    26.05.2025 15:51

    Если тесты используют базу, то попытка запустить их в разных процессах может вызвать конфликт доступа к данным. 

    А рабочее приложение как функционирует, если тесты могут вызывать конфликты?


    1. andrey7657956 Автор
      26.05.2025 15:51

      Привет.

      Проблемы при параллельной работе тестов и правда могут быть вызваны не продуманной архитектурой приложения. Но тут мы говорим не об этом. Мы рассматриваем ситуацию когда из-за параллельного запуска нарушается изоляция тестов, из-за этого тест не может корректно проверить юзкейс или запуститься.

      Например, есть два теста один проверяет создание записи в базе, а другой проверяет ошибку, возникающую при попытке получить не существующие запись. Если первый тест создаст запись раньше и не успеет очистить ее после себя, второй найдет ее и юзкейс с ошибкой не будет проверен.

      Также стоит сказать, что для тестирования сложной бизнес-логики часто приходится создавать окружение с помощью вызова фикстур или фабрик. Если мы запустим в параллель два теста, использующих одинаковое окружение, то при попытке дважды создать одно и тоже окружение (одни и те же записи в базе для тестов), могут возникнуть ошибки.

      Например, у тебя переиспользуется фикстура которая создает запись с уникальным полем. Тогда один из тестов, работающих параллельно просто не запустится, тк не сможет применить фикстуру.


      1. piton_nsk
        26.05.2025 15:51

        когда из-за параллельного запуска нарушается изоляция тестов, из-за этого тест не может корректно проверить юзкейс или запуститься

        В большинстве случаев это и означает кривую реализацию тестов и/или самого приложения. Если на сайт больше одного пользователя зашло, он же не ломается. А почему тесты не работают?

        Например, есть два теста один проверяет создание записи в базе, а другой проверяет ошибку, возникающую при попытке получить не существующие запись. 

        Создание записи в базе проверяется по айдишнику, сделали запись, получили айди (если айдишник генерится на клиенте, еще проще), потом проверили что запись создалась и все поля сохранились как надо. Тест, который проверяет логику работы с несуществующей записью, пытается получить запись по заведомо несуществующему айдишнику и другие тесты ему мешать не должны. У вас же рабочее приложение не ломается от того, что кто-то создал запись в базе. Поэтому и тесты не должны.

        Конечно, ситуации бывают всякие, иногда надо и строго определенное окружение, например в базе должно быть именно 3 записи о продажах, чтобы проверить как строится отчет по продажам, что туда попадает что надо, а что не надо не попадает. Но все же более правильно постараться изменить подход к тестам и возможно к архитектуре приложения, а не подгонять окружение под каждый тест. Иначе можно получить тесты которые правильно работают только после подгонки окружения, а рабочее приложение постоянно сыпется.