При работе с асинхронным кодом нам часто приходится обрабатывать CancelledError — сигнал отмены задачи. С точки зрения приложения это выглядит как обычное исключение — достаточно добавить try/except блок и продолжить работу. Хотя в большинстве случаев это и правда работает, иногда все же приходится копнуть глубже и разобраться, как устроен механизм отмены под капотом и для чего на самом деле нужен CancelledError.

Впервые я столкнулся с такой потребностью при работе над FastAPI-сервисом для стриминга ответов от LLM. Запросы обрабатывались долго, и клиенты часто закрывали соединения, не дожидаясь ответа. В таких случаях некорректная обработка дисконнектов приводила к зависшим транзакциям, сломанному пулу соединений с БД, а также к неэффективной утилизации ресурсов. Подробнее об этом — в прошлой статье: в ней детальный разбор того, что происходит при обрыве соединения, от уровня TCP до ASGI и кода вашего приложения.

В тот момент казалось, что проблема решена: стало понятно, что стоит за дисконнектами, как они устроены и к чему приводят. В конце концов, с точки зрения приложения дисконнект выглядит просто как отмена обработчика запроса — тот самый CancelledError. Однако не все так просто. В процессе реализации появляются новые вопросы. Какой код отменять безопасно, а какой нет? Как не засорить весь код, включая бизнес-логику, однотипными обработчиками отмены? Что делать, когда есть несколько источников отмены — например, задача одновременно прерывается из-за таймаута и дисконнекта пользователя?

Это первая из двух статей, посвященных механике отмены задач. В ней мы остановимся на стандартном asyncio. Узнаем, что на самом деле представляет собой CancelledError с точки зрения event-loop. Разберемся, как работает счетчик отмены (cancel/uncancel), на котором построены TaskGroup и asyncio.timeout. Наконец, обсудим проблемы, которые возникают на практике, в первую очередь связанные с asyncio.shield.

Во второй части мы посмотрим на более высокоуровневые примитивы из anyio и trio, а также на способы достичь похожих результатов на «чистом» asyncio.

Далее в рамках статьи приводятся ссылки на исходный код CPython версии 3.14.0. Начиная с версии 3.13.0 логика отмены задач существенным образом не менялась, по этой причине все сказанное актуально для любой версии Python начиная с 3.13.0.

Сниппеты кода представляют собой упрощенный вариант реализации механики отмены задач CPython: Lib/asyncio/tasks.py, Lib/asyncio/futures.py, Modules/_asynciomodule.c.

Как устроен asyncio: Task, Future, event loop

Прежде чем погружаться в механику отмены задач, необходимо разобраться, как устроены ключевые объекты asyncio (Task, Future и корутины).

Основные сущности asyncio

  1. Корутина (coroutine) — пользовательский код (async def). Объект, который может приостанавливать свою работу с помощью ключевого слова await. Перед остановкой в корутине создается контейнер для результата (Future).

  2. Future — контейнер для результата. По готовности результат из Future передается обратно в корутину и она продолжает работу с места последнего await.

  3. Task (задача или просто таска) — объект, управляющий жизненным циклом корутины в рамках событийного цикла. Task получает Future из корутины, по готовности результата снова запускает корутину.

  4. Событийный цикл (event loop) — планировщик задач. Управляет выполнением тасок, запускает коллбэки, отвечает за события ввода-вывода.

Если соединить все вместе: event loop ждет внешних событий. Событие приводит к запуску коллбэка, который кладет результат во Future — это пробуждает ждавшую его таску. Таска выполняет корутину до следующего await, после чего цикл повторяется.

Давайте разберемся на примере:

import asyncio

async def fetch():
    print("start")            # Task исполняет корутину; эта строка выполняется.
    await asyncio.sleep(1)    # Корутина отдает Future; Task подписывается на результат и приостанавливает работу.
    print("done")             # Сработал таймер, Future заполнен, Task возобновляет работу корутины.
    return 42                 # Корутина возвращает результат; Task записывает 42 как свой результат.

asyncio.run(fetch())          # Создает цикл, оборачивает корутину в Task, выполняет ее до завершения.

Далее детально рассмотрим, как это устроено в коде CPython.

Как устроены Task и Future

Task — это наследник Future, оборачивающий корутину (с дополнительными атрибутами). Ниже представлена упрощенная структура объекта Task. Диагностические и служебные атрибуты (_name, _loop, _context, _callbacks, _source_traceback, …) опущены (tasks.py#L56, futures.py#L31):

class Task(Future):
    _coro: Coroutine                           # обернутая корутина                                       — tasks.py L103
    _fut_waiter: Future | None                 # future, результат которого в данный момент ожидает таска — tasks.py L102
    # Атрибуты, отслеживающие статус отмены таски
    _must_cancel: bool                         # есть ли отложенный запрос на отмену таски                — tasks.py L101
    _num_cancels_requested: int                # сколько было запросов на отмену таски                    — tasks.py L100

    # Исход — унаследовано от Future
    _state: {_PENDING, _CANCELLED, _FINISHED}  # текущее состояние таски                                  — futures.py L50
    _cancel_message: object | None             # аргумент msg=, переданный в cancel()                     — futures.py L55
    _cancelled_exc: CancelledError | None      # сохраненное исключение отмены (хранит traceback)         — futures.py L57
    ...

Чуть позже мы детальнее рассмотрим каждое из этих полей.

Жизненный цикл Task: __step и __wakeup

За исполнение кода корутины отвечают два метода объекта Task__step и __wakeup.

Ментальная модель: __step выполняет код корутины от одного await до следующего. Когда происходит внешнее событие, исполнение таски возобновляется через вызов коллбэка __wakeup. Суть __wakeup в том, чтобы обработать результат Future и снова запустить метод __step. Далее цикл повторяется.

Во внутренности этих двух методов мы заглянем чуть ниже, а пока давайте снова рассмотрим механизм работы Task, но в этот раз через призму этих функций:

  1. Event loop выполняет задачу.

    Происходит вызов task.__step(). (tasks.py#L266)

  2. Task начинает исполнять код корутины (или пробрасывает в нее исключение).

    Код корутины выполняется без прерываний до следующего await или return. (tasks.py#L289 — send, #L291 — throw)

  3. Создание Future и регистрация источника пробуждения.

    Внутри корутины происходит асинхронный вызов (например, asyncio.sleep или loop.sock_recv). Такой вызов создает Future для будущего результата и регистрирует источник пробуждения в event loop: asyncio.sleep использует call_later, а loop.sock_recvadd_reader. Внешнее событие приведет к вызову этого коллбэка, который, в свою очередь, поместит результат в этот Future (см. шаг 8). (tasks.py#L698, selector_events.py#L368)

  4. Выполнение корутины приостанавливается, контейнер Future передается в таску.

    Корутина передает таске объект Future через возвращаемое значение вызова coro.send(None). (tasks.py#L289)

  5. Таска подписывается на результат Future.

    Внутри __step происходит регистрация коллбэка для объекта Futureresult.add_done_callback(self.__wakeup). (tasks.py#L327). Этот коллбэк будет вызван, когда результат будет помещен во Future.

  6. Future записывается в поле таски _fut_waiter

    Вызов __step записывает Future в атрибут таски _fut_waiter; теперь объект Task знает, результат чего он ждет. (tasks.py#L329)

  7. Таска передает управление планировщику.

    Вызов __step завершается; цикл обрабатывает другие таски, таймеры и события ввода-вывода.

  8. Происходит внешнее событие.

    Срабатывает таймер, в сокет пришли байты, и так далее; вызывается коллбэк, который помещает результат в объект Future, что в свою очередь, инициирует вызов __wakeup через loop.call_soon. (futures.py#L179)

  9. Возобновление работы таски.

    Планировщик вызывает __wakeup(future), который распаковывает контейнер Future (tasks.py#L359) и вызывает __step — пробрасывая исключение, если Future завершился ошибкой, иначе возобновляет корутину. Результат из Future подставляется в место вызова await. (tasks.py#L373)

  10. Повторение цикла.

Шаги 2–9 повторяются, пока корутина не завершит исполнение (выбросив исключение StopIteration). (tasks.py#L292)

Механика отмены задач

Для отмены задачи у объекта Task есть публичный метод cancel(). Давайте разберемся, что происходит во время его вызова с точки зрения событийного цикла.

Внутренности методов __step и __wakeup

Событие отмены — момент, когда CancelledError появляется в пользовательском коде, — происходит внутри __step. Ниже представлен упрощенный код данного метода: блоки CONSUME, CLEAR, RETURN, CANCEL, SUSPEND — это ключевые места, связанные с обработкой событий отмены. Далее мы будем ссылаться на них при разборе различных сценариев отмены таски.

def __step(self, exc=None):
    coro = self._coro

    # ───────────── CONSUME: отложенная отмена ───────────── tasks.py#L270 ─
    # Флаг _must_cancel равен True: вызов cancel() выставил его в момент,
    # когда задача не ждала результат какого-либо Future — например, вызов
    # cancel() произошел сразу после создания задачи, но до ее первого
    # запуска (до первого __step). В этом случае вручную создаем
    # CancelledError и пробрасываем его ниже.
    if self._must_cancel:
        if not isinstance(exc, CancelledError):
            exc = self._make_cancelled_error()
        self._must_cancel = False

    # ───────────── CLEAR: очистка _fut_waiter ───────────── tasks.py#L274 ─
    # Future, результат которого мы ждали, завершен; сбрасываем значение
    self._fut_waiter = None

    # Исполнение пользовательского кода корутины: send(None) возобновляет ее
    # при нормальном исполнении; throw(exc) возобновляет, пробрасывая ошибку
    # в месте текущего await — например, ожидаемая внешняя операция
    # завершилась с ошибкой (либо поступил CancelledError из блока CONSUME).
    try:
        if exc is None:
            result = coro.send(None)
        else:
            result = coro.throw(exc)

    # ───────── RETURN: корутина вернула значение ────────── tasks.py#L292 ─
    except StopIteration as exc:
        if self._must_cancel:
            # Корутина сама отменила исполнение своей задачи, после чего
            # вернула значение — например, `current_task().cancel(); return`.
            self._must_cancel = False
            # Помечаем Task как отмененную 
            # (super().cancel() есть Future.cancel()).
            super().cancel(msg=self._cancel_message)
        else:
            # Штатное завершение: сохраняем возвращаемое корутиной значение
            super().set_result(exc.value)

    # ───── CANCEL: корутина выбросила CancelledError ────── tasks.py#L299 ─
    except CancelledError as exc:
        # Сохраняем исходное исключение. При пробуждении родительской задачи
        # (внутри __wakeup) вызов future.result() выбросит именно это
        # конкретное исключение, затем __step пробросит его в корутину
        # родительской задачи через вызов coro.throw(exc)
        self._cancelled_exc = exc
        # Помечаем Task как отмененную 
        # (super().cancel() есть Future.cancel()).
        super().cancel()

    # ──── SUSPEND: корутина вернула управление (await) ──── tasks.py#L308 ─
    else:
        # Корутина приостановила работу, ожидая результат асинхронного
        # вызова. Контейнер Future для результата находится в локальной
        # переменной result.

        # Подписываемся на результат Future. Внешнее событие заполнит
        # контейнер, после чего произойдет вызов коллбэка __wakeup, который
        # возобновит исполнение задачи.
        result.add_done_callback(self.__wakeup)
        # Записываем этот Future в атрибут объекта задачи: теперь задача
        # знает, результат чего ждет
        self._fut_waiter = result

        # Корутина отменила свою таску, после чего сделала асинхронный вызов
        # — например, `current_task().cancel(); await asyncio.sleep(...)`.
        # Отменяем только что созданный Future: если он отменился -
        # сбрасываем флаг _must_cancel. В случае если Future был создан и
        # сразу же завершен, флаг остается, и отмена обработается на
        # следующем шаге __step в блоке CONSUME.
        if self._must_cancel:
            if self._fut_waiter.cancel(msg=self._cancel_message):
                self._must_cancel = False

__wakeup — это коллбэк, который будет вызван, когда результат Future готов (tasks.py#L359). Он вызывается в момент, когда нужно возобновить исполнение задачи — обрабатывает результат из Future, после чего вызывает __step:

def __wakeup(self, future):
    try:
        # Выбрасывает исключение, если внешний вызов завершился с ошибкой или Future был отменен. 
        future.result()
    except BaseException as exc:
        # Запуск следующего шага задачи. Исключение будет передано в корутину через вызов coro.throw(exc).
        self.__step(exc)
    else:
        # Запуск следующего шага задачи: исполнение корутины продолжится
        # через coro.send(None). Результат Future не передается напрямую —
        # он подставится в точке последнего await как результат yield from
        # (Future.__await__ делает `return self.result()`, см.
        # https://github.com/python/cpython/blob/v3.14.0/Lib/asyncio/futures.py#L292)
        self.__step()

API для отмены Future и Task

Task — это Future, поэтому, прежде чем разбираться с Task.cancel(), давайте посмотрим, как устроен Future.cancel()(futures.py#L152):

def cancel(self, msg=None):
    if self._state != _PENDING:
        return False

    self._state = _CANCELLED
    self._cancel_message = msg

    # Future завершен. Происходит планирование запуска коллбэков.
    # Задача, ожидающая результат этого Future, зарегистрировала здесь свой __wakeup.
    # Этот коллбэк будет вызван на следующем шаге работы планировщика.
    self.__schedule_callbacks()
    return True

Task переопределяет метод cancel (tasks.py#L183). Тело функции распадается на три сценария, в зависимости от значения _fut_waiter:

def cancel(self, msg=None):
    # ────────────────── Задача завершена ────────────────── tasks.py#L206 ─
    if self.done():
        return False

    # ────────────── Инкремент счетчика отмен ────────────── tasks.py#L208 ─
    # Каждый вызов cancel() увеличивает значение счетчика
    self._num_cancels_requested += 1

    # ──────────────── Есть активный Future ──────────────── tasks.py#L214 ─
    # Задача приостановлена на ожидании результата Future — отменяем его.
    if self._fut_waiter is not None:
        if self._fut_waiter.cancel(msg=msg):
            return True

    # ──────────────── Нет активного Future ──────────────── tasks.py#L221 ─
    # (задача отменяет сама себя или еще не стартовала). Флаг будет считан в
    # начале следующего шага __step (см. блок CONSUME).
    self._must_cancel = True
    self._cancel_message = msg
    return True

Вызов Task.cancel() не отменяет задачу, а лишь запрашивает ее отмену. Future.cancel() сразу меняет состояние объекта. В то же время Task.cancel() выбрасывает CancelledError в пользовательский код позже: либо отменяя ожидаемый Future, либо выставляя флаг, который будет считан на следующем вызове __step. Сама Task считается отмененной только внутри __step, когда исполнение корутины заканчивается выбрасыванием исключения CancelledError.

Рассмотрим отдельно каждый сценарий отмены.

Внешний вызов cancel()

Внешний код выполняет вызов task.cancel(), пока Task приостановлена на await в ожидании результата Future (атрибут _fut_waiter активен). Task.cancel() отменяет ее. CancelledError доходит до задачи на следующем __step: __wakeup распаковывает результат Future, передает исключение CancelledError в метод __step, после чего бросает его в корутину через вызов coro.throw(exc). В большинстве штатных ситуаций отмена задачи происходит именно таким образом.

Пример

async def worker():
    # приостановлена на ожидании результата Future от вызова sleep (он становится _fut_waiter)
    await asyncio.sleep(10)

async def main():
    task = asyncio.create_task(worker())
    # даем worker возможность дойти до await
    await asyncio.sleep(0)

    # Перед cancel:
    #   task._fut_waiter — еще не завершенный Future (от sleep)
    #   task._must_cancel — False
    task.cancel()
    # После cancel:
    #   task._fut_waiter — тот же Future, теперь отмененный
    #   task._must_cancel — по-прежнему False, его никто не трогал

    # CancelledError попадает сюда через цепочку коллбэков __wakeup -> __step
    try:
        await task
    except asyncio.CancelledError:
        pass

Task отменяет себя изнутри

В момент выполнения пользовательского кода корутины (внутри __step, между двумя вызовами await) задача вызывает cancel() на самой себе — например, так устроен контекстный менеджер asyncio.timeout(). Так как корутина выполняется, _fut_waiter равен None; Task.cancel() выставляет флаг _must_cancel. Дальше возможно несколько сценариев:

  • Корутина доходит до await, который отдает Future → блок SUSPEND вызова __step записывает этот Future в атрибут _fut_waiter, после чего сразу же отменяет Future.

  • Корутина возвращает значение → задача помечается как отмененная в блоке RETURN.

  • В остальных случаях блок CONSUME превращает флаг _must_cancel в CancelledError, после чего бросает его в корутину.

Пример

async def worker():
    me = asyncio.current_task()
    # Перед cancel:
    #   me._fut_waiter is None (worker выполняется)
    me.cancel()
    # После cancel:
    #   me._must_cancel is True
    #   me._fut_waiter по-прежнему None

    # Т.к. _must_cancel равен True, Future, созданный вызовом sleep, будет отменен в блоке SUSPEND.
    # Далее результат данного Future в виде CancelledError попадет в задачу на следующем вызове __step
    await asyncio.sleep(10)

async def main():
    task = asyncio.create_task(worker())
    try:
        await task
    except asyncio.CancelledError:
        pass

Отмена задачи до ее старта

Вызов cancel() происходит между create_task() и первым шагом __step задачи. _fut_waiter is None, поэтому Task.cancel() выставляет флаг _must_cancel. Когда первый __step будет вызван, блок CONSUME выбросит CancelledError еще до того, как выполнится хотя бы одна строка тела корутины.

async def worker():
    print("never runs")
    await asyncio.sleep(10)

async def main():
    task = asyncio.create_task(worker())
    # Перед cancel:
    #   task._fut_waiter is None (ни один __step еще не отработал)
    task.cancel()
    # После cancel:
    #   task._must_cancel is True
    #   task._fut_waiter по-прежнему None

    # Внутри первого вызова __step в блоке CONSUME будет выброшено исключение CancelledError.
    # Код worker не будет выполнен, по этой причине строка "never runs" никогда не напечатается.
    try:
        await task
    except asyncio.CancelledError:
        pass

Корутина может быть создана, но её код может так и не выполниться. Отмена до старта задачи приводит к тому, что любой пользовательский код игнорируется: код инициализации, очистка в try/finally блоках, __aenter__ контекстного менеджера.

Как событие отмены проходит через цепочки вызовов await

Представьте две задачи: parent, который ждет child, и child, остановленный на вызове asyncio.sleep(10):

async def child():
    await asyncio.sleep(10)

async def parent():
    await asyncio.create_task(child())

p = asyncio.create_task(parent())

Представим, что происходит отмена родительской задачи. В таком случае вызов p.cancel() вообще не отменяет parent. Он лишь отменяет Future, результат которого он ожидает (в данном случае задача child);

Дочерняя задача child делает то же самое; в конце концов цепочка вызовов cancel доходит до Future от вызова sleep, после чего происходит ее отмена. Оттуда CancelledError начинает распространяться обратно вверх по цепочке вызовов — сначала попадая в child, потом в parent. Каждая задача в цепочке будет считаться отмененной, только когда ее исполнение завершается из-за CancelledError (когда корутина сама выбросит это исключение).

Схематично это можно представить таким образом:

Главные выводы

  1. Вызов Task.cancel() не отменяет задачу, а лишь запрашивает ее отмену. Задача считается отмененной только тогда, когда ее исполнение завершается из-за CancelledError. В то же время вызов cancel() для Future сразу же меняет ее состояние.

  2. Task можно отменить еще до ее старта. Вызов cancel() между созданием задачи create_task() и началом ее исполнения приводит к тому, что пользовательский код корутины не выполняется вовсе.

  3. CancelledError — не просто ошибка. Это управляющий объект в рамках механизма отмены задач.

Протокол счетчика отмены

Но на вызове cancel возможности Task не заканчиваются. Внутри объекта есть дополнительная переменная-счетчик — _num_cancels_requested. С его помощью можно реализовать вложенные scope отмены — на его основе построен asyncio.timeout.

Методы: cancelling() / uncancel()

Перед тем как изучать исходные, давайте посмотрим на интерфейс для работы с данным счетчиком. Всего есть 3 метода — cancel() увеличивает значение, uncancel() уменьшает, cancelling() возвращает значение:

Пример (цифры означают текущее возвращаемое значение вызова cancelling())

async def demo():
    t = asyncio.create_task(asyncio.sleep(60))  

    print(t.cancelling())                  # 0 исходное значение
    t.cancel();   print(t.cancelling())    # 1 инкремент
    t.cancel();   print(t.cancelling())    # 2 декремент
    t.uncancel(); print(t.cancelling())    # 1 декремент
    t.uncancel(); print(t.cancelling())    # 0 вернулись к исходному значению
    t.uncancel(); print(t.cancelling())    # 0 декремент не опускает значение ниже 0

Внутри Task.cancel() происходит инкремент данного счетчика (tasks.py#L208):

def cancel(self, msg=None):
    ...
    self._num_cancels_requested += 1
    ...

Оставшиеся два метода: uncancel() и cancelling().

def uncancel(self):
    if self._num_cancels_requested > 0:
        # (1) декремент счетчика
        self._num_cancels_requested -= 1
        if self._num_cancels_requested == 0:
            # (2) на нуле сбрасываем флаг _must_cancel
            self._must_cancel = False
    return self._num_cancels_requested
    # важно: если мы уже отменили Future — результат, который мы ждали, — вызов uncancel не повлияет на это

def cancelling(self):
    return self._num_cancels_requested

Внутренности asyncio.timeout

Снаружи данный контекстный менеджер выглядит просто — мы устанавливаем время ожидания, после чего наш код отменяется. Может показаться неочевидным, как это может быть устроено под капотом — это всего лишь контекстный менеджер — и каким-то образом он прерывает исполнение нашего кода. Кроме того, когда он срабатывает, наружу выбрасывается не CancelledError, а TimeoutError. А также есть поддержка вложенных друг в друга контекстных менеджеров.

Давайте посмотрим, что происходит под капотом такого вызова:

async with asyncio.timeout(5.0):
    await long_running_call()

Работу контекстного менеджера Timeout (timeouts.py#L25) можно разбить на 4 шага:

  1. Сохранение текущего состояния счетчика — на входе в контекстный менеджер запоминаем значение task.cancelling().

  2. Регистрируем хэндлер для отмены текущей задачи — регистрируем его выполнение через loop.call_at(when, _on_timeout), где _on_timeout просто вызывает task.cancel() по истечении дедлайна.

  3. Запускаем пользовательский код.

  4. На выходе из контекстного менеджера проверяем, была ли отменена наша задача. Если выбрасывается CancelledError, вызываем task.uncancel() — он уменьшает счетчик и возвращает текущее значение. Сравниваем это значение с сохраненным значением из шага 1:

    • значение после uncancel > исходного → кто-то отменил нашу задачу извне; эта не наша отмена — пробрасываем CancelledError наружу.

    • значение после uncancel == исходному → задача была отменена с помощью нашего коллбэка _on_timeout. Это значит, что текущий CancelledError принадлежит контекстному менеджеру — можем спокойно заменить его на TimeoutError.

    • Happy-path: __aexit__ удаляет зарегистрированный коллбэк _on_timeout, после чего происходит штатный выход из контекстного менеджера.

Упрощенный исходный код asyncio.timeout (из CPython 3.14.0 Lib/asyncio/timeouts.py):

async def __aenter__(self):
    self._task = asyncio.current_task()
    # (1) сохраняем текущее значение счетчика
    self._cancelling = self._task.cancelling()
    loop = asyncio.get_running_loop()
    # (2) планируем отмену текущей задачи по истечении дедлайна
    self._timeout_handler = loop.call_at(self._when, self._on_timeout)
    self._state = _State.ENTERED
    return self

# срабатывает после дедлайна
def _on_timeout(self):
    self._task.cancel()
    self._state = _State.EXPIRING

async def __aexit__(self, exc_type, exc_val, exc_tb):
    # всегда снимаем запланированный коллбэк (no-op, если он уже сработал)
    self._timeout_handler.cancel()
    # (4) проверяем, из-за чего задача была отменена
    if self._state is _State.EXPIRING:
        # задача была отменена нашим коллбэком
        # uncancel() возвращает значение счетчика в исходное состояние
        if self._task.uncancel() <= self._cancelling and exc_type is not None:
            if issubclass(exc_type, CancelledError):
                # значение после uncancel == исходному → отмена наша + не было других внешних запросов на отмену
                # можем подменить исключение на TimeoutError
                raise TimeoutError from exc_val
            
        # иначе: задача отменена извне — пробрасываем CancelledError дальше

Всегда пробрасывайте CancelledError дальше. В противном случае asyncio.timeout и другие схожие примитивы могут работать некорректно.

Cancel-and-wait — дополнительная возможность, которую открывает счетчик

Более локальная проблема, однако все же встречается достаточно часто: родительская задача controller хочет отменить дочернюю задачу worker. Базовое решение выглядит так:

async def controller():
    worker_task = asyncio.create_task(worker())
    ...
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass    # воркер завершен, идем дальше

Решение кажется верным: отменить worker, дождаться его завершения, после чего проигнорировать CancelledError, ведь мы знаем, что это исключение — следствие нашего вызова cancel. Проблема заключается в том, что на самом деле это исключение может быть следствием того, что кто-то извне отменил нашу задачу controller! Необходимо уметь различать эти ситуации.

И мы можем добиться этого с использованием счетчика:

async def cancel_and_wait(task, msg=None):
    task.cancel(msg)
    try:
        await task
    except asyncio.CancelledError:
        if asyncio.current_task().cancelling() == 0:
            return                  # исключение связано с нашим запросом отмены
        raise                       # внешний код отменил нашу задачу
    else:
        raise RuntimeError("Cancelled task did not end with an exception")

Более подробно про это можно почитать здесь CPython #103486. Готовая реализация есть в aiotools.cancel_and_wait.

Счетчики в TaskGroup

Внутри asyncio.TaskGroup также используется протокол cancel() / uncancel() / cancelling(): когда дочерняя задача завершается с ошибкой, происходит отмена других задач в группе, после чего происходит проверка значения счетчика внутри __aexit__. Идея похожа на реализацию asyncio.timeout, более подробно можно посмотреть здесь taskgroups.py.

Главные выводы

  1. У объекта Task есть встроенный счетчик числа отмен — протокол cancel() / uncancel() / cancelling(). Он лежит в основе asyncio.timeout, и TaskGroup.

  2. Всегда пробрасывайте CancelledError дальше. Иначе asyncio.timeout и другие схожие примитивы могут работать некорректно.

CancelledError на практике: частые ошибки и подводные камни

CancelledError наследуется от BaseException

Начиная с Python 3.8, asyncio.CancelledError наследуется от BaseException, а не от Exception. Поэтому данный код работает некорректно:

Неправильно:

try:
    await work()
except Exception:        # ← CancelledError сюда НЕ попадет
    await cleanup()      # ← при отмене пропускается
    raise

Правильно: обрабатывайте CancelledError явно.

try:
    await work()
except (Exception, asyncio.CancelledError):
    await cleanup()
    raise

Несмотря на то что об этом пишут практически в каждом туториале по asyncio, ошибка все еще часто встречается во многих кодовых базах.

Особенности работы asyncio.shield

Как защитить ваш код от отмены? В большинстве случаев ответ очевиден — использовать asyncio.shield(). Под капотом asyncio.shield (Lib/asyncio/tasks.py, CPython 3.14.0) создает новый объект Future, после чего запускает фоновую задачу для выполнения do_critical_work(). Когда фоновая задача готова — она помещает результат в этот искусственно созданный контейнер. При этом если caller отменяют, отменяется внешняя Future, но не сама фоновая задача.

async def caller():
    await asyncio.shield(do_critical_work())

Выглядит корректно, однако здесь есть несколько проблем.

Проблема первая: отсутствие сильной ссылки на задачу

Event-loop держит только слабые ссылки на задачи. Документация Python говорит о том, что необходимо явно сохранять ссылки на задачи

Save a reference to tasks passed to this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done.

При вызове asyncio.shield(do_critical_work()) пользовательский код никогда не видит фоновую задачу; shield прячет ее за отдельной Future.

Некоторые библиотеки используют кастомную реализацию shield для решения этой проблемы. Например, Scheduler.shield() из aiojobs (v1.4.0) дословно копирует исходник asyncio.shield() с добавлением нескольких строк (aiojobs/_scheduler.py:139–173):

def shield(self, arg):
    inner = asyncio.ensure_future(arg)
    if inner.done():
        return inner

    # Эта функция — копия asyncio.shield(), за исключением
    # добавленных двух строк ниже.
    self._shields.add(inner)                              # ← сохраняем сильную ссылку на задачу
    inner.add_done_callback(self._shields.discard)        # ← убираем сильную ссылку, когда задача завершилась

    loop = inner.get_loop()
    outer = loop.create_future()
    # ... все остальное — asyncio.shield() без изменений ...

На практике GC редко удаляет фоновые задачи. Например, при вызове asyncio.sleep создается объект-таймер, который хранит обратную ссылку на задачу. Однако полагаться на это не стоит — в общем случае event loop не гарантирует этого.

Проблема вторая: фоновая задача может затеряться

shield защищает код от отмены, но он не дожидается окончания его работы! При отмене родительской задачи, фоновая продолжает работать. В итоге результат ее работы (или исключение) не будет обработано.

async def do_critical_work():
    await asyncio.sleep(5)
    raise RuntimeError("nobody catches this")

async def caller():
    await asyncio.shield(do_critical_work())

task = asyncio.create_task(caller())
await asyncio.sleep(0.1)
task.cancel()

# caller завершается с CancelledError.
# do_critical_work() продолжает работать еще ~5с сама по себе —
# после чего она выбросит исключение, которое никто не обработает.

В простых случаях описанные выше проблемы можно попытаться обойти. Однако по мере усложнения кода это перестает масштабироваться. Для решения этих проблем anyio (которую под капотом используют FastAPI и Starlette) предлагает другой путь: связать все задачи в общее дерево, после чего помещать защищенные блоки кода в отдельные контекстные менеджеры. За счет этого мы избегаем проблем, описанных выше. Сами авторы библиотеки отмечают, что asyncio.shield() зачастую сложно использовать корректно на практике, и предлагают вместо него cancel scopes. Более подробно посмотрим на это в следующей статье.

Заключение

В данной статье мы разобрались, как под капотом устроена отмена задач в asyncio, и что на самом деле скрывается за CancelledError. Также мы посмотрели на возможности и внутреннее устройство более сложных примитивов стандартной библиотеки, таких как asyncio.timeout.

Однако применение стандартных инструментов asyncio имеет свои трудности. Сложности, связанные с asyncio.shield, — лишь один из симптомов более глобальной проблемы, вызванной отсутствием высокоуровневых примитивов.

Подробнее о вариантах решения этих проблем мы поговорим в следующей статье. В ней мы разберемся, что предлагает и как работает библиотека anyio. Также посмотрим на trio — отдельный рантайм, давший начало самой модели cancel scope, которую переняли остальные.

Комментарии (0)