В статье разберём некоторые техники обнаружения плавающих багов, вызванных конкурентностью. Сделаем подход к автоматическому тестированию устойчивости веб-сервисов к различным race condition. Примеры будут на python + asyncio + sqlalchemy, но эти подходы применимы к любым моделям конкурентности, которые подвержены состояниям гонки.

Intro

Допустим, у нас есть баг. Давайте сначала рассмотрим его на минималистичном примере, в котором мы не-корутино-безопасно инкрементим счетчик:

import asyncio
import random

db: dict[str, int] = {}


async def simulate_io() -> None:
    await asyncio.sleep(random.uniform(0, 0.01))


async def set_value(key: str, value: int) -> None:
    await simulate_io()
    db[key] = value


async def get_value(key: str) -> int:
    await simulate_io()
    return db[key]


async def increment(key: str) -> None:
    value = await get_value(key)     # (1)
    await set_value(key, value + 1)  # (2)

Два конкурентных вызова increment приводят к гонке (race condition), в которой, если нам не повезёт, мы потеряем результат одной операции, и счетчик увеличится только на единицу вместо двойки.

Везение зависит от того, в каком порядке выполняются операции. Например, вот такой порядок приведёт к lost update:

Попробуем написать тест, который ловил бы этот баг:

async def test_increment_twice() -> None:
    await set_value("foo", 0)
    await asyncio.gather(increment("foo"), increment("foo"))
    assert await get_value("foo") == 2

Такой тест будет моргающим (flaky). С одной стороны, это лучше, чем ничего: он может помочь обнаружить проблему, но опять же — если повезет. Как сделать его надёжней?

Random fuzzing — вносим хаос и добавляем задержки

Самое простое что можно придумать — повторить этот же тест много раз. На нашем примере это работает (works on my machine©):

$ pytest example.py --count 100
===================================== test session starts =====================================
platform darwin -- Python 3.12.3, pytest-8.3.3, pluggy-1.5.0

example.py ..F.....FF.F.FFF..............F.........F.FF.FFF....FFF..FFFF......F..F.F... [ 76%]
.F.F....F.FF...F.F......                                                                [100%]

================================ 31 failed, 69 passed in 3.12s ================================

Конечно, у такого подхода куча минусов:

  • Очевидный оверхэд.

  • Всё ещё никаких гарантий: непонятно, сколько прогонов достаточно, чтобы убедиться в отсутствии проблемы.

  • Плохая воспроизводимость: ивент луп в тестах и под высокой нагрузкой на проде может вести себя очень по-разному, а некоторые аномалии могут никогда не проявиться в тестах.

Последний пункт можно немного сгладить, искусственно заставив корутины выполняться в «более случайном» порядке. Такая техника называется "concurrency fuzzing". Например, мы можем расставить рандомные задержки в местах переключения контекста (как simulate_io в нашем примере). Конечно, это ещё больший костыль: тесты станут чудовищно медленными, плюс придётся трогать тестируемый код.

Чуть более радикальный способ — пропатчить планировщик самого ивент лупа (strongly not recommended):

from collections import deque
from selectors import BaseSelector


class _MagicRandomDeque(deque[asyncio.Handle]):
    def popleft(self) -> asyncio.Handle:
        random.shuffle(self)
        return super().popleft()


class FuzzingLoop(asyncio.SelectorEventLoop):
    def __init__(self, selector: BaseSelector | None = None) -> None:
        super().__init__(selector)
        self._ready: = _MagicRandomDeque()


class FuzzingLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self) -> asyncio.AbstractEventLoop:
        return FuzzingLoop()


@pytest.fixture()
def event_loop_policy() -> asyncio.AbstractEventLoopPolicy:
    return FuzzingLoopPolicy()

Теперь на каждой итерации цикла все готовые к выполнению колбэки asyncio-тасок будут запускаться в случайном порядке. Кстати, в trio случайный порядок выполнения заложен by design:

import trio

value: int


async def set_value(x: int) -> None:
    global value
    value = x


async def test() -> None:
    async def _run() -> None:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(set_value, 1)
            nursery.start_soon(set_value, 2)

    trio.run(_run)
    assert value == 2
$ pytest tests/test_race_trio.py --count 100
...
================================ 47 failed, 53 passed in 0.96s ================================

Exhaustive search

А что, если вместо того, чтобы надеяться на случайность, мы полностью переберём все возможные последовательности операций? Давайте начнём с того, что посчитаем, сколько их вообще может быть, то есть сколькими способами можно упорядочить взаимно-чередующиеся операции. Комбинаторика говорит нам, что если у нас есть k конкурентных потоков (корутин/гринлетов/etc.), каждый из которых выполняет n_iопераций, то количество вариантов сериализации равно:

\frac{(n_1+n_2+\cdots+n_k)!}{n_1!n_2!\cdots n_k!}

Например, для двух потоков, один из которых совершает четыре операции, а второй пять, получим:

(4+5)!/(4!⋅5!)=126

В реальном коде новые операции часто зависят от результатов предыдущх. Кроме того, использование примитивов синхронизации будет уменьшать количество возможных последовательностей, поэтому эту формулу можно считать оценкой верхней границы.

В нашем хэллоу-ворлд примере с потерянным инкрементом два потока и две операции, то есть (2+2)!/(2!⋅2!)=6 вариантов. Обозначим потоки буквами A, В, а операции числами 1, 2. Вот они все слева направо сверху вниз:

A1 – A2 – B1 – B2
A1 – B1 – A2 – B2
A1 – B1 – B2 – A2
B1 - A1 - B2 - A2
B1 - A1 - A2 - B2
B1 – B2 - A1 – A2

Чтобы перебрать все эти цепочки в нашем тесте, нам нужно: 

1) запоминать, какие из них мы уже выполнили;

2) понять, когда все уникальные цепочки пройдены. 

Конечно, в рантайме в момент выполнения кода мы знаем только про текущую и прошлые операции, предрасчитать все цепочки заранее в общем случае нельзя — нужен онлайн-алгоритм. Для этого удобнее представить все последовательности в виде дерева, в каждом узле которого мы выбираем какую операцию выполнить следующей:

Дерево всех возможных последовательностей операций
Дерево всех возможных последовательностей операций

План примерно такой:

  • При выполнении каждой операции ставим ее на паузу и пытаемся дождаться ещё одной — той самой, с которой может возникнуть гонка.

  • Если дождались, добавляем обе в текущий узел дерева.

  • Согласно нашему алгоритму обхода выбираем, какую из двух операций выполнить следующей.

  • Помечаем выбранный узел как посещенный и выполняем операцию.

  • У нас осталась единственная невыполненная операция на паузе — снова начинаем ждать вторую.

  • Если оба потока завершились, помечаем текущую ноду как конечную (лист) и возвращаемся в корень для следующего прохода.

  • Если при этом все ноды уже посещены, значит мы прошли все пути в дереве и на этом можно смело заканчивать.

Давайте попробуем реализовать наш план, но сначала немного сменим декорации:

async def increment(key: str) -> None:
    async with engine.begin() as conn:
        res = await conn.execute(select(table.c.value).where(table.c.key == key))
        value = res.scalar_one()
        await conn.execute(table.update().values(value=value + 1))

Во-первых, так пример станет ближе к нашей повседневной реальности – теперь в роли конкурентных операций у нас будут SQL-запросы. А во-вторых, алхимия предоставляет Events API с удобными хуками для навешивания своих колбэков на различные события, например, перед выполнением запроса — как раз то, что нам надо.

Тут я приведу сильно урезанный для читабельности псевдокод, полная версия со всеми танцами вокруг алхимии лежит на Гитхабе. (Кстати, на случай если вы заглянули в репу и вам стало интересно почему в алхимии так странно реализована асинхронность и причем тут гринлеты – оставлю тут ссылку на одну из наших предыдущих статей)

@dataclass
class Node:
    value: str
    children: list[Node] = field(default_factory=list)
    visited: bool = False

    def is_explored(self) -> bool:
        return (
            self.visited
            and all(child.is_explored() for child in self.children)
        )

def before_execute(query: str, *_: Any) -> None:
    global curr
    node = Node(asyncio.current_task().get_name())
    if node not in curr.children:
        curr.children.append(node)

    while True:
        wait_for(lambda: len(curr.children) >= 2, timeout=0.1)
        nxt = next(child for child in curr.children if not child.is_explored())
        nxt.visited = True
        curr = nxt
        if curr == node:
            break

sqlalchemy.event.listen(engine, "before_cursor_execute", before_execute)

root = curr = Node("root")
while not root.is_explored():
    await set_value("foo", 0)
    await asyncio.gather(increment("foo"), increment("foo"))
    if await get_value("foo") != 2:
        print("Lost update detected")

    curr = root
    curr.visited = True

Выводы

Обозначим плюсы и минусы нашего подхода.

Плюсы:

  • Можем гарантированно перебрать все возможные последовательности конкурентных операций за минимальное количество итераций.

  • В теории применимо к любым моделям конкурентности и любым аномалиям. Мы рассматривали пример с нарушением консистентности данных, но все вышесказанное можно приложить, например, и к дэдлокам.

Минусы:

  • Есть ограничение: код должен быть детерминированным при детерминированном порядке операций. Так как мы строим дерево последовательностей динамически, одна и та же ветка этого дерева должна быть воспроизводимой. Например, если в нашей логике где-то есть if random() < 0.5 — то всё взорвётся.

  • Неустраняемый оверхэд на таймаут при ожидании второй операции. Его придётся подбирать эмпирически, в тестах с БД у меня получилось минимально-работающее значение 10 мс.

Что дальше?

В заключение немного утопичных мыслей: а можно ли написать такой тест, который автоматически найдёт вообще все конкурентные аномалии в моем сервисе?

Считаем, что весь стейт сервиса — это его БД плюс состояние моков (принимаем, что все внешние интеграции в тестах у нас замоканы), а любые действия над стейтом можно совершать только вызовами API. В такой постановке мы можем считать что аномалии —это все ситуации, при которых конкурентное выполнение любых двух вызовов API из начального стейта S_1 приводит к такому конечному стейту S_2, который нельзя получить при последовательном выполнении тех же запросов в любом порядке.

Формулировка получилась сложная, поэтому ещё раз в виде плана по шагам:

  • Берём два API-запроса и выполняем их последовательно.

  • Запоминаем полученное результирующее состояние БД.

  • Откатываем БД обратно к начальному состоянию.

  • Снова выполняем те же два запроса, снова последовательно, но уже в обратном порядке.

  • Снова запоминаем состояние БД.

  • Эти два состояния БД считаем единственно валидными, и если при конкурентном выполнении тех же двух запросов БД придёт в иное состояние, то считаем такую ситуацию конкурентной аномалией.

Тогда, в теории, мы можем написать такой тест:

def everything():
	for method in my_api:
    	for params in list_possible_params_for(method):
        	yield lambda: api.call(method, params)

            
def test_everything():
	for initial_state in list_possible_db_states():
    	set_state(initial_state)

    	for op1, op2 in combinations_with_replacement(everything(), 2):
        	valid_end_states = get_valid_end_states(op1, op2)

        	for interleaving in fuzzer.explore(op1, op2):
            	state = snapshot()

            	if state not in valid_end_states:
                	print(
                    	f"Oh no, concurrent execution of {op1} and {op2} "
                    	"resulted in inconsistent state"
                	)

            	set_state(initial_state)

                
def get_valid_end_states(op1, op2):
	initial_state = snapshot()
	op1()
	op2()
	end_state_1 = snapshot()
    
	set_state(initial_state)
	op2()
	op1()
	end_state_2 = snapshot()
    
	set_state(initial_state)
	return (end_state_1, end_state_2)

Сложность тут только в том, что сложность (pun intended) такого теста будет примерно O(∞). Конечно, на практике мы вряд ли хотим перебирать вообще все состояния БД и возможные параметры методов. 

  • Во-первых, их бесконечное множество.

  • Во-вторых, некоторые методы часто вообще заведомо изолированы друг от друга. Они не пересекаются по таблицам в БД и каким-либо глобальным блокировкам, поэтому логично было бы исключить такие пары. 

Вместо всех возможных состояний БД и параметров мы, скорее, хотим ограничиться набором условно-эквивалентных классов. Но тогда дизайнить такие наборы состояний придется самим. Обычно это нетривиальная задача — за пять минут её не автоматизируешь.

Хорошо, тогда другая идея — обычно какие-то API-тесты у нас уже написаны, и они проверяют функциональные требования, иногда и немного нефункциональные тоже, но почти никогда не проверяют race conditions. Что если переиспользовать существующие тесты? По сути, просто прогнать их ещё раз, но уже не с оригинальными бизнесовыми ассертами, а проверяя инварианты состояния при конкурентном запуске? Кажется, что такой подход имеет больше шансов на жизнеспособность. Но это уже тема для другой статьи.

Комментарии (7)


  1. propell-ant
    28.01.2025 09:00

    В тестах база не нужна, стейт можно и в памяти хранить. Сложность не уменьшится, но время перебора сильно сократится.


    1. qweeze Автор
      28.01.2025 09:00

      Смотря что тестировать – в наших сервисах обычно много работы с БД, и это как раз то что хочется покрывать тестами в первую очередь. Максимум что мы можем сделать для ускорения – потюнить настройки персистентности постгри. Ну и тесты распараллелить конечно.


      1. propell-ant
        28.01.2025 09:00

        Просто мне показалось, что решается проблема поиска возможных race condition, и база тут является внешним фактором. То есть решающее значение имеет порядок вызовов, а не задержки обработки.

        Запуская тесты на живой базе вы привязываетесь к движку и типу хранилища, и даже к настройкам базы. Насколько релевантны окажутся результаты тестов при изменении, например, движка базы?


        1. qweeze Автор
          28.01.2025 09:00

          race condition-ы здесь в широком смысле – как в коде, так и при работе с БД. В первом примере мы рассматриваем гонку 2 корутин за изменение переменной в памяти, а в последнем примере это конкурентные апдейты строки в БД.

          То есть решающее значение имеет порядок вызовов, а не задержки обработки

          Все так, но порядок выполнения операций зависит от задержек. Например в тестах race condition может не проявляться, а в проде под боевой нагрузкой начнет стрелять из-за изменившихся задержек.

          Насколько релевантны окажутся результаты тестов при изменении, например, движка базы?

          Идея с полным перебором всех возможных последовательностей как раз решает эту проблему – мы пытаемся исключить влияние случайных задержек.


  1. redbeardster
    28.01.2025 09:00

    TLA+ не пробовали ?


    1. qweeze Автор
      28.01.2025 09:00

      До формальной верификации мы еще не доросли :) Для обычной продуктовой разработки это слишком долго и дорого относительно цены ошибки. Но может быть придем и к TLA+ если будет подходящий юзкейс


      1. redbeardster
        28.01.2025 09:00

        Не сказал бы, что очень долго и дорого, но воля Ваша. Ну что ж, пока потренируюсь, а потом можно будет и Isabelle :)