Автор оригинала: Jacob Padilla

Сейчас asyncio — одна из самых модных тем в Python, и это справедливо — это отличный способ обработки программ, связанных с вводом-выводом! Когда я изучал asyncio, мне потребовалось некоторое время, чтобы понять, как это на самом деле работает. Но позже я узнал, что это по сути просто очень хороший слой поверх генераторов Python. В этой статье я собираюсь создать упрощенную версию asyncio, используя только генераторы Python. Затем я собираюсь реорганизовать пример, чтобы использовать ключевые слова async и await с помощью dunder-метода __await__, прежде чем замкнуть круг и заменить свою версию на настоящий asyncio. Надеюсь, создав простую версию asyncio в конце этой статьи, вы сможете лучше понять, как он творит свою магию!

Обзор генераторов

Если вы уже знакомы с генераторами, пропустите эту часть, но если нет, то именно на них построен asyncio, поэтому очень важно понимать, как они работают.

Прежде всего, причина, по которой генераторы существуют, заключается в том, что они позволяют сделать ваш код более эффективным с точки зрения памяти. Представьте, что у вас есть следующий цикл:

for i in range(100_000_000):
    print(i)

Если бы range был не генератором, а функцией, возвращающей список для последующего его перебора, код, подобный вышеприведенному примеру, был бы очень неэффективным в плане использования памяти, поскольку вы бы создали список из 100 миллионов элементов... Однако, поскольку range это генератор, по крайней мере в Python 3+, вы генерируете числа только по мере необходимости, одно за другим, не сохраняя всю последовательность в памяти.

Есть несколько способов создания генераторов, но мы сосредоточимся на функциях генераторов. Эти генераторы определяются как любые другие функции, но используют для возврата данных оператор yield. Этот оператор превращает обычную функцию в генератор, которая вместо того, чтобы выполняться сразу, может приостанавливать и возобновлять свое состояние при вызове next(iterator).

Возьмем, к примеру, следующую функцию-генератор:

def generator():
   yield 'hello'
   yield 'world'
iterator = generator()


Когда вы вызываете генератор, Python видит ключевое слово yield и вместо того, чтобы запустить код внутри функции, как он это обычно делает — возвращает объект генератора. Как только у нас есть объект генератора, мы можем вызвать next(iterator) и он запустит код функции до первого/следующего оператора  yield:

print(next(iterator))  # Output: hello
print(next(iterator))  # Output: world

Если мы попробуем вызвать next(iterator) еще раз, генератор вызовет исключение  StopIteration, поскольку в функции генератора больше нет операторов yield.

Еще одна интересная функция генераторов Python — это yield from — возможность из генератора вызывать подгенератор или итерируемый объект, что позволяет вам создавать цепочку генераторов!

def generator():
   yield 'hello'
def another_generator():
   yield from generator()
iterable = another_generator()
print(next(iterable))  # Output: hello


Генераторы — это нечто большее, чем я говорю, например, генераторные включения, которые похожи на списочные включения, но создаются с круглыми скобками вместо квадратных, и возможность отправлять данные генераторам с помощью  iterator.send(value). Однако для данной статьи важно помнить о генераторах, что они позволяют запускать и останавливать функцию, сохраняя ее состояние!

Цикл событий

Цикл событий, который отвечает за запуск и управление всеми текущими задачами, является ядром asyncio и первое, что мы воссоздадим с помощью генераторов. Хотя цикл событий asyncio написан на языке C, его проще всего представить в виде списка, содержащего все текущие задачи. Пока что представьте эти задачи просто как объекты-генераторы. Менеджер цикла событий будет проходить по каждой задаче в списке и использовать функцию  next(task) для запуска каждой из них. Затем эта задача будет запущена, и когда она будет выполнять работу, связанную с вводом-выводом, например, спать, она будет использовать ключевое слово  yield чтобы приостановить свое выполнение и вернуть управление этому циклу событий, который затем перейдет к следующей задаче в цикле.

Вот пример этого — у нас есть две задачи, обе печатают свой номер и затем приостанавливают свое выполнение. Поскольку менеджер цикла событий — это тот, кто вызывает next(), после того как задача приостановлена, он получает управление обратно и затем переходит к выполнению следующей задачи в цикле.

def task1():
   while True:
       print('Task 1')
       yield
def task2():
   while True:
       print('Task 2')
       yield
event_loop = [task1(), task2()]
while True:
   for task in event_loop:
       next(task)


Впоследствии вывод этого кода будет выглядеть следующим образом и будет продолжаться до бесконечности, поскольку обе функции генератора никогда не завершатся из-за циклов while True.

Task 1
Task 2
Task 1
Task 2
…

Sleep

Если мы возьмем тот же код сверху, мы можем добавить подгенераторы к нашим задачам с помощью yield from. Ниже я добавил спящий генератор, который приостановит выполнение задач, пока не истечет указанное время. Это работает, потому что sleepбудет продолжать выдавать результат, пока не пройдет определенное количество секунд, после чего он выйдет из цикла while. Поскольку в sleep больше нет операторов   yield,  возникает исключение StopIteration, которое сигнализирует оператору yield from в функциях задач о необходимости перейти к следующей строке кода.

import time
def sleep(seconds):
   start_time = time.time()
   while time.time() - start_time < seconds:
       yield
def task1():
   while True:
       print('Task 1')
       yield from sleep(1)
def task2():
   while True:
       print('Task 2')
       yield from sleep(5)
event_loop = [task1(), task2()]
while True:
   for task in event_loop:
       next(task)

Выход:

Task 1
Task 2
Task 1
Task 1
Task 1
Task 1
Task 2
Task 1
…

От yield к await

Теперь мы можем взять приведенный выше код и перейти от использования yield к await с помощью dunder-метода  __await__ и ключевого слова async. Когда класс имеет метод __await__, мы можем использовать ключевое слово  await перед экземпляром класса, чтобы вызвать его. В asyncio вы обычно работаете с  объектами Task через функцию, например asyncio.create_task. Эти  объекты Task наследуются от объекта asyncio Future, у которого есть метод __await__ . Мы также можем использовать await перед корутиной, которая является объектом, созданным при вызове функции с  ключевым словом async перед ней. Корутины похожи на функции-генераторы в том смысле, что выполнение корутины также можно приостанавливать и возобновлять.

Можете думать о  ключевом слове await как о простом синониме yield from с некоторыми дополнительными правилами проверки. Таким образом, при написании кода await object вы в основном говорите либо yield из метода __await__ в экземпляре класса «object», ИЛИ «object» может быть другой корутиной (вроде подгенератора).

На самом деле вы можете взглянуть на исходный код Asyncio и увидеть, что метод __await__ внутри  объекта Future по сути просто вызывает yield если Future (или Task) не завершено:


Исходный код Asyncio. Метод __await__ в объекте Future
Исходный код Asyncio. Метод __await__ в объекте Future

Чтобы перенести код, который мы написали в разделе выше, на использование async и await, нам сначала нужно создать свой собственный  класс Task, поскольку функция не может иметь dunder-метод __await__. Ниже приведена простая версия, которую я придумал:

from queue import Queue
event_loop = Queue()
class Task():
    def init(self, generator):
        self.iter = generator
        self.finished = False
    def done(self):
        return self.finished
    def await(self):
        while not self.finished:
            yield self
def create_task(generator):
    task = Task(generator)
    event_loop.put(task)
    return task


На этот раз вместо использования списка Python для создания цикла событий мы используем очередь, что имеет немного более осмысленно, поскольку мы хотим иметь возможность добавлять и удалять задачи из цикла за постоянное время.

Для нашего  класса Task мы сохраняем объект генератора в self.iter а также устанавливаем self.finished в False, который будет отслеживать, закончил ли генератор работу (он завершает работу, когда вызывается StopIteration). У нашего  объекта Task также есть dunder-метод __await__, который будет просто продолжать возвращать управление циклу событий, пока задача не будет завершена. Наконец, после создания объекта Task с помощью  вспомогательной функции create_task мы добавляем его в цикл событий, который планирует его запуск.

Теперь давайте создадим менеджер цикла событий, который будет запускать задачи:

def run(main):
    event_loop.put(Task(main))
    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)


Можете заметить, что это начинает имитировать фактический API asyncio, поскольку для запуска цикла событий нам нужно вызвать  начальную функцию run. Функция сначала оборачивает основную функцию в  объект Task и добавляет ее в цикл событий. Затем будет запущен цикл  While, и для каждого прогона будем получать через очередь
следующую задачу для запуска. Вместо использования next(task.iter) теперь нам нужно использовать task.iter.send(None), что является просто странной особенностью работы с ключевыми словами async/await, но делает то же самое. Также мы хотим обернуть этот вызов в блок try-except, поскольку если StopIteration выбрасывается исключение, мы можем установить task.finished в True, но если никаких исключений не возникает,
код перейдет к оператору else, который добавляет задачу обратно в цикл событий для повторного запуска.

Далее нам нужно сделать функцию sleep асинхронно-совместимой. Раньше мы использовали функцию-генератор с циклом while и одну yield для управления засыпанием. Мне нравится этот подход, но вы не можете использовать  ключевое слово await в сочетании с функцией-генератором — это должен быть объект с dunder-методом __await__ или функция корутины. Поэтому, чтобы решить эту проблему, я переместил код в другую функцию, и теперь фактическая  функция sleep создает объект задачи, а затем ожидает его. Этот await вызывает метод  __await__ внутри объекта Task, который затем выдаст результат, позволяя циклу событий перейти к другой задаче. Когда цикл событий доберется до новой задачи _sleep, он проверит время, и если прошло недостаточно времени, также вызовет  yield чтобы вернуть управление циклу событий. Если спящая задача снова вызывается через цикл событий, подобно тому, как генератор сохраняет свое состояние, корутина все равно будет ожидать  возврата sleep, и поскольку sleep все еще будет ожидать завершения задачи _sleep, dunder-метод задачи __await__ снова будет вызван, а поскольку задача не завершена, будет вызван метод yield в dunder-методе.

import time
def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield
async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task


Вот весь код вместе:

from queue import Queue
import time


event_loop = Queue()


def _sleep(seconds):
    start_time = time.time()
    while time.time() - start_time < seconds:
        yield


async def sleep(seconds):
    task = create_task(_sleep(seconds))
    return await task


class Task():
    def __init__(self, generator):
        self.iter = generator
        self.finished = False

    def done(self):
        return self.finished

    def __await__(self):
        while not self.finished:
            yield self


def create_task(generator):
    task = Task(generator)
    event_loop.put(task)

    return task


def run(main):
    event_loop.put(Task(main))

    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.iter.send(None)
        except StopIteration:
            task.finished = True
        else:
            event_loop.put(task)

Теперь, когда мы создали цикл событий, способ создания задач и функцию sleep, мы можем импортировать файл (называемый «jacobio.py») и взять код из прошлой версии, когда мы использовали yield-ы. Заменим все операторы yield from на await, добавим ключевое слово async к функциям с await, чтобы обозначить, что эти функции могут быть ожидающими, а затем создать основную функцию, как в asyncio, чтобы добавить задачи в цикл событий:

import jacobio

async def task1():
    for _ in range(2):
        print('Task 1')
        await jacobio.sleep(1)

async def task2():
    for _ in range(3):
        print('Task 2')
        await jacobio.sleep(0)

async def main():
    one = jacobio.create_task(task1())
    two = jacobio.create_task(task2())

    await one
    await two
    
    print('done')


if __name__ == '__main__':
    jacobio.run(main())

Выход:

Task 1
Task 2
Task 2
Task 2
Task 1
done

Ожидание с AsyncIO

Теперь мы можем взять наш код выше и заменить все вхождения «jacobio» на «asyncio», и теперь мы полностью используем пакет asyncio!

import asyncio
async def task1():
    for _ in range(2):
        print('Task 1')
        await asyncio.sleep(1)
async def task2():
    for _ in range(3):
        print('Task 2')
        await asyncio.sleep(0)
async def main():
    one = asyncio.create_task(task1())
    two = asyncio.create_task(task2())
    await one
    await two
    
    print('done')
if name == 'main':
    asyncio.run(main())


За кулисами Asyncio делает гораздо больше, но мы смогли перейти от базовых генераторов
к воссозданию основных частей asyncio с нуля! Я попытался сделать менеджер цикла событий максимально простым, и хотя это основная идея asyncio, учитывая масштаб и сложность фактического пакета, моя реализация немного отличается от фактического исходного кода. Кроме того, теперь, когда у нас есть вся мощь настоящего пакета asyncio, нам не нужно создавать две задачи только для ожидания обеих; вместо этого мы можем использовать функцию, например, asyncio.gather() для обработки нескольких задач. Если вам интересно узнать обо всех способах управления задачами в asyncio,
ознакомьтесь с моей статьей об обработке задач asyncio как профессионал!

Комментарии (1)


  1. atues
    23.07.2024 07:56
    +1