Привет, Хабр! Публикуем пятую часть (перваявтораятретьячетвёртая) перевода руководства по модулю asyncio в Python. Здесь представлены разделы оригинала с №10 по №13.

10. Конкурентный запуск нескольких корутин

Сила asyncio в том, что этот модуль позволяет запускать множество корутин в конкурентном режиме.

Эти корутины могут быть созданы в виде группы и сохранены, а потом их можно, все вместе, в одно и то же время, запустить.

Реализовать такой сценарий работы можно, воспользовавшись функцией asyncio.gather().

Познакомимся с этой функцией.

10.1. Функция asyncio.gather()

Функция asyncio.gather() позволяет вызывающей стороне группировать объекты, допускающие ожидание.Эти объекты, после группировки, можно запустить в конкурентном режиме. Кроме того — можно организовать ожидание их выполнения. Их выполнение можно и отменить.

awaitable asyncio.gather(*aws, return_exceptions=False)

Запускает в конкурентном режиме объекты, допускающие ожидание, находящиеся в группе объектов aws.

Coroutines and Tasks

Эта функция пригодится и для группировки, и для выполнения множества корутин или задач:

...
выполнение коллекции объектов, допускающих ожидание
results = await asyncio.gather(coro1(), asyncio.create_task(coro2()))

Функцией asyncio.gather() можно воспользоваться в ситуации, когда есть возможность заранее создать множество задач или корутин, которые потом нужно одновременно запустить и, прежде чем продолжать работу, нужно дождаться завершения работы каждой из них.

Это — вполне реальная ситуация в том случае, если требуется получить результат от множества похожих задач, например — от одной и той же задачи или корутины, разные экземпляры которой обрабатывают разные данные.

Объекты, допускающие ожидание, можно выполнить в конкурентном режиме. После того, как они возвратят результаты, которых ожидает основная программа —  она сможет продолжить работу.

Применение функции gather() даёт больше возможностей, чем обычное ожидание завершения работы задач.

Она позволяет рассматривать группу объектов, допускающих ожидание, как один такой объект.

Вот какие возможности это открывает:

  • Запуск группы объектов, допускающих ожидание, и ожидание завершения их работы с использованием выражения await.

  • Получение результатов от всех сгруппированных объектов, допускающих ожидание, путём вызова в нужное время метода result().

  • Выполнение группы объектов, допускающих ожидание, может быть отменено с помощью метода cancel().

  • Проверка того, завершена ли работа всех сгруппированных объектов, допускающих ожидание, может быть выполнена с помощью метода done().

  • Выполнить функции-коллбэки можно только тогда, когда все задачи в группе завершат работу.

И это — далеко не все возможности функции gather().

10.2. Как пользоваться функцией asyncio.gather()

Здесь мы подробнее поговорим о способах использования функции asyncio.gather().

Эта функция, в качестве аргументов, принимает один объект, допускающий ожидание, или несколько таких объектов.

Вспомните о том, что в роли объекта, допускающего ожидание, может выступать корутина, объект Future, или объект Task.

В результате — вызывать функцию gather() можно, передавая ей следующие сущности:

  • Множество задач.

  • Множество корутин.

  • Смешанный набор объектов, некоторые из которых являются задачами, а некоторые — корутинами.

Например:

...
выполнение нескольких корутин
asyncio.gather(coro1(), coro2())

Если функции gather() передали объекты Task — они уже будут выполняться, так как планирование выполнения таких объектов — это часть процесса их создания.

Эта функция принимает объекты, допускающие ожидание, в виде позиционных аргументов.

Нельзя создать список или коллекцию таких объектов и передать их этой функции, так как это приведёт к ошибке:

...
функции нельзя передать напрямую список объектов, допускающих ожидание
asyncio.gather([coro1(), coro2()])

Список объектов, допускающих ожидание, можно передать этой функции в том случае, если он сначала распакован и представлен в виде набора его элементов с использованием оператора «звёздочка» — *:

...
вызов функции с передачей ей распакованного списка объектов, допускающих ожидание
asyncio.gather(*[coro1(), coro2()])

Если этой функции передают корутины — они будут автоматически обёрнуты в объекты Task.

Функция gather() не является блокирующей. Она, вместо блокировки выполнения кода, возвращает объект asyncio.Future, который представляет группу объектов, допускающих ожидание.

Например:

...
получение объекта Future, который представляет несколько объектов, допускающих ожидание
group = asyncio.gather(coro1(), coro2())

После того, как объект Future будет создан, его выполнение в цикле событий планируется автоматически.

Объект, допускающий ожидание, представляющий группу объектов, и все объекты в группе, будут выполнены, как только представится такая возможность.

Это значит, что если вызывающая сторона больше ничего не делает — группа объектов, допускающих ожидание, выполнение которых запланировано, будет запущена (исходя из предположения о том, что вызывающая сторона приостанавливает своё выполнение).

Это, кроме того, значит и то, что нам не нужно ожидать выполнения объекта Future, возвращённого функцией gather():

...
получение объекта Future, который представляет несколько объектов, допускающих ожидание
group = asyncio.gather(coro1(), coro2())
приостановка работы и ожидание в течение некоторого времени, чтобы дать группе объектов возможность выполниться
await asyncio.sleep(10)

Можно подождать выполнения возвращённого объекта Future, что приведёт к ожиданию завершения выполнения всех объектов в группе:

...
выполнение группы объектов, допускающих ожидание
await group

После ожидания объекта Future, возвращённого функцией gather(), в нашем распоряжении окажется список значений, возвращаемых объектами, допускающими ожидание.

Если эти объекты не возвращают значения, в списке окажется возвращаемое значение, используемое по умолчанию — None.

Например:

...
выполнение группы объектов, допускающих ожидание, и получение возвращаемых значений
results = await group

Чаще вышеописанные операции оформляют в виде одной строки кода:

...
однострочный вариант запуска задач и получения результатов
results = await asyncio.gather(coro1(), coro2())

10.3. Пример использования функции gather() при работе со списком из нескольких корутин

При работе с группами корутин распространён такой подход: заранее создают несколько корутин, а позже передают их функции gather().

Это позволяет программе подготовить задачи, которые нужно выполнить в конкурентном режиме, а затем инициировать их конкурентное выполнение и подождать их завершения.

Несколько корутин можно собрать в единый список или вручную, или используя механизм спискового включения:

...
создание нескольких корутин
coros = [task_coro(i) for i in range(10)]

После этого можно воспользоваться функцией gather(), обработав с её помощью все корутины из списка.

Список, как уже говорилось, нельзя напрямую передать функции gather(), так как это приведёт к возникновению ошибки.

Вместо этого данная функция нуждается в том, чтобы каждый объект, допускающий ожидание, был бы передан ей в качестве отдельного позиционного аргумента.

Добиться этого можно, распаковав список, передаваемый функции. Сделать это можно с помощью оператора «звёздочка» — *, который выполнит все необходимые действия:

...
выполнение задач
await asyncio.gather(*coros)

Если собрать всё это вместе, то у нас получится пример, демонстрирующий запуск списка заранее подготовленных корутин с использованием функции gather():

# SuperFastPython.com
# пример запуска с помощью gather() множества корутин, находящихся в списке
import asyncio
 
# корутина, используемая для создания задач
async def task_coro(value):
    # вывод сообщения
    print(f'>task {value} executing')
    # приостановка работы на некоторое время
    await asyncio.sleep(1)
 
# корутина, используемая в роли точки входа в программу
async def main():
    # вывод сообщения
    print('main starting')
    # создание множества корутин
    coros = [task_coro(i) for i in range(10)]
    # выполнение задач
    await asyncio.gather(*coros)
    # вывод сообщения
    print('main done')
 
# запуск asyncio-программы
asyncio.run(main())

При выполнении этого примера запускается корутина main(), которая является точкой входа в программу. Затем эта корутина создаёт список из 10 объектов корутин, используя механизм спискового включения.

Этот список передаётся функции gather(), его, с помощью оператора «звёздочка», распаковывают, представляя в виде 10 отдельных значений.

Корутина main() ожидает выполнения объекта Future, возвращённого после вызова функции gather(). Выполнение программы приостанавливается, она ждёт завершения выполнения всех запланированных корутин.

Корутины выполняются в тот момент, когда у них появляется такая возможность. Каждая из них выводит уникальное сообщение и, перед завершением работы, «засыпает».

Только после того, как все корутины из группы завершатся, корутина main() возобновляет работу и выводит итоговое сообщение.

Этот пример демонстрирует подготовку коллекции корутин и передачу их, в виде отдельных аргументов, функции gather().

main starting
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
main done

Подробности о функции gather() можно найти здесь.

Теперь поговорим о том, как организовать ожидание выполнения некоего условия для группы asyncio-задач.

11. Ожидание выполнения некоего условия для коллекции задач

Функция asyncio.wait() позволяет организовать ожидание выполнения некоего условия для нескольких asyncio-задач.

При ожидании можно ориентироваться на различные условия. Это может быть завершение выполнения всех задач, завершение выполнения одной из задач, аварийное завершение одной из задач с выдачей исключения.

Познакомимся с этой функцией поближе.

11.1. Функция asyncio.wait()

Функция asyncio.wait() может использоваться для организации ожидания завершения коллекции asyncio-задач.

Вспомните о том, что asyncio-задача — это экземпляр класса asyncio.Task, который применяется для оборачивания корутин. Это позволяет независимо планировать выполнение корутин и выполнять их. А экземпляр класса Task даёт нам средства для работы с задачей — для запрашивания сведений о её состоянии и для получения результатов её работы.

Подробности об asyncio-задачах можно найти здесь.

Поговорим об использовании функции asyncio.wait().

11.2. Как пользоваться функцией asyncio.wait()

Функция asyncio.wait() принимает коллекции объектов, допускающих ожидание. Обычно это — объекты Task.

Коллекция может быть представлена списком, словарём или множеством, которые содержат заранее созданные объекты задач. Создать их можно, например, пользуясь функцией asyncio.create_task() и механизмом спискового включения:

...
создание множества задач
tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]

Функция asyncio.wait() не завершит работу до тех пор, пока в применении к переданной ей коллекции задач не будет выполнено некое условие.

По умолчанию это условие заключается в том, чтобы выполнение всех задач было бы завершено.

Функция wait() возвращает кортеж, включающий в себя два множества. Первое множество содержит все объекты задач, соответствующие условию. Второе содержит все остальные объекты задач, то есть те, которые условию не соответствуют.

Эти множества называют множеством завершённых (done) задач и множеством незавершённых (pending) задач:

...
ожидание завершения всех задач
done, pending = await asyncio.wait(tasks)

Функция asyncio.wait() — это, с технической точки зрения — функция корутины, которая возвращает корутину.

Это позволяет организовать ожидание корутины с последующим получением кортежа множеств:

...
создание корутины, выполнения которой нужно дождаться
wait_coro = asyncio.wait(tasks)
ожидание выполнения корутины с помощью await
tuple = await wait_coro

Условие, которое ожидает функция, можно задать с помощью аргумента return_when, который по умолчанию установлен в значение asyncio.ALL_COMPLETED. Это значение можно передать функции и в явном виде:

...
ожидание завершения всех задач
done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

Используя функцию asyncio.wait() можно настроить её и так, чтобы она завершалась бы при завершении одной из переданных ей задач. Делается это путём установки аргумента return_when в значение FIRST_COMPLETED:

...
ожидание завершения одной из задач
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

Первая завершённая задача при таком подходе попадает в группу завершённых задач, а выполнение остальных задач при этом не отменяется, они продолжают выполняться в конкурентном режиме.

Функция asyncio.wait() позволяет дождаться первого сбоя задачи с выдачей исключения. Это условие задаётся путём передачи этой функции аргумента return_when со значением FIRST_EXCEPTION:

...
ожидание первой сбойной задачи
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

В данном случае множество завершённых задач будет содержать первую задачу, которая дала сбой и вызвала исключение. Если же сбой не дала ни одна задача — множество завершённых задач будет включать в себя все задачи, а возврат из wait() произойдёт только после завершения работы всех задач.

Указать то, как долго мы хотим ждать выполнения заданного условия, можно с помощью аргумента timeout, который содержит тайм-аут, выраженный в секундах.

Если заданный промежуток времени истечёт до выполнения условия, возвращается кортеж задач, содержащий подмножество задач, соответствующих условию на момент возврата. Например — это может быть подмножество завершённых задач в том случае, если условием ожидания является завершение всех задач.

Например:

...
ожидание завершения всех задач с тайм-аутом
done, pending = await asyncio.wait(tasks, timeout=3)

Если время выйдет до того, как будет выполнено условие, какого-либо исключения выдано не будет, а задачи, не соответствующие условию, не будут отменены.

Теперь, когда мы немного разобрались с функцией asyncio.wait(), посмотрим на рабочий пример её применения.

11.3. Пример организации ожидания выполнения некоего условия для нескольких задач

Исследуем ожидание завершения выполнения нескольких задач с использованием функции asyncio.wait().

В этом примере мы определим простую корутину, которая будет использоваться для создания задач. Эта корутина генерирует случайное значение, ненадолго приостанавливается и выводит сообщение со сгенерированным ей значением.

Главная корутина создаёт на основе этой корутины несколько задач с использованием механизма спискового включения. После этого организуется ожидание выполнения всех этих задач.

Вот полный код примера:

# SuperFastPython.com
# пример ожидания завершения всех задач
from random import random
import asyncio
 
# корутина, которая будет выполняться в новой задаче
async def task_coro(arg):
    # генерирование случайного значения в диапазоне между 0 и 1
    value = random()
    # блокировка на некоторое время
    await asyncio.sleep(value)
    # вывод значения
    print(f'>task {arg} done with {value}')
 
# главная корутина
async def main():
    # создание нескольких задач
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # ожидание завершения выполнения всех задач
    done,pending = await asyncio.wait(tasks)
    # вывод результатов
    print('All done')
 
# запуск asyncio-программы
asyncio.run(main())

При запуске этого примера сначала создаётся корутина main(), которая используется как точка входа в asyncio-программу.

Затем эта корутина, пользуясь механизмом спискового включения, создаёт список из 10 задач. Каждой из этих задач передаётся уникальный аргумент от 0 до 9.

Потом корутина main() приостанавливается и ожидает завершения выполнения всех задач.

Задачи выполняются. Каждая из них генерирует случайное значение, на некоторое время «засыпает», а после этого выводит сгенерированное значение.

После того, как все задачи завершатся, корутина main() возобновляет работу и выводит итоговое сообщение.

В этом примере показано то, как функцию wait() можно использоваться для организации ожидания завершения коллекции задач.

Это, вероятно, тот самый сценарий её использования, который встречается чаще всего.

Обратите внимание на то, что результаты выполнения этого кода каждый раз будут разными. Это так из-за того, что в нём используется генератор случайных чисел.

>task 5 done with 0.0591009105682192
>task 8 done with 0.10453715687017351
>task 0 done with 0.15462838864295925
>task 6 done with 0.4103492027393125
>task 9 done with 0.45567100006991623
>task 2 done with 0.6984682905809402
>task 7 done with 0.7785363531316224
>task 3 done with 0.827386088873161
>task 4 done with 0.9481344994700972
>task 1 done with 0.9577302665040541
All done

Вот — учебное руководство, в котором можно узнать подробности о функции wait().

Наша следующая тема — организация ожидания выполнения отдельной корутины с указанием лимита времени.

12. Ожидание выполнения корутины с указанием лимита времени

Можно организовать ожидание завершения выполнения asyncio-задачи или корутины с указанием лимита времени. Делается это с помощью функции asyncio.wait_for().

Если время истечёт до завершения задачи — она отменяется.

12.1 Функция asyncio.wait_for()

Функция asyncio.wait_for() позволяет вызывающей стороне ожидать завершения выполнения asyncio-задачи или корутины с указанием тайм-аута.

Если тайм-аут не задан — эта функция будет ожидать завершения задачи.

Если тайм-аут задан и время истекло до завершения выполнения задачи — задача отменяется.

coroutine asyncio.wait_for(aw, timeout)

Ожидает завершения выполнения объекта aw, допускающего ожидание, учитывая заданный тайм-аут timeout.

Coroutines and Tasks

Эта функция позволяет вызывающей стороне и формулировать свои ожидания относительно того, как долго она готова ждать завершения задачи, и принудительно завершать задачу в том случае, если вышло время, отведённое на её выполнение.

Теперь, когда мы уже кое-что знаем о функции asyncio.wait_for() — поговорим о том, как ей пользоваться.

12.2. Как пользоваться функцией asyncio.wait_for()

Функция asyncio.wait_for() принимает объект, допускающий ожидание, и тайм-аут.

Объект может быть представлен корутиной или задачей.

Значение, представляющее тайм-аут, обязательно должно быть передано функции. Если временное ограничение не задаётся — её аргумент timeout нужно установить в значение None. Если ограничение задаётся — это может быть целое число или число с плавающей запятой, представляющее количество секунд.

Функция wait_for() возвращает корутину, которая не выполняется до тех пор, пока вызывающая сторона не использует, явным образом, выражение await для организации ожидания её выполнения, или до тех пор, пока её выполнение не будет запланировано в виде задачи.

Например:

...
ожидание завершения задачи
await asyncio.wait_for(coro, timeout=10)

Если этой функции предали корутину — она, при выполнении корутины wait_for(), будет преобразована в задачу.

Если тайм-аут истечёт до завершения задачи — задача отменяется, вызывается исключение asyncio.TimeoutError. Его может понадобиться обработать.

Например:

...
выполнение задачи с тайм-аутом
try:
    # ожидание выполнения задачи
    await asyncio.wait_for(coro, timeout=1)
except asyncio.TimeoutError:
    # ...

Если задача, выполнения которой мы ожидаем, аварийно завершит работу, выдав необработанное исключение, это исключение будет передано вызывающей стороне, ожидающей завершения работы корутины wait_for(). Это исключение может нуждаться в обработке:

...
выполнение задачи, которая может дать сбой
try:
    # ожидание завершения задачи
    await asyncio.wait_for(coro, timeout=1)
except asyncio.TimeoutError:
    # ...
except Exception:
    # ...

Рассмотрим пример использования функции asyncio.wait_for() с тайм-аутом

12.3. Пример использования функции asyncio.wait_for() с тайм-аутом

Исследуем пример использования функции asyncio.wait_for() с тайм-аутом в ситуации, когда время истекает до завершения работы задачи.

Здесь мы используем корутину, очень похожую на ту, которая использовалась в предыдущем подобном примере. Но в этом примере вызывающая сторона даёт корутине на выполнение лишь 0,2 секунды — или 200 миллисекунд. Напомню, что 1 секунда равна 1000 миллисекундам.

Корутина, используемая для создания задачи, в сравнении с предыдущим примером, изменена. Благодаря этому она «засыпает» на время, превышающее одну секунду. Это обеспечивает то, что тайм-аут всегда истекает до завершения задачи.

Вот код примера:

# SuperFastPython.com
# пример ожидания выполнения корутины с тайм-аутом
from random import random
import asyncio
 
# корутина, которая будет выполняться в новой задаче
async def task_coro(arg):
    # генерирование случайного значения в диапазоне между 1 и 2
    value = 1 + random()
    # вывод сообщения
    print(f'>task got {value}')
    # блокировка на некоторое время
    await asyncio.sleep(value)
    # вывод сообщения о завершении работы
    print('>task done')
 
# главная корутина
async def main():
    # создание задачи
    task = task_coro(1)
    # запуск задачи и ожидание её завершения с тайм-аутом
    try:
        await asyncio.wait_for(task, timeout=0.2)
    except asyncio.TimeoutError:
        print('Gave up waiting, task canceled')
 
# запуск asyncio-программы
asyncio.run(main())

При запуске этого кода сначала создаётся корутина main(), являющаяся точкой входа в asyncio-программу.

Корутина main() создаёт задачу на основе корутины. Затем она взывает функцию wait_for(), передаёт ей задачу и задаёт тайм-аут в 0,2 секунды.

Главная корутина приостанавливается и выполняется задача task_coro(). Она выводит сообщение и ненадолго «засыпает».

Корутина main() возобновляет работу после того, как истечёт тайм-аут. Корутина wait_for() отменяет выполнение корутины task_coro(), а корутина main() приостанавливается.

Задача task_coro() снова запускается и отвечает на запрос об остановке. Она вызывает исключение TimeoutError и завершает работу.

Корутина main() возобновляет работу и обрабатывает исключение TimeoutError, вызванное задачей task_coro().

Этот пример демонстрирует использование функции wait_for() с тайм-аутом, и то, как отменяются задачи, на выполнение которых не хватает отведённого им времени.

Выходные данные этой программы будут разными при каждом её запуске из-за того, что в ней используется генератор случайных чисел.

>task got 0.685375224799321
Gave up waiting, task canceled

Подробности о функции wait_for() можно найти здесь.

Поговорим теперь о том, как защищать asyncio-задачи от отмены их выполнения.

13. Защита задач от отмены их выполнения

Выполнение asyncio-задачи можно отменить, вызывав её метод cancel().

Задачу можно защитить от отмены, обернув её в вызов функции asyncio.shield().

Поговорим об этой функции.

13.1. Функция asyncio.shield()

Функция asyncio.shield() оборачивает объект, допускающий ожидание, в объект Future, который будет принимать запросы на отмену выполнения обёрнутого в него объекта.

Защищает объект, допускающий ожидание, от отмены его выполнения.

Coroutines and Tasks

Это означает, что такой объект Future можно передавать задачам, которые могут попытаться отменить его выполнение. При этом запросы на отмену будут выглядеть так, будто они завершились успешно, но при этом защищённая задача или корутина продолжит выполняться.

Это может оказаться полезным в asyncio-программах, в которых некоторые задачи допускается отменять, а некоторые, возможно, с более высоким приоритетом, отменять не допускается.

Ещё это может пригодиться в программах, в которых некоторые задачи можно безопасно отменить, а некоторые — нет. Например — задачи, которые спроектированы с учётом особенностей модуля asyncio, можно отменять. А некоторые другие задачи, не поддерживающие безопасную досрочную отмену их выполнения, нужно защищать.

Теперь, когда мы, в общих чертах, разобрались с asyncio.shield(), поговорим об использовании этой функции.

13.2. Как пользоваться функцией asyncio.shield()

Функция asyncio.shield() защищает от отмены объекты Task или корутины.

Она принимает объект, допускающий ожидание, и возвращает объект asyncio.Future.

Дождаться завершения работы этого объекта можно, воспользовавшись ключевым словом await. Его можно и передавать другим задачам или корутинам:

...
защита задачи от отмены
shielded = asyncio.shield(task)
ожидание завершения защищённой задачи
await shielded

Отменить выполнение возвращённого функцией объекта Future можно, вызвав его метод cancel(). Защищённая задача при этом продолжит выполняться, но запрос на отмену выполнения будет выглядеть так, будто он завершился успешно:

...
попытка отмены защищённой задачи
was_canceld = shielded.cancel()

Любые корутины, ожидающие выполнения объекта Future, выдадут исключение asyncio.CancelledError, которое может нуждаться в обработке:

...
try:
    # ожидание завершения защищённой задачи
    await asyncio.shield(task)
except asyncio.CancelledError:
    # ...

Тут важно помнить о том, что запрос на отмену, выполненный для объекта Future, не передаётся заключённой в него задаче.

Это значит, что подобный запрос как бы «поглощается» защитным объектом.

Например:

...
создание задачи
task = asyncio.create_task(coro())
создание объекта, включающего в себя задачу и защищающего её от отмены
shield = asyncio.shield(task)
отмена выполнения защитного объекта (задача при этом не отменяется)
shield.cancel()

Если функции asyncio.shield() передаётся корутина — она оборачивается в объект asyncio.Task() и планируется её немедленное выполнение.

Это значит, что для того, чтобы внутренняя корутина могла бы запуститься, нет нужды ожидать выполнения защитного объекта Future с помощью await.

awaitable asyncio.shield(aw)

Если aw — это корутина, то автоматически планируется её выполнение в виде объекта Task.

Coroutines and Tasks

Если защищённая задача отменяется — то запрос на её отмену распространяется и на защитный объект, выполнение которого тоже отменяется:

...
создание задачи
task = asyncio.create_task(coro())
создание защитного объекта
shield = asyncio.shield(task)
отмена задачи (это отменяет и защитный объект)
task.cancel()

Теперь, когда мы знаем о том, как пользоваться функцией asyncio.shield() — рассмотрим пример.

13.3. Пример использования функции asyncio.shield() для защиты задачи

Исследуем возможности функции asyncio.shield()по защите задач от отмены их выполнения.

В этом примере мы определяем простую корутину, которая принимает целочисленный аргумент, на секунду «засыпает», а потом возвращает этот аргумент. Эта корутина будет использоваться для создания задачи, выполнение которой можно запланировать.

Здесь мы определим и вторую корутину, которая принимает задачу, ненадолго «засыпает», а потом отменяет задачу.

В главной корутине мы защитим первую задачу и передадим её второй задаче, а потом дождёмся завершения работы защищённой задачи.

Мы ожидаем, что отмена выполнения коснётся лишь защитного объекта, а находящаяся у него внутри задача не пострадает. Отмена задачи нарушит работу главной корутины. Мы можем проверить состояние защищаемой задачи в конце программы. Ожидается, что её работа завершится нормально, что на неё не повлияет запрос на отмену выполнения, сделанный для защитного объекта.

Вот полный код примера:

# SuperFastPython.com
# пример использования функции asyncio.shield() для защиты задачи от отмены
import asyncio
 
# определение простой корутины
async def simple_task(number):
    # блокировка на некоторое время
    await asyncio.sleep(1)
    # возврат аргумента
    return number
 
# корутина, отменяющая переданную ей задачу через некоторое время
async def cancel_task(task):
    # блокировка на некоторое время
    await asyncio.sleep(0.2)
    # отмена задачи
    was_cancelled = task.cancel()
    print(f'cancelled: {was_cancelled}')
 
# главная корутина
async def main():
    # создание корутины
    coro = simple_task(1)
    # создание задачи
    task = asyncio.create_task(coro)
    # создание защищённой задачи
    shielded = asyncio.shield(task)
    # создание задачи, отменяющей переданную ей задачу
    asyncio.create_task(cancel_task(shielded))
    # обработка отмены
    try:
        # ожидание завершения работы защищённой задачи
        result = await shielded
        # вывод сведений о полученных результатах
        print(f'>got: {result}')
    except asyncio.CancelledError:
        print('shielded was cancelled')
    # ожидание
    await asyncio.sleep(1)
    # вывод сведений о задачах
    print(f'shielded: {shielded}')
    print(f'task: {task}')
 
# запуск программы
asyncio.run(main())

Здесь сначала создаётся корутина main(), которая представляет собой точку входа в программу.

Затем создаётся корутина, на основе которой создаётся объект Task, который планируется на выполнение. Эта задача защищается от отмены.

Защищённая задача передаётся корутине cancel_task(). Она так же оборачивается в задачу, которая планируется на выполнение.

После этого главная корутина ждёт завершения работы защищённой задачи, ожидая появления исключения CancelledError.

Задача некоторое время работает, после чего «засыпает». Задача, которая отменяет другую задачу, запускается, на некоторое время «засыпает», а потом отменяет защищённую задачу. Запрос на отмену задачи завершается успешно.

Всё это вызывает исключение CancelledError в защитном объекте Future, но не в защищаемой им задаче.

Корутина main() возобновляет работу и реагирует на исключение CancelledError, выводя сообщение. После этого она «засыпает» на более длительное время.

Потом защищённая задача возобновляет работу, она нормально завершается и возвращает значение.

В итоге корутина main() возобновляет работу и выводит сведения о состоянии защитного объекта Future и защищённой им задачи. В выводе программы, представленном ниже, можно видеть, что защитный объект отменён, а включённая в него задача нормально завершила работу и возвратила значение.

cancelled: True
shielded was cancelled
shielded: <Future cancelled>
task: <Task finished name='Task-2' coro=<simple_task() done, defined at ...> result=1>

Этот пример демонстрирует применение функции asyncio.shield() для защиты задач от отмены их выполнения.

Подробнее этот вопрос освещён здесь.

Наша следующая тема посвящена запуску блокирующих задач в asyncio-программах.

О, а приходите к нам работать? ???? ????

Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.

Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.

Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.

Присоединяйтесь к нашей команде.

Комментарии (0)