Привет, Хабр!

Библиотека asyncio на Питоне - это база для создания масштабируемых асинхронных приложений. Однако, даже при всем разнообразии возможностей, которые предоставляет asyncio, иногда возникает необходимость выйти за рамки стандартных методов и создать что-то свое, специально подходящее для конкретного проекта или задачи.

Для этого можно создавать собственные awaitables и кастомные циклы событий.

Awaitables

Awaitable объекты представляют собой сущности, которые могут быть использованы в выражениях await для ожидания завершения асинхронных операций. В asyncio есть три основных типа awaitable объектов: корутины, футуры и таски.

  1. Корутины представляют собой функции, которые могут быть приостановлены и возобновлены во время выполнения, позволяя другим корутинам работать параллельно. Для определения корутины используется ключевое слово async def, а для вызова - ключевое слово await.

  2. Футуры представляют собой абстракцию асинхронного вычисления, которое может быть завершено в будущем. Они предоставляют интерфейс для ожидания завершения асинхронных операций и получения их результата.

  3. Таски являются специальным видом футур, представляющим собой исполняемые корутины. Они создаются с помощью функции asyncio.create_task() для запуска корутины в отдельном потоке выполнения.

Перейдем к созданию собственных awaitable's

Допустим, есть внешний сервис, который хочется использовать в асинхронном приложении. Для этого можно создать пользовательский awaitable объект, который ожидает событие от этого сервиса. Предположим, что также есть сервис, который генерирует событие, когда новые данные доступны для чтения. Можно создать свой собственный awaitable объект, который ожидает это событие:

import asyncio

class CustomAwaitable:
    def __init__(self):
        self._event = asyncio.Event()

    async def wait_for_event(self):
        await self._event.wait()

    def set_event(self):
        self._event.set()

async def main():
    custom_awaitable = CustomAwaitable()
    # Ждем события
    await custom_awaitable.wait_for_event()
    print("Событие произошло!")

asyncio.run(main())

Иногда требуется ожидать не одно событие, а несколько. Для этого можно создать пользовательский awaitable объект, который ожидает несколько событий и возвращает управление, как только все они произошли:

import asyncio

class MultiEventAwaitable:
    def __init__(self, num_events):
        self._num_events = num_events
        self._events = [asyncio.Event() for _ in range(num_events)]

    async def wait_for_events(self):
        await asyncio.gather(*self._events)

    def set_event(self, index):
        self._events[index].set()

async def main():
    multi_event_awaitable = MultiEventAwaitable(3)
    # устанавливаем события
    multi_event_awaitable.set_event(0)
    multi_event_awaitable.set_event(1)
    multi_event_awaitable.set_event(2)
    # Ждем всех событий
    await multi_event_awaitable.wait_for_events()
    print("Все события произошли!")

asyncio.run(main())

Предположим, есть необходимость взаимодействовать с внешним API, который предоставляет данные для асинхронного приложения. Можно создать пользовательский awaitable объект, который ожидает ответ от этого API и возвращает данные для дальнейшей обработки. Реализуем простой HTTP клиент, который будет использовать aiohttp библиотеку для отправки запросов к API и ожидания ответа:

import aiohttp

class ExternalAPIAwaitable:
    def __init__(self, url):
        self.url = url

    async def fetch_data(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url) as response:
                return await response.json()

async def main():
    api_awaitable = ExternalAPIAwaitable("https://api.example.com/data")
    data = await api_awaitable.fetch_data()
    print("Получены данные от внешнего API:", data)
    
asyncio.run(main())

ExternalAPIAwaitable представляет собой пользовательский awaitable объект, который ожидает ответ от внешнего API.

Циклы событий

Цикл событий представляет собой бесконечный цикл, который ожидает наступления асинхронных событий и вызывает соответствующие колбэки для их обработки. Он отслеживает различные задачи, ожидающие завершения операций ввода-вывода, и переключает контекст выполнения между ними при необходимости.

Цикл в основном состоит из трех основных компонентов:

  1. Задачи представляют асинхронные операции, которые должны быть выполнены в цикле событий. Каждая задача представляет собой корутину, которая может быть запущена в цикле событий.

  2. Цикл событий содержит очередь задач, в которой хранятся все задачи, ожидающие выполнения. При запуске цикла событий, он извлекает задачи из этой очереди и выполняет их.

  3. Каждый тип события имеет свой обработчик, который вызывается при возникновении события. Например, для событий ввода-вывода есть обработчики, которые срабатывают при завершении операций ввода-вывода.

Рассмотрим примеры создания таких кастомных циклов событий.

Создание кастомного цикла событий с использованием asyncio.AbstractEventLoop

import asyncio

class CustomEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._running = False

    def run_forever(self):
        self._running = True
        while self._running:
            # обработка событий, выполнение задач и т.д.
            pass

    def stop(self):
        self._running = False

# экземпляр кастомного цикла событий
loop = CustomEventLoop()

# добавление задач в кастомный цикл событий
async def task():
    print("Задача выполняется...")
    await asyncio.sleep(1)
    print("Задача выполнена!")

loop.call_soon(loop.create_task, task())

# запуск
loop.run_forever()

Создали кастомный цикл событий, унаследовавшись от asyncio.AbstractEventLoop. Метод run_forever() и метод stop() управляют запуском и остановкой цикла. Также добавили асинхронную задачу в цикл событий с помощью loop.create_task() и запустили цикл событий с помощью loop.run_forever().

asyncio.get_event_loop_policy() для кастомизации политики цикла событий

import asyncio

class CustomEventLoopPolicy(asyncio.events.BaseDefaultEventLoopPolicy):
    def _loop_factory(self):
        return CustomEventLoop()

# установка кастомной политики цикла событий
asyncio.set_event_loop_policy(CustomEventLoopPolicy())

# получение цикла событий
loop = asyncio.get_event_loop()

# добавление задачи в кастомный цикл событий
async def task():
    print("Задача выполняется...")
    await asyncio.sleep(1)
    print("Задача выполнена!")

loop.run_until_complete(task())

Создали кастомный класс CustomEventLoopPolicy, который наследуется от asyncio.events.BaseDefaultEventLoopPolicy для управления созданием кастомного цикла событий. Затем установили эту кастомную политику с помощью asyncio.set_event_loop_policy() и получили экземпляр цикла событий с помощью asyncio.get_event_loop(). Задача была добавлена в цикл событий и выполнена с помощью loop.run_until_complete().

Расширение стандартного цикла событий с помощью сабклассирования

import asyncio

class CustomEventLoop(asyncio.get_event_loop().__class__):
    async def run_custom_task(self):
        print("Запуск пользовательской задачи...")
        await asyncio.sleep(1)
        print("Пользовательская задача выполнена!")

# экземпляр кастомного цикла событий
loop = CustomEventLoop()

# запуск пользовательской задачи
async def main():
    await loop.run_custom_task()

# запуск основной асинхронной функции
loop.run_until_complete(main())

Создали кастомный цикл событий, расширив стандартный цикл событий asyncio.get_event_loop().__class__. Затем добавили пользовательскую задачу в этот цикл событий и запустили ее с помощью loop.run_until_complete().


В завершение хочу порекомендовать вам бесплатные вебинары курса Python Developer. Professional:

Комментарии (1)


  1. KazakovDenis
    27.04.2024 20:49
    +4

    То, что вы показали - это не "собственные Awaitables". Их асинхронные методы, которые вы используете - да, но не классы-владельцы. Что считается Awaitable описано в соответствующем PEP 492