При работе с асинхронным кодом нам часто приходится обрабатывать CancelledError — сигнал отмены задачи. С точки зрения приложения это выглядит как обычное исключение — достаточно добавить try/except блок и продолжить работу. Хотя в большинстве случаев это и правда работает, иногда все же приходится копнуть глубже и разобраться, как устроен механизм отмены под капотом и для чего на самом деле нужен CancelledError.
Впервые я столкнулся с такой потребностью при работе над FastAPI-сервисом для стриминга ответов от LLM. Запросы обрабатывались долго, и клиенты часто закрывали соединения, не дожидаясь ответа. В таких случаях некорректная обработка дисконнектов приводила к зависшим транзакциям, сломанному пулу соединений с БД, а также к неэффективной утилизации ресурсов. Подробнее об этом — в прошлой статье: в ней детальный разбор того, что происходит при обрыве соединения, от уровня TCP до ASGI и кода вашего приложения.
В тот момент казалось, что проблема решена: стало понятно, что стоит за дисконнектами, как они устроены и к чему приводят. В конце концов, с точки зрения приложения дисконнект выглядит просто как отмена обработчика запроса — тот самый CancelledError. Однако не все так просто. В процессе реализации появляются новые вопросы. Какой код отменять безопасно, а какой нет? Как не засорить весь код, включая бизнес-логику, однотипными обработчиками отмены? Что делать, когда есть несколько источников отмены — например, задача одновременно прерывается из-за таймаута и дисконнекта пользователя?
Это первая из двух статей, посвященных механике отмены задач. В ней мы остановимся на стандартном asyncio. Узнаем, что на самом деле представляет собой CancelledError с точки зрения event-loop. Разберемся, как работает счетчик отмены (cancel/uncancel), на котором построены TaskGroup и asyncio.timeout. Наконец, обсудим проблемы, которые возникают на практике, в первую очередь связанные с asyncio.shield.
Во второй части мы посмотрим на более высокоуровневые примитивы из anyio и trio, а также на способы достичь похожих результатов на «чистом» asyncio.
Далее в рамках статьи приводятся ссылки на исходный код CPython версии 3.14.0. Начиная с версии 3.13.0 логика отмены задач существенным образом не менялась, по этой причине все сказанное актуально для любой версии Python начиная с 3.13.0.
Сниппеты кода представляют собой упрощенный вариант реализации механики отмены задач CPython:
Lib/asyncio/tasks.py,Lib/asyncio/futures.py,Modules/_asynciomodule.c.
Как устроен asyncio: Task, Future, event loop
Прежде чем погружаться в механику отмены задач, необходимо разобраться, как устроены ключевые объекты asyncio (Task, Future и корутины).
Основные сущности asyncio
Корутина (coroutine) — пользовательский код (
async def). Объект, который может приостанавливать свою работу с помощью ключевого словаawait. Перед остановкой в корутине создается контейнер для результата (Future).Future — контейнер для результата. По готовности результат из
Futureпередается обратно в корутину и она продолжает работу с места последнегоawait.Task (задача или просто таска) — объект, управляющий жизненным циклом корутины в рамках событийного цикла.
TaskполучаетFutureиз корутины, по готовности результата снова запускает корутину.Событийный цикл (event loop) — планировщик задач. Управляет выполнением тасок, запускает коллбэки, отвечает за события ввода-вывода.
Если соединить все вместе: event loop ждет внешних событий. Событие приводит к запуску коллбэка, который кладет результат во Future — это пробуждает ждавшую его таску. Таска выполняет корутину до следующего await, после чего цикл повторяется.
Давайте разберемся на примере:
import asyncio async def fetch(): print("start") # Task исполняет корутину; эта строка выполняется. await asyncio.sleep(1) # Корутина отдает Future; Task подписывается на результат и приостанавливает работу. print("done") # Сработал таймер, Future заполнен, Task возобновляет работу корутины. return 42 # Корутина возвращает результат; Task записывает 42 как свой результат. asyncio.run(fetch()) # Создает цикл, оборачивает корутину в Task, выполняет ее до завершения.
Далее детально рассмотрим, как это устроено в коде CPython.
Как устроены Task и Future
Task — это наследник Future, оборачивающий корутину (с дополнительными атрибутами). Ниже представлена упрощенная структура объекта Task. Диагностические и служебные атрибуты (_name, _loop, _context, _callbacks, _source_traceback, …) опущены (tasks.py#L56, futures.py#L31):
class Task(Future): _coro: Coroutine # обернутая корутина — tasks.py L103 _fut_waiter: Future | None # future, результат которого в данный момент ожидает таска — tasks.py L102 # Атрибуты, отслеживающие статус отмены таски _must_cancel: bool # есть ли отложенный запрос на отмену таски — tasks.py L101 _num_cancels_requested: int # сколько было запросов на отмену таски — tasks.py L100 # Исход — унаследовано от Future _state: {_PENDING, _CANCELLED, _FINISHED} # текущее состояние таски — futures.py L50 _cancel_message: object | None # аргумент msg=, переданный в cancel() — futures.py L55 _cancelled_exc: CancelledError | None # сохраненное исключение отмены (хранит traceback) — futures.py L57 ...
Чуть позже мы детальнее рассмотрим каждое из этих полей.
Жизненный цикл Task: __step и __wakeup
За исполнение кода корутины отвечают два метода объекта Task — __step и __wakeup.
Ментальная модель:
__stepвыполняет код корутины от одногоawaitдо следующего. Когда происходит внешнее событие, исполнение таски возобновляется через вызов коллбэка__wakeup. Суть__wakeupв том, чтобы обработать результатFutureи снова запустить метод__step. Далее цикл повторяется.
Во внутренности этих двух методов мы заглянем чуть ниже, а пока давайте снова рассмотрим механизм работы Task, но в этот раз через призму этих функций:
-
Event loop выполняет задачу.
Происходит вызов
task.__step(). (tasks.py#L266) -
Task начинает исполнять код корутины (или пробрасывает в нее исключение).
Код корутины выполняется без прерываний до следующего
awaitилиreturn. (tasks.py#L289— send,#L291— throw) -
Создание Future и регистрация источника пробуждения.
Внутри корутины происходит асинхронный вызов (например,
asyncio.sleepилиloop.sock_recv). Такой вызов создаетFutureдля будущего результата и регистрирует источник пробуждения в event loop:asyncio.sleepиспользуетcall_later, аloop.sock_recv—add_reader. Внешнее событие приведет к вызову этого коллбэка, который, в свою очередь, поместит результат в этотFuture(см. шаг 8). (tasks.py#L698,selector_events.py#L368) -
Выполнение корутины приостанавливается, контейнер Future передается в таску.
Корутина передает таске объект
Futureчерез возвращаемое значение вызоваcoro.send(None). (tasks.py#L289) -
Таска подписывается на результат Future.
Внутри
__stepпроисходит регистрация коллбэка для объектаFuture—result.add_done_callback(self.__wakeup). (tasks.py#L327). Этот коллбэк будет вызван, когда результат будет помещен воFuture. -
Future записывается в поле таски
_fut_waiterВызов
__stepзаписываетFutureв атрибут таски_fut_waiter; теперь объектTaskзнает, результат чего он ждет. (tasks.py#L329) -
Таска передает управление планировщику.
Вызов
__stepзавершается; цикл обрабатывает другие таски, таймеры и события ввода-вывода. -
Происходит внешнее событие.
Срабатывает таймер, в сокет пришли байты, и так далее; вызывается коллбэк, который помещает результат в объект
Future, что в свою очередь, инициирует вызов__wakeupчерезloop.call_soon. (futures.py#L179) -
Возобновление работы таски.
Планировщик вызывает
__wakeup(future), который распаковывает контейнерFuture(tasks.py#L359) и вызывает__step— пробрасывая исключение, еслиFutureзавершился ошибкой, иначе возобновляет корутину. Результат изFutureподставляется в место вызоваawait. (tasks.py#L373) Повторение цикла.
Шаги 2–9 повторяются, пока корутина не завершит исполнение (выбросив исключение StopIteration). (tasks.py#L292)

Механика отмены задач
Для отмены задачи у объекта Task есть публичный метод cancel(). Давайте разберемся, что происходит во время его вызова с точки зрения событийного цикла.
Внутренности методов __step и __wakeup
Событие отмены — момент, когда CancelledError появляется в пользовательском коде, — происходит внутри __step. Ниже представлен упрощенный код данного метода: блоки CONSUME, CLEAR, RETURN, CANCEL, SUSPEND — это ключевые места, связанные с обработкой событий отмены. Далее мы будем ссылаться на них при разборе различных сценариев отмены таски.
def __step(self, exc=None): coro = self._coro # ───────────── CONSUME: отложенная отмена ───────────── tasks.py#L270 ─ # Флаг _must_cancel равен True: вызов cancel() выставил его в момент, # когда задача не ждала результат какого-либо Future — например, вызов # cancel() произошел сразу после создания задачи, но до ее первого # запуска (до первого __step). В этом случае вручную создаем # CancelledError и пробрасываем его ниже. if self._must_cancel: if not isinstance(exc, CancelledError): exc = self._make_cancelled_error() self._must_cancel = False # ───────────── CLEAR: очистка _fut_waiter ───────────── tasks.py#L274 ─ # Future, результат которого мы ждали, завершен; сбрасываем значение self._fut_waiter = None # Исполнение пользовательского кода корутины: send(None) возобновляет ее # при нормальном исполнении; throw(exc) возобновляет, пробрасывая ошибку # в месте текущего await — например, ожидаемая внешняя операция # завершилась с ошибкой (либо поступил CancelledError из блока CONSUME). try: if exc is None: result = coro.send(None) else: result = coro.throw(exc) # ───────── RETURN: корутина вернула значение ────────── tasks.py#L292 ─ except StopIteration as exc: if self._must_cancel: # Корутина сама отменила исполнение своей задачи, после чего # вернула значение — например, `current_task().cancel(); return`. self._must_cancel = False # Помечаем Task как отмененную # (super().cancel() есть Future.cancel()). super().cancel(msg=self._cancel_message) else: # Штатное завершение: сохраняем возвращаемое корутиной значение super().set_result(exc.value) # ───── CANCEL: корутина выбросила CancelledError ────── tasks.py#L299 ─ except CancelledError as exc: # Сохраняем исходное исключение. При пробуждении родительской задачи # (внутри __wakeup) вызов future.result() выбросит именно это # конкретное исключение, затем __step пробросит его в корутину # родительской задачи через вызов coro.throw(exc) self._cancelled_exc = exc # Помечаем Task как отмененную # (super().cancel() есть Future.cancel()). super().cancel() # ──── SUSPEND: корутина вернула управление (await) ──── tasks.py#L308 ─ else: # Корутина приостановила работу, ожидая результат асинхронного # вызова. Контейнер Future для результата находится в локальной # переменной result. # Подписываемся на результат Future. Внешнее событие заполнит # контейнер, после чего произойдет вызов коллбэка __wakeup, который # возобновит исполнение задачи. result.add_done_callback(self.__wakeup) # Записываем этот Future в атрибут объекта задачи: теперь задача # знает, результат чего ждет self._fut_waiter = result # Корутина отменила свою таску, после чего сделала асинхронный вызов # — например, `current_task().cancel(); await asyncio.sleep(...)`. # Отменяем только что созданный Future: если он отменился - # сбрасываем флаг _must_cancel. В случае если Future был создан и # сразу же завершен, флаг остается, и отмена обработается на # следующем шаге __step в блоке CONSUME. if self._must_cancel: if self._fut_waiter.cancel(msg=self._cancel_message): self._must_cancel = False
__wakeup — это коллбэк, который будет вызван, когда результат Future готов (tasks.py#L359). Он вызывается в момент, когда нужно возобновить исполнение задачи — обрабатывает результат из Future, после чего вызывает __step:
def __wakeup(self, future): try: # Выбрасывает исключение, если внешний вызов завершился с ошибкой или Future был отменен. future.result() except BaseException as exc: # Запуск следующего шага задачи. Исключение будет передано в корутину через вызов coro.throw(exc). self.__step(exc) else: # Запуск следующего шага задачи: исполнение корутины продолжится # через coro.send(None). Результат Future не передается напрямую — # он подставится в точке последнего await как результат yield from # (Future.__await__ делает `return self.result()`, см. # https://github.com/python/cpython/blob/v3.14.0/Lib/asyncio/futures.py#L292) self.__step()
API для отмены Future и Task
Task — это Future, поэтому, прежде чем разбираться с Task.cancel(), давайте посмотрим, как устроен Future.cancel()(futures.py#L152):
def cancel(self, msg=None): if self._state != _PENDING: return False self._state = _CANCELLED self._cancel_message = msg # Future завершен. Происходит планирование запуска коллбэков. # Задача, ожидающая результат этого Future, зарегистрировала здесь свой __wakeup. # Этот коллбэк будет вызван на следующем шаге работы планировщика. self.__schedule_callbacks() return True
Task переопределяет метод cancel (tasks.py#L183). Тело функции распадается на три сценария, в зависимости от значения _fut_waiter:
def cancel(self, msg=None): # ────────────────── Задача завершена ────────────────── tasks.py#L206 ─ if self.done(): return False # ────────────── Инкремент счетчика отмен ────────────── tasks.py#L208 ─ # Каждый вызов cancel() увеличивает значение счетчика self._num_cancels_requested += 1 # ──────────────── Есть активный Future ──────────────── tasks.py#L214 ─ # Задача приостановлена на ожидании результата Future — отменяем его. if self._fut_waiter is not None: if self._fut_waiter.cancel(msg=msg): return True # ──────────────── Нет активного Future ──────────────── tasks.py#L221 ─ # (задача отменяет сама себя или еще не стартовала). Флаг будет считан в # начале следующего шага __step (см. блок CONSUME). self._must_cancel = True self._cancel_message = msg return True
Вызов
Task.cancel()не отменяет задачу, а лишь запрашивает ее отмену.Future.cancel()сразу меняет состояние объекта. В то же времяTask.cancel()выбрасываетCancelledErrorв пользовательский код позже: либо отменяя ожидаемый Future, либо выставляя флаг, который будет считан на следующем вызове__step. Сама Task считается отмененной только внутри__step, когда исполнение корутины заканчивается выбрасыванием исключенияCancelledError.
Рассмотрим отдельно каждый сценарий отмены.
Внешний вызов cancel()
Внешний код выполняет вызов task.cancel(), пока Task приостановлена на await в ожидании результата Future (атрибут _fut_waiter активен). Task.cancel() отменяет ее. CancelledError доходит до задачи на следующем __step: __wakeup распаковывает результат Future, передает исключение CancelledError в метод __step, после чего бросает его в корутину через вызов coro.throw(exc). В большинстве штатных ситуаций отмена задачи происходит именно таким образом.
Пример
async def worker(): # приостановлена на ожидании результата Future от вызова sleep (он становится _fut_waiter) await asyncio.sleep(10) async def main(): task = asyncio.create_task(worker()) # даем worker возможность дойти до await await asyncio.sleep(0) # Перед cancel: # task._fut_waiter — еще не завершенный Future (от sleep) # task._must_cancel — False task.cancel() # После cancel: # task._fut_waiter — тот же Future, теперь отмененный # task._must_cancel — по-прежнему False, его никто не трогал # CancelledError попадает сюда через цепочку коллбэков __wakeup -> __step try: await task except asyncio.CancelledError: pass
Task отменяет себя изнутри
В момент выполнения пользовательского кода корутины (внутри __step, между двумя вызовами await) задача вызывает cancel() на самой себе — например, так устроен контекстный менеджер asyncio.timeout(). Так как корутина выполняется, _fut_waiter равен None; Task.cancel() выставляет флаг _must_cancel. Дальше возможно несколько сценариев:
Корутина доходит до
await, который отдает Future → блок SUSPEND вызова__stepзаписывает этот Future в атрибут_fut_waiter, после чего сразу же отменяет Future.Корутина возвращает значение → задача помечается как отмененная в блоке RETURN.
В остальных случаях блок CONSUME превращает флаг
_must_cancelвCancelledError, после чего бросает его в корутину.
Пример
async def worker(): me = asyncio.current_task() # Перед cancel: # me._fut_waiter is None (worker выполняется) me.cancel() # После cancel: # me._must_cancel is True # me._fut_waiter по-прежнему None # Т.к. _must_cancel равен True, Future, созданный вызовом sleep, будет отменен в блоке SUSPEND. # Далее результат данного Future в виде CancelledError попадет в задачу на следующем вызове __step await asyncio.sleep(10) async def main(): task = asyncio.create_task(worker()) try: await task except asyncio.CancelledError: pass
Отмена задачи до ее старта
Вызов cancel() происходит между create_task() и первым шагом __step задачи. _fut_waiter is None, поэтому Task.cancel() выставляет флаг _must_cancel. Когда первый __step будет вызван, блок CONSUME выбросит CancelledError еще до того, как выполнится хотя бы одна строка тела корутины.
async def worker(): print("never runs") await asyncio.sleep(10) async def main(): task = asyncio.create_task(worker()) # Перед cancel: # task._fut_waiter is None (ни один __step еще не отработал) task.cancel() # После cancel: # task._must_cancel is True # task._fut_waiter по-прежнему None # Внутри первого вызова __step в блоке CONSUME будет выброшено исключение CancelledError. # Код worker не будет выполнен, по этой причине строка "never runs" никогда не напечатается. try: await task except asyncio.CancelledError: pass
Корутина может быть создана, но её код может так и не выполниться. Отмена до старта задачи приводит к тому, что любой пользовательский код игнорируется: код инициализации, очистка в
try/finallyблоках,__aenter__контекстного менеджера.
Как событие отмены проходит через цепочки вызовов await
Представьте две задачи: parent, который ждет child, и child, остановленный на вызове asyncio.sleep(10):
async def child(): await asyncio.sleep(10) async def parent(): await asyncio.create_task(child()) p = asyncio.create_task(parent())
Представим, что происходит отмена родительской задачи. В таком случае вызов p.cancel() вообще не отменяет parent. Он лишь отменяет Future, результат которого он ожидает (в данном случае задача child);
Дочерняя задача child делает то же самое; в конце концов цепочка вызовов cancel доходит до Future от вызова sleep, после чего происходит ее отмена. Оттуда CancelledError начинает распространяться обратно вверх по цепочке вызовов — сначала попадая в child, потом в parent. Каждая задача в цепочке будет считаться отмененной, только когда ее исполнение завершается из-за CancelledError (когда корутина сама выбросит это исключение).
Схематично это можно представить таким образом:

Главные выводы
Вызов
Task.cancel()не отменяет задачу, а лишь запрашивает ее отмену. Задача считается отмененной только тогда, когда ее исполнение завершается из-заCancelledError. В то же время вызовcancel()для Future сразу же меняет ее состояние.Task можно отменить еще до ее старта. Вызов
cancel()между созданием задачиcreate_task()и началом ее исполнения приводит к тому, что пользовательский код корутины не выполняется вовсе.CancelledError — не просто ошибка. Это управляющий объект в рамках механизма отмены задач.
Протокол счетчика отмены
Но на вызове cancel возможности Task не заканчиваются. Внутри объекта есть дополнительная переменная-счетчик — _num_cancels_requested. С его помощью можно реализовать вложенные scope отмены — на его основе построен asyncio.timeout.
Методы: cancelling() / uncancel()
Перед тем как изучать исходные, давайте посмотрим на интерфейс для работы с данным счетчиком. Всего есть 3 метода — cancel() увеличивает значение, uncancel() уменьшает, cancelling() возвращает значение:
Пример (цифры означают текущее возвращаемое значение вызова cancelling())
async def demo(): t = asyncio.create_task(asyncio.sleep(60)) print(t.cancelling()) # 0 исходное значение t.cancel(); print(t.cancelling()) # 1 инкремент t.cancel(); print(t.cancelling()) # 2 декремент t.uncancel(); print(t.cancelling()) # 1 декремент t.uncancel(); print(t.cancelling()) # 0 вернулись к исходному значению t.uncancel(); print(t.cancelling()) # 0 декремент не опускает значение ниже 0
Внутри Task.cancel() происходит инкремент данного счетчика (tasks.py#L208):
def cancel(self, msg=None): ... self._num_cancels_requested += 1 ...
Оставшиеся два метода: uncancel() и cancelling().
def uncancel(self): if self._num_cancels_requested > 0: # (1) декремент счетчика self._num_cancels_requested -= 1 if self._num_cancels_requested == 0: # (2) на нуле сбрасываем флаг _must_cancel self._must_cancel = False return self._num_cancels_requested # важно: если мы уже отменили Future — результат, который мы ждали, — вызов uncancel не повлияет на это def cancelling(self): return self._num_cancels_requested
Внутренности asyncio.timeout
Снаружи данный контекстный менеджер выглядит просто — мы устанавливаем время ожидания, после чего наш код отменяется. Может показаться неочевидным, как это может быть устроено под капотом — это всего лишь контекстный менеджер — и каким-то образом он прерывает исполнение нашего кода. Кроме того, когда он срабатывает, наружу выбрасывается не CancelledError, а TimeoutError. А также есть поддержка вложенных друг в друга контекстных менеджеров.
Давайте посмотрим, что происходит под капотом такого вызова:
async with asyncio.timeout(5.0): await long_running_call()
Работу контекстного менеджера Timeout (timeouts.py#L25) можно разбить на 4 шага:
Сохранение текущего состояния счетчика — на входе в контекстный менеджер запоминаем значение
task.cancelling().Регистрируем хэндлер для отмены текущей задачи — регистрируем его выполнение через
loop.call_at(when, _on_timeout), где_on_timeoutпросто вызываетtask.cancel()по истечении дедлайна.Запускаем пользовательский код.
-
На выходе из контекстного менеджера проверяем, была ли отменена наша задача. Если выбрасывается
CancelledError, вызываемtask.uncancel()— он уменьшает счетчик и возвращает текущее значение. Сравниваем это значение с сохраненным значением из шага 1:значение после uncancel > исходного → кто-то отменил нашу задачу извне; эта не наша отмена — пробрасываем
CancelledErrorнаружу.значение после uncancel == исходному → задача была отменена с помощью нашего коллбэка
_on_timeout. Это значит, что текущийCancelledErrorпринадлежит контекстному менеджеру — можем спокойно заменить его наTimeoutError.Happy-path:
__aexit__удаляет зарегистрированный коллбэк_on_timeout, после чего происходит штатный выход из контекстного менеджера.
Упрощенный исходный код asyncio.timeout (из CPython 3.14.0 Lib/asyncio/timeouts.py):
async def __aenter__(self): self._task = asyncio.current_task() # (1) сохраняем текущее значение счетчика self._cancelling = self._task.cancelling() loop = asyncio.get_running_loop() # (2) планируем отмену текущей задачи по истечении дедлайна self._timeout_handler = loop.call_at(self._when, self._on_timeout) self._state = _State.ENTERED return self # срабатывает после дедлайна def _on_timeout(self): self._task.cancel() self._state = _State.EXPIRING async def __aexit__(self, exc_type, exc_val, exc_tb): # всегда снимаем запланированный коллбэк (no-op, если он уже сработал) self._timeout_handler.cancel() # (4) проверяем, из-за чего задача была отменена if self._state is _State.EXPIRING: # задача была отменена нашим коллбэком # uncancel() возвращает значение счетчика в исходное состояние if self._task.uncancel() <= self._cancelling and exc_type is not None: if issubclass(exc_type, CancelledError): # значение после uncancel == исходному → отмена наша + не было других внешних запросов на отмену # можем подменить исключение на TimeoutError raise TimeoutError from exc_val # иначе: задача отменена извне — пробрасываем CancelledError дальше
Всегда пробрасывайте
CancelledErrorдальше. В противном случаеasyncio.timeoutи другие схожие примитивы могут работать некорректно.
Cancel-and-wait — дополнительная возможность, которую открывает счетчик
Более локальная проблема, однако все же встречается достаточно часто: родительская задача controller хочет отменить дочернюю задачу worker. Базовое решение выглядит так:
async def controller(): worker_task = asyncio.create_task(worker()) ... worker_task.cancel() try: await worker_task except asyncio.CancelledError: pass # воркер завершен, идем дальше
Решение кажется верным: отменить worker, дождаться его завершения, после чего проигнорировать CancelledError, ведь мы знаем, что это исключение — следствие нашего вызова cancel. Проблема заключается в том, что на самом деле это исключение может быть следствием того, что кто-то извне отменил нашу задачу controller! Необходимо уметь различать эти ситуации.
И мы можем добиться этого с использованием счетчика:
async def cancel_and_wait(task, msg=None): task.cancel(msg) try: await task except asyncio.CancelledError: if asyncio.current_task().cancelling() == 0: return # исключение связано с нашим запросом отмены raise # внешний код отменил нашу задачу else: raise RuntimeError("Cancelled task did not end with an exception")
Более подробно про это можно почитать здесь CPython #103486. Готовая реализация есть в aiotools.cancel_and_wait.
Счетчики в TaskGroup
Внутри asyncio.TaskGroup также используется протокол cancel() / uncancel() / cancelling(): когда дочерняя задача завершается с ошибкой, происходит отмена других задач в группе, после чего происходит проверка значения счетчика внутри __aexit__. Идея похожа на реализацию asyncio.timeout, более подробно можно посмотреть здесь taskgroups.py.
Главные выводы
У объекта Task есть встроенный счетчик числа отмен — протокол
cancel()/uncancel()/cancelling(). Он лежит в основеasyncio.timeout, иTaskGroup.Всегда пробрасывайте
CancelledErrorдальше. Иначеasyncio.timeoutи другие схожие примитивы могут работать некорректно.
CancelledError на практике: частые ошибки и подводные камни
CancelledError наследуется от BaseException
Начиная с Python 3.8, asyncio.CancelledError наследуется от BaseException, а не от Exception. Поэтому данный код работает некорректно:
Неправильно:
try: await work() except Exception: # ← CancelledError сюда НЕ попадет await cleanup() # ← при отмене пропускается raise
Правильно: обрабатывайте CancelledError явно.
try: await work() except (Exception, asyncio.CancelledError): await cleanup() raise
Несмотря на то что об этом пишут практически в каждом туториале по asyncio, ошибка все еще часто встречается во многих кодовых базах.
Особенности работы asyncio.shield
Как защитить ваш код от отмены? В большинстве случаев ответ очевиден — использовать asyncio.shield(). Под капотом asyncio.shield (Lib/asyncio/tasks.py, CPython 3.14.0) создает новый объект Future, после чего запускает фоновую задачу для выполнения do_critical_work(). Когда фоновая задача готова — она помещает результат в этот искусственно созданный контейнер. При этом если caller отменяют, отменяется внешняя Future, но не сама фоновая задача.
async def caller(): await asyncio.shield(do_critical_work())
Выглядит корректно, однако здесь есть несколько проблем.
Проблема первая: отсутствие сильной ссылки на задачу
Event-loop держит только слабые ссылки на задачи. Документация Python говорит о том, что необходимо явно сохранять ссылки на задачи
Save a reference to tasks passed to this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done.
При вызове asyncio.shield(do_critical_work()) пользовательский код никогда не видит фоновую задачу; shield прячет ее за отдельной Future.
Некоторые библиотеки используют кастомную реализацию shield для решения этой проблемы. Например, Scheduler.shield() из aiojobs (v1.4.0) дословно копирует исходник asyncio.shield() с добавлением нескольких строк (aiojobs/_scheduler.py:139–173):
def shield(self, arg): inner = asyncio.ensure_future(arg) if inner.done(): return inner # Эта функция — копия asyncio.shield(), за исключением # добавленных двух строк ниже. self._shields.add(inner) # ← сохраняем сильную ссылку на задачу inner.add_done_callback(self._shields.discard) # ← убираем сильную ссылку, когда задача завершилась loop = inner.get_loop() outer = loop.create_future() # ... все остальное — asyncio.shield() без изменений ...
На практике GC редко удаляет фоновые задачи. Например, при вызове
asyncio.sleepсоздается объект-таймер, который хранит обратную ссылку на задачу. Однако полагаться на это не стоит — в общем случае event loop не гарантирует этого.
Проблема вторая: фоновая задача может затеряться
shield защищает код от отмены, но он не дожидается окончания его работы! При отмене родительской задачи, фоновая продолжает работать. В итоге результат ее работы (или исключение) не будет обработано.
async def do_critical_work(): await asyncio.sleep(5) raise RuntimeError("nobody catches this") async def caller(): await asyncio.shield(do_critical_work()) task = asyncio.create_task(caller()) await asyncio.sleep(0.1) task.cancel() # caller завершается с CancelledError. # do_critical_work() продолжает работать еще ~5с сама по себе — # после чего она выбросит исключение, которое никто не обработает.
В простых случаях описанные выше проблемы можно попытаться обойти. Однако по мере усложнения кода это перестает масштабироваться. Для решения этих проблем anyio (которую под капотом используют FastAPI и Starlette) предлагает другой путь: связать все задачи в общее дерево, после чего помещать защищенные блоки кода в отдельные контекстные менеджеры. За счет этого мы избегаем проблем, описанных выше. Сами авторы библиотеки отмечают, что asyncio.shield() зачастую сложно использовать корректно на практике, и предлагают вместо него cancel scopes. Более подробно посмотрим на это в следующей статье.
Заключение
В данной статье мы разобрались, как под капотом устроена отмена задач в asyncio, и что на самом деле скрывается за CancelledError. Также мы посмотрели на возможности и внутреннее устройство более сложных примитивов стандартной библиотеки, таких как asyncio.timeout.
Однако применение стандартных инструментов asyncio имеет свои трудности. Сложности, связанные с asyncio.shield, — лишь один из симптомов более глобальной проблемы, вызванной отсутствием высокоуровневых примитивов.
Подробнее о вариантах решения этих проблем мы поговорим в следующей статье. В ней мы разберемся, что предлагает и как работает библиотека anyio. Также посмотрим на trio — отдельный рантайм, давший начало самой модели cancel scope, которую переняли остальные.