Это продолжение цикла статей про asyncio. Предыдущая статья здесь.

Дорогие читатели! К сожалению, выход третьей статьи цикла про asyncio прямо-таки неприлично затянулся. Поверьте, на то были весьма веские причины, связанные с неожиданным поворотом моей личной карьеры. Тем приятнее было мне все это время наблюдать появление все новых и новых подписчиков, неожиданно высоко оценивших мое скромное творчество и не отчаявшихся дождаться его продолжения. Эту статью я посвящаю вам - верным и умеющим ждать.

7. Цикл событий и его друзья

Ну вот и пришла пора познакомиться с долгожданным циклом событий. Цикл событий — это ядро asyncio, своего рода шина данных, которая организует переключения между задачами, следит за событиями, обрабатывает исключения и выполняет кучу других низкоуровневых фоновых задач. Цикл событий создается неявным образом каждый раз при запуске asyncio.run. Разработчику не нужно беспокоиться о его создании вручную, как это делалось в старых версиях python.

Слово "loop" иногда сбивает с толку, заставляя думать о некоем бесконечном цикле. Легко убедиться, что никакого while True по умолчанию внутри asyncio.run нет:

Пример 7.1

import asyncio


async def main():
    print('Запускаем асинхронную задачу')
    task = asyncio.create_task(asyncio.sleep(2))
    await task

asyncio.run(main())
print('Вышли из цикла событий')

Цикл событий asyncio работает до тех пор, пока есть хотя бы одна незавершенная задача. Если все задачи завершились, выполнение цикла тоже завершается. "Убей тело и голова умрет".

Однако на практике нам часто приходится организовывать бесконечный цикл самостоятельно, чтобы не дать завершиться хотя бы одной задаче, а вместе с ней и циклу событий asyncio. Это может быть веб-сервер или CLI или что-то в этом духе.

Давайте попробуем написать асинхронный веб-сервер. Ну, конечно, не полноценный сервер, который имеется в асинхронных веб-фреймворках, таких, как aiohttp или FastAPI, а его простую модель. Наш "сервер" слушает (якобы) некий порт, в который стучится (якобы) клиент. После установления соединения сервер (якобы) получает от клиента сообщение.

Пример 7.2

import asyncio


async def get_message():
    await asyncio.sleep(2)  # имитация ожидания сообщения от клиента
    print('Привет сервер!')


async def listen_port():
    while True:
        await asyncio.sleep(5)  # имитация ожидания запроса на соединение от клиента
        print('Получен запрос на соединение, ждем сообщения')
        asyncio.create_task(get_message())


async def main():
    await asyncio.create_task(listen_port())


asyncio.run(main())

Поскольку сервер должен быть всегда в ожидании клиентов, нам пришлось вручную организовать бесконечный цикл, чтобы не дать завершиться задаче listen_port, а вместе с ней и циклу событий asyncio. Подобным образом организована работа настоящих веб-серверов.

Давайте подробно рассмотрим как работает цикл событий во взаимодействии с нашей программой.

Главный поток нашей программы передает управление циклу событий asyncio в момент вызова asyncio.run. Далее запускается задача listen_port (здесь и далее я буду называть задачи по именам асинхронных функций - корутин, которые они оборачивают). В точке await asyncio.sleep(5) цикл событий отдает управление таймеру операционной системы, а сам продолжает крутиться, ведь других задач у него пока нет. По истечении 5 секунд операционная система вывешивает флаг о событии "5 секунд истекли" и цикл событий при очередном проходе возобновляет выполнение задачи listen_port с оператора print('Получен запрос на соединение, ждем сообщения'). Далее создается новая задача get_message, в которой цикл событий снова передает управление операционной системе в точке await asyncio.sleep(2). Обратите внимание, в этот момент в цикле событий имеются две незавершенные задачи: listen_port, крутящаяся в бесконечном цикле, и get_message, ожидающая события операционной системы. По истечении 2 секунд операционная система вывешивает флаг о событии "2 секунды истекли" и и цикл событий при очередном проходе возобновляет выполнение задачи get_message с оператора print('Привет сервер!'). После этого задача get_message завершается и в цикле событий остается только задача listen_port. И т. д.

Создатели asyncio прилагают все усилия, чтобы дать разработчику высокоуровневый синтаксический сахар, позволяющий сосредоточиться на логике приложения, не задумываясь над деталями внутреннего устройства библиотеки. Но иногда все же нам приходится извлекать на свет Божий и низкоуровневые сущности.

Давайте попробуем превратить нашу примитивную модель веб-сервера во что-то более практически полезное. Напишем простейший эхо-сервер, способный асинхронно обслуживать несколько клиентов.

Пример 7.3

import asyncio
import socket
from asyncio import AbstractEventLoop


async def echo(connection: socket,
               loop: AbstractEventLoop) -> None:
    # используем низкоуровневые функции цикла событий
    while data := await loop.sock_recv(connection, 1024):
        await loop.sock_sendall(connection, data)


async def listen_for_connection(server_socket: socket,
                                loop: AbstractEventLoop):
    while True:
        connection, address = await loop.sock_accept(server_socket)
        connection.setblocking(False)
        print(f"Получен запрос на соединение от {address}")
        asyncio.create_task(echo(connection, loop))


async def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    server_address = ('127.0.0.1', 8000)
    server_socket.setblocking(False)
    server_socket.bind(server_address)
    server_socket.listen()
    
    # извлекаем цикл событий
    asyncio_loop = asyncio.get_event_loop()

    await listen_for_connection(server_socket, asyncio_loop)

asyncio.run(main())

Общая структура программы подобна примеру 7.2. Мы не будем здесь подробно разбирать работу с сокетами. В контексте asyncio нам важно, что это именно тот случай, когда для доступа к низкоуровневым асинхронным функциям sock_sendall и sock_recv нам приходится получать доступ непосредственно к циклу событий asyncio при помощи функции asyncio.get_event_loop()

Наберите в терминале telnet localhost 8000 и убедитесь, что наш эхо-сервер работает корректно, возвращая обратно все принятые от клиента последовательности символов.

Может возникнуть вопрос: а зачем мы извлекаем из недр цикла событий asyncio функции sock_accept и sock_recv, когда могли бы использовать "канонические" socket.accept и socket.recv из модуля socket? Ответ простой - использование блокирующих операций в асинхронном коде лишает его всех преимуществ конкурентности. То есть, пока для одного клиента выполняется установление соединение или ожидание прием символа, остальные будут ожидать своей очереди. В прошлой статье мы с вами уже обсуждали непродуктивность такого подхода.

Ну, хорошо. А если предположить ситуацию, что для общения с неким внешним устройством у нас нет подходящего набора асинхронных функций (неблокирующего драйвера), а есть только традиционный, блокирующий? Это может быть драйвер файлового ввода-вывода (например, логгер), доступа к базе данных, удаленному серверу и т. п. На самом деле не такая уж редко встречающаяся проблема, особенно если вы работаете в зрелом проекте с большим объемом написанного кода. В этом случае нам придется написать неблокирующий драйвер самостоятельно. Об этом пойдет речь в следующем разделе.

8. Пишем асинхронный драйвер

Итак, предположим гипотетическую ситуацию, что нам нужно написать асинхронного клиента для API сервиса погоды, с которым мы работали в разделе 5. Но стандарты нашего проекта (вот беда!) предписывают нам применять для подобных целей непременно библиотеку requests, замечательную во всех отношениях, кроме одного - она является блокирующей, поэтому ее использование в асинхронных функциях не приведет к увеличению быстродействия за счет конкурентного выполнения запросов.

Вспомним каноническое использование requests:

Пример 8.1

import time
import requests


def get_weather(city):
    url = f'http://api.openweathermap.org/data/2.5/weather'
    params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}

    weather_json = requests.get(url=url, params=params).json()
    print(f'{city}: {weather_json["weather"][0]["main"]}')


def main(cities_):
    for city in cities_:
        get_weather(city)


cities = ['Moscow', 'St. Petersburg', 'Rostov-on-Don', 'Kaliningrad', 'Vladivostok',
          'Minsk', 'Beijing', 'Delhi', 'Istanbul', 'Tokyo', 'London', 'New York']

print(time.strftime('%X'))

main(cities)

print(time.strftime('%X'))

Обратите внимание! Чтобы дать вам возможность немедленно насладиться результатом выполнения кода, я снова дарю вам свой ключик от api.openweathermap.org. Если все вы вместе будете эксплуатировать его слишком интенсивно, сервис на время заблокируется. В этом случае вам необходимо зарегистрироваться на нем самостоятельно и подставить свой собственный ключ в APPID.

Запускаем программу и ожидаемо видим последовательный, не очень быстрый, опрос городов.

Чтобы адаптировать блокирующие операции ввода-вывода к asyncio нам придется использовать механизм многопоточности. В python есть модуль threading для работы с потоками. Подробное объяснение многопоточности в python явно выходит за рамки нашего цикла статей. Скажу вкратце, что это еще одна модель конкурентности, позволяющая распараллелить операции ввода вывода. Многопоточность обладает целым рядом врожденных недостатков по сравнению с асинхронностью, но зато позволяет писать более наглядный для человеческого восприятия код. Так уж мы устроены, что нам проще воспринимать концепцию параллельного выполнения, чем вот эти вот все "в точке await цикл событий asyncio передает управление вводом-выводом операционной системе, а сам крутится в ожидании очередных задач". Так или иначе, в python встроен механизм, который позволяет "распихивать" задачи asyncio, содержащие блокирующие функции ввода вывода, по разным потокам и работать с ними при помощи стандартного asyncio.gather, не особенно заморачиваясь, как оно там все внутри устроено. Механизм этот называется пул потоков. Выглядит это примерно вот так:

Пример 8.2

import asyncio
import functools
import time
import requests
from concurrent.futures import ThreadPoolExecutor


def get_weather(city):
    url = f'http://api.openweathermap.org/data/2.5/weather'
    params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}

    weather_json = requests.get(url=url, params=params).json()
    print(f'{city}: {weather_json["weather"][0]["main"]}')


async def main(cities_):
    # извлекаем цикл событий
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor() as pool:
        # раскладываем задачи по отдельным потокам
        tasks = [loop.run_in_executor(pool, functools.partial(get_weather, city)) for city in cities_]
        
        await asyncio.gather(*tasks)


print(time.strftime('%X'))

cities = ['Moscow', 'St. Petersburg', 'Rostov-on-Don', 'Kaliningrad', 'Vladivostok',
          'Minsk', 'Beijing', 'Delhi', 'Istanbul', 'Tokyo', 'London', 'New York']

asyncio.run(main(cities))

print(time.strftime('%X'))

Функция run_in_executor требует в качестве второго параметра функцию, поэтому для подстановки параметра мы используем слегка громоздкую конструкцию functools.partial.

Задачи, созданные при помощи пула потоков ведут себя точно так же, как и обычные задачи asyncio, несмотря на то, что не содержат асинхронных функций (get_weather в данном случае является традиционной, блокирующей). Вы можете убедиться в этом, сравнив скорость и порядок обработки городов при выполнении примера 8.2 с классическим вариантом применения requests из примера 8.1.

9. Создаем асинхронную числодробилку при помощи пула процессов

Важно понимать, что многопоточность в python позволяют обойти GIL только для операций ввода-вывода. Если вы применяете операции, загружающие процессор ("программы-числодробилки"), то никакого выигрыша от применения пула потоков в асинхронных приложениях (как, впрочем, и в традиционных), вы не получите. Убеждаемся в этом:

Пример 9.1

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


# числодробилка
def cpu_bounded():
    for i in range(10000000):
        _ = i ** 3


async def main(n):
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, cpu_bounded) for _ in range(n)]

        await asyncio.gather(*tasks)

# варьируйте n от 1 до 10 и проследите за временем выполнением программы
n = 1

print(time.strftime('%X'))

asyncio.run(main(n))

print(time.strftime('%X'))

На моем ноутбуке однократное выполнение числодробилки занимает примерно 2 секунды. Если бы пул потоков позволял выполнять такие операции конкурентно, то увеличение n лишь незначительно увеличивало бы общее время выполнения программы (за счет накладных расходов на создание дополнительных потоков). Однако, поэкспериментировав с разными значениями n, вы убедитесь, что это не так - зависимость времени выполнения от n практически линейная.

Что же делать в тех редких случаях, когда нам кровь из носу нужно использовать "числодробилку" в асинхронном приложении? На помощь приходит пул процессов!

Мультипроцессность - это третий и последний механизм реализации конкурентности python (после асинхронности и многопоточности). Он реализован в модуле multiprocessing и является, пожалуй, самым простым для понимания, но самым затратным с точки зрения вычислительных ресурсов. Кроме того, его необдуманное применение чревато целым рядом трудноуловимых багов. Суть мультипроцессности в том, каждый конкурентный участок кода выполняется на отдельном ядре процессора. Пока свободных ядер хватает, достигается эффект истинной параллельности. Как таковой GIL никуда не девается, только у каждого процесса интерпретатор свой собственный, независимый от других. А что если ядер процессора перестает хватать? Ну, тогда "лишним" процессам приходится немного подождать. Так что, если вы вознамерились создать, скажем, мультипроцессный веб-сервер на 100k rps, то вас ждет большое разочарование.

Как и в случае с потоками, asyncio любезно прячет от нас детали реализации процессов. Легким движением руки пул потоков из примера 9.1 превращается... в пул процессов:

Пример 9.2

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


# числодробилка
def cpu_bounded():
    for i in range(10000000):
        _ = i ** 3


async def main(n):
    loop = asyncio.get_running_loop()

    # используем пул процессов
    with ProcessPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, cpu_bounded) for _ in range(n)]

        await asyncio.gather(*tasks)

# варьируйте n от 1 до 10 и проследите за временем выполнением программы
n = 1

print(time.strftime('%X'))

asyncio.run(main(n))

print(time.strftime('%X'))

В процессоре моего ноутбука 12 ядер, так что увеличение n от 1 до 10 не слишком изменило общее время исполнения программы.

Если применение пула потоков в асинхронном коде - это своего рода "костыль", то применение пула процессов - это "костыль из костылей". Если жизнь заставляет вас применять подобные решения, стоит задуматься, а нужна ли вам вообще асинхронность?Скорее всего в этом случае вам стоит применять традиционный блокирующий код, распараллеливая загружающие процессор участки при помощи модуля multiprocessing.

10. Что дальше?

На этом заканчивается цикл статей "Асинхронный python без головной боли". Но, конечно, при этом продолжается моя история любви с asyncio и мое творчество на хабре.

Вы получили достаточный базовый набор навыков, чтобы успешно применять asyncio на практике и продолжить углублять свои знания. Если вам посчастливится участововать в настоящем "асинхронном" проекте, это получится автоматически. Если же вы изучаете asyncio с целью расширения профессионального кругозора и повышения своей ценности на рынке труда, советую переходить к изучению одного из популярных асинхронных фреймворков. На сегодняшний день безусловный лидер в популярности - это FastAPI. Но есть и другие: aiohttp, Tornado, Sanic, а также большое количество на столь известных, но не менее интересных решений. По мере построения архитектуры проекта у вас неизбежно будет возникать нужда в асинхронных библиотеках-драйверах. Рекомендую обратить пристальное внимание на экосистему aio-libs. Там есть все (ну, почти) для сквозной реализации асинхронного проекта. Кстати, великолепная aiohttp тоже родом именно оттуда. Если чего-то та вам все же не хватает - гитхаб вам в помощь.

Из книг могу посоветовать недавно вышедшую на русском языке "Asyncio и конкурентное программирование на Python". К слову сказать, именно эта книга (еще в англоязычном издании) вдохновила меня на написание этого цикла статей.

Не прощаемся. Я улетел, но обещал вернуться...

Комментарии (5)


  1. 0Bannon
    17.11.2023 18:14

    А я на инглише эту книжку читаю сейчас, тоже нравится. Вот действительно после неё хоть что-то понятно становится.


    1. vlakir Автор
      17.11.2023 18:14
      +1

      Когда станет скучно с Фаулером, подружитесь с Рамальо. Снова все станет непонятно и чудесато) Но читать надо, и не только про asyncio.


      1. mshadow
        17.11.2023 18:14

        А можете подсказать название книги, а то google не в курсе по Рамальо?


        1. vlakir Автор
          17.11.2023 18:14
          +1

          Python. К вершинам мастерства это на русском. А в оригинале он, кажется, Fluent python. Только надо брать второе издание, первое уже порядком устарело.


  1. Alesh
    17.11.2023 18:14

    Асинхронный python вроде уже лет пять, как не вызывает никаких болей. Многие даже и не задумываются над тем, что пишут именно асинхронный код используя популярный фреймворк в своем проекте.