В статье разберём некоторые техники обнаружения плавающих багов, вызванных конкурентностью. Сделаем подход к автоматическому тестированию устойчивости веб-сервисов к различным race condition. Примеры будут на python + asyncio + sqlalchemy, но эти подходы применимы к любым моделям конкурентности, которые подвержены состояниям гонки.
Intro
Допустим, у нас есть баг. Давайте сначала рассмотрим его на минималистичном примере, в котором мы не-корутино-безопасно инкрементим счетчик:
import asyncio
import random
db: dict[str, int] = {}
async def simulate_io() -> None:
await asyncio.sleep(random.uniform(0, 0.01))
async def set_value(key: str, value: int) -> None:
await simulate_io()
db[key] = value
async def get_value(key: str) -> int:
await simulate_io()
return db[key]
async def increment(key: str) -> None:
value = await get_value(key) # (1)
await set_value(key, value + 1) # (2)
Два конкурентных вызова increment
приводят к гонке (race condition), в которой, если нам не повезёт, мы потеряем результат одной операции, и счетчик увеличится только на единицу вместо двойки.
Везение зависит от того, в каком порядке выполняются операции. Например, вот такой порядок приведёт к lost update:
Попробуем написать тест, который ловил бы этот баг:
async def test_increment_twice() -> None:
await set_value("foo", 0)
await asyncio.gather(increment("foo"), increment("foo"))
assert await get_value("foo") == 2
Такой тест будет моргающим (flaky). С одной стороны, это лучше, чем ничего: он может помочь обнаружить проблему, но опять же — если повезет. Как сделать его надёжней?
Random fuzzing — вносим хаос и добавляем задержки
Самое простое что можно придумать — повторить этот же тест много раз. На нашем примере это работает (works on my machine©):
$ pytest example.py --count 100
===================================== test session starts =====================================
platform darwin -- Python 3.12.3, pytest-8.3.3, pluggy-1.5.0
example.py ..F.....FF.F.FFF..............F.........F.FF.FFF....FFF..FFFF......F..F.F... [ 76%]
.F.F....F.FF...F.F...... [100%]
================================ 31 failed, 69 passed in 3.12s ================================
Конечно, у такого подхода куча минусов:
Очевидный оверхэд.
Всё ещё никаких гарантий: непонятно, сколько прогонов достаточно, чтобы убедиться в отсутствии проблемы.
Плохая воспроизводимость: ивент луп в тестах и под высокой нагрузкой на проде может вести себя очень по-разному, а некоторые аномалии могут никогда не проявиться в тестах.
Последний пункт можно немного сгладить, искусственно заставив корутины выполняться в «более случайном» порядке. Такая техника называется "concurrency fuzzing". Например, мы можем расставить рандомные задержки в местах переключения контекста (как simulate_io
в нашем примере). Конечно, это ещё больший костыль: тесты станут чудовищно медленными, плюс придётся трогать тестируемый код.
Чуть более радикальный способ — пропатчить планировщик самого ивент лупа (strongly not recommended):
from collections import deque
from selectors import BaseSelector
class _MagicRandomDeque(deque[asyncio.Handle]):
def popleft(self) -> asyncio.Handle:
random.shuffle(self)
return super().popleft()
class FuzzingLoop(asyncio.SelectorEventLoop):
def __init__(self, selector: BaseSelector | None = None) -> None:
super().__init__(selector)
self._ready: = _MagicRandomDeque()
class FuzzingLoopPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self) -> asyncio.AbstractEventLoop:
return FuzzingLoop()
@pytest.fixture()
def event_loop_policy() -> asyncio.AbstractEventLoopPolicy:
return FuzzingLoopPolicy()
Теперь на каждой итерации цикла все готовые к выполнению колбэки asyncio-тасок будут запускаться в случайном порядке. Кстати, в trio случайный порядок выполнения заложен by design:
import trio
value: int
async def set_value(x: int) -> None:
global value
value = x
async def test() -> None:
async def _run() -> None:
async with trio.open_nursery() as nursery:
nursery.start_soon(set_value, 1)
nursery.start_soon(set_value, 2)
trio.run(_run)
assert value == 2
$ pytest tests/test_race_trio.py --count 100
...
================================ 47 failed, 53 passed in 0.96s ================================
Exhaustive search
А что, если вместо того, чтобы надеяться на случайность, мы полностью переберём все возможные последовательности операций? Давайте начнём с того, что посчитаем, сколько их вообще может быть, то есть сколькими способами можно упорядочить взаимно-чередующиеся операции. Комбинаторика говорит нам, что если у нас есть конкурентных потоков (корутин/гринлетов/etc.), каждый из которых выполняет операций, то количество вариантов сериализации равно:
Например, для двух потоков, один из которых совершает четыре операции, а второй пять, получим:
В реальном коде новые операции часто зависят от результатов предыдущх. Кроме того, использование примитивов синхронизации будет уменьшать количество возможных последовательностей, поэтому эту формулу можно считать оценкой верхней границы.
В нашем хэллоу-ворлд примере с потерянным инкрементом два потока и две операции, то есть вариантов. Обозначим потоки буквами A, В, а операции числами 1, 2. Вот они все слева направо сверху вниз:
A1 – A2 – B1 – B2
A1 – B1 – A2 – B2
A1 – B1 – B2 – A2
B1 - A1 - B2 - A2
B1 - A1 - A2 - B2
B1 – B2 - A1 – A2
Чтобы перебрать все эти цепочки в нашем тесте, нам нужно:
1) запоминать, какие из них мы уже выполнили;
2) понять, когда все уникальные цепочки пройдены.
Конечно, в рантайме в момент выполнения кода мы знаем только про текущую и прошлые операции, предрасчитать все цепочки заранее в общем случае нельзя — нужен онлайн-алгоритм. Для этого удобнее представить все последовательности в виде дерева, в каждом узле которого мы выбираем какую операцию выполнить следующей:
План примерно такой:
При выполнении каждой операции ставим ее на паузу и пытаемся дождаться ещё одной — той самой, с которой может возникнуть гонка.
Если дождались, добавляем обе в текущий узел дерева.
Согласно нашему алгоритму обхода выбираем, какую из двух операций выполнить следующей.
Помечаем выбранный узел как посещенный и выполняем операцию.
У нас осталась единственная невыполненная операция на паузе — снова начинаем ждать вторую.
Если оба потока завершились, помечаем текущую ноду как конечную (лист) и возвращаемся в корень для следующего прохода.
Если при этом все ноды уже посещены, значит мы прошли все пути в дереве и на этом можно смело заканчивать.
Давайте попробуем реализовать наш план, но сначала немного сменим декорации:
async def increment(key: str) -> None:
async with engine.begin() as conn:
res = await conn.execute(select(table.c.value).where(table.c.key == key))
value = res.scalar_one()
await conn.execute(table.update().values(value=value + 1))
Во-первых, так пример станет ближе к нашей повседневной реальности – теперь в роли конкурентных операций у нас будут SQL-запросы. А во-вторых, алхимия предоставляет Events API с удобными хуками для навешивания своих колбэков на различные события, например, перед выполнением запроса — как раз то, что нам надо.
Тут я приведу сильно урезанный для читабельности псевдокод, полная версия со всеми танцами вокруг алхимии лежит на Гитхабе. (Кстати, на случай если вы заглянули в репу и вам стало интересно почему в алхимии так странно реализована асинхронность и причем тут гринлеты – оставлю тут ссылку на одну из наших предыдущих статей)
@dataclass
class Node:
value: str
children: list[Node] = field(default_factory=list)
visited: bool = False
def is_explored(self) -> bool:
return (
self.visited
and all(child.is_explored() for child in self.children)
)
def before_execute(query: str, *_: Any) -> None:
global curr
node = Node(asyncio.current_task().get_name())
if node not in curr.children:
curr.children.append(node)
while True:
wait_for(lambda: len(curr.children) >= 2, timeout=0.1)
nxt = next(child for child in curr.children if not child.is_explored())
nxt.visited = True
curr = nxt
if curr == node:
break
sqlalchemy.event.listen(engine, "before_cursor_execute", before_execute)
root = curr = Node("root")
while not root.is_explored():
await set_value("foo", 0)
await asyncio.gather(increment("foo"), increment("foo"))
if await get_value("foo") != 2:
print("Lost update detected")
curr = root
curr.visited = True
Выводы
Обозначим плюсы и минусы нашего подхода.
Плюсы:
Можем гарантированно перебрать все возможные последовательности конкурентных операций за минимальное количество итераций.
В теории применимо к любым моделям конкурентности и любым аномалиям. Мы рассматривали пример с нарушением консистентности данных, но все вышесказанное можно приложить, например, и к дэдлокам.
Минусы:
Есть ограничение: код должен быть детерминированным при детерминированном порядке операций. Так как мы строим дерево последовательностей динамически, одна и та же ветка этого дерева должна быть воспроизводимой. Например, если в нашей логике где-то есть
if random() < 0.5
— то всё взорвётся.Неустраняемый оверхэд на таймаут при ожидании второй операции. Его придётся подбирать эмпирически, в тестах с БД у меня получилось минимально-работающее значение 10 мс.
Что дальше?
В заключение немного утопичных мыслей: а можно ли написать такой тест, который автоматически найдёт вообще все конкурентные аномалии в моем сервисе?
Считаем, что весь стейт сервиса — это его БД плюс состояние моков (принимаем, что все внешние интеграции в тестах у нас замоканы), а любые действия над стейтом можно совершать только вызовами API. В такой постановке мы можем считать что аномалии —это все ситуации, при которых конкурентное выполнение любых двух вызовов API из начального стейта приводит к такому конечному стейту , который нельзя получить при последовательном выполнении тех же запросов в любом порядке.
Формулировка получилась сложная, поэтому ещё раз в виде плана по шагам:
Берём два API-запроса и выполняем их последовательно.
Запоминаем полученное результирующее состояние БД.
Откатываем БД обратно к начальному состоянию.
Снова выполняем те же два запроса, снова последовательно, но уже в обратном порядке.
Снова запоминаем состояние БД.
Эти два состояния БД считаем единственно валидными, и если при конкурентном выполнении тех же двух запросов БД придёт в иное состояние, то считаем такую ситуацию конкурентной аномалией.
Тогда, в теории, мы можем написать такой тест:
def everything():
for method in my_api:
for params in list_possible_params_for(method):
yield lambda: api.call(method, params)
def test_everything():
for initial_state in list_possible_db_states():
set_state(initial_state)
for op1, op2 in combinations_with_replacement(everything(), 2):
valid_end_states = get_valid_end_states(op1, op2)
for interleaving in fuzzer.explore(op1, op2):
state = snapshot()
if state not in valid_end_states:
print(
f"Oh no, concurrent execution of {op1} and {op2} "
"resulted in inconsistent state"
)
set_state(initial_state)
def get_valid_end_states(op1, op2):
initial_state = snapshot()
op1()
op2()
end_state_1 = snapshot()
set_state(initial_state)
op2()
op1()
end_state_2 = snapshot()
set_state(initial_state)
return (end_state_1, end_state_2)
Сложность тут только в том, что сложность (pun intended) такого теста будет примерно O(∞). Конечно, на практике мы вряд ли хотим перебирать вообще все состояния БД и возможные параметры методов.
Во-первых, их бесконечное множество.
Во-вторых, некоторые методы часто вообще заведомо изолированы друг от друга. Они не пересекаются по таблицам в БД и каким-либо глобальным блокировкам, поэтому логично было бы исключить такие пары.
Вместо всех возможных состояний БД и параметров мы, скорее, хотим ограничиться набором условно-эквивалентных классов. Но тогда дизайнить такие наборы состояний придется самим. Обычно это нетривиальная задача — за пять минут её не автоматизируешь.
Хорошо, тогда другая идея — обычно какие-то API-тесты у нас уже написаны, и они проверяют функциональные требования, иногда и немного нефункциональные тоже, но почти никогда не проверяют race conditions. Что если переиспользовать существующие тесты? По сути, просто прогнать их ещё раз, но уже не с оригинальными бизнесовыми ассертами, а проверяя инварианты состояния при конкурентном запуске? Кажется, что такой подход имеет больше шансов на жизнеспособность. Но это уже тема для другой статьи.
Комментарии (7)
redbeardster
28.01.2025 09:00TLA+ не пробовали ?
qweeze Автор
28.01.2025 09:00До формальной верификации мы еще не доросли :) Для обычной продуктовой разработки это слишком долго и дорого относительно цены ошибки. Но может быть придем и к TLA+ если будет подходящий юзкейс
redbeardster
28.01.2025 09:00Не сказал бы, что очень долго и дорого, но воля Ваша. Ну что ж, пока потренируюсь, а потом можно будет и Isabelle :)
propell-ant
В тестах база не нужна, стейт можно и в памяти хранить. Сложность не уменьшится, но время перебора сильно сократится.
qweeze Автор
Смотря что тестировать – в наших сервисах обычно много работы с БД, и это как раз то что хочется покрывать тестами в первую очередь. Максимум что мы можем сделать для ускорения – потюнить настройки персистентности постгри. Ну и тесты распараллелить конечно.
propell-ant
Просто мне показалось, что решается проблема поиска возможных race condition, и база тут является внешним фактором. То есть решающее значение имеет порядок вызовов, а не задержки обработки.
Запуская тесты на живой базе вы привязываетесь к движку и типу хранилища, и даже к настройкам базы. Насколько релевантны окажутся результаты тестов при изменении, например, движка базы?
qweeze Автор
race condition-ы здесь в широком смысле – как в коде, так и при работе с БД. В первом примере мы рассматриваем гонку 2 корутин за изменение переменной в памяти, а в последнем примере это конкурентные апдейты строки в БД.
Все так, но порядок выполнения операций зависит от задержек. Например в тестах race condition может не проявляться, а в проде под боевой нагрузкой начнет стрелять из-за изменившихся задержек.
Идея с полным перебором всех возможных последовательностей как раз решает эту проблему – мы пытаемся исключить влияние случайных задержек.