Перед вами восьмая часть (1234567) перевода руководства по модулю asyncio в Python. Здесь вы найдёте разделы исходного материала с 20 по 22.

20. Неблокирующие потоки

Весьма полезной особенностью asyncio можно назвать возможность использования неблокирующих потоков.

20.1. Asyncio-потоки

В модуле asyncio имеются средства для работы с неблокирующими сокетами ввода/вывода. Эти средства представлены потоками.

Потоки — это высокоуровневые примитивы, готовые к использованию в async/await-среде, предназначенные для работы с сетевыми соединениями. Потоки позволяют отправлять и принимать данные без использования коллбэков или низкоуровневых протоколов и транспортов.

Asyncio Streams

Сокет можно открыть, что даёт доступ к средствам для записи данных в сокет и чтения их из него.

Записывать данные в сокет и читать их из него можно, используя корутины, приостанавливая их работу там, где это нужно.

После завершения работы с сокетом его можно закрыть.

Возможности asyncio-сокетов являются низкоуровневыми, в том смысле, что средства для работы с необходимыми протоколами нужно реализовывать самостоятельно.

В число этих протоколов могут входить следующие:

  • HTTP или HTTPS — для взаимодействия с веб-серверами.

  • SMTP — для взаимодействия с почтовыми серверами.

  • FTP — для взаимодействия с файловыми серверами.

Ещё потоки можно использовать для создания сервера, который обрабатывает запросы с использованием какого-нибудь стандартного протокола, или сервера, рассчитанного на применение протокола собственной разработки, ориентированного на конкретное приложение.

Теперь, когда мы кое-что узнали об asyncio-потоках, поговорим о том, как ими пользоваться.

20.2. Как открыть соединение

Для открытия клиентского TCP-соединения с использованием asyncio-сокета можно прибегнуть к функции asyncio.open_connection().

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Устанавливает сетевое соединение и возвращает пару объектов (reader, writer). Эти объекты являются экземплярами классов StreamReader и StreamWriter.

Asyncio Streams

Эта функция является корутиной, завершения работы которой необходимо дождаться. Она возвращает значение сразу после открытия соединения сокета.

Функция возвращает объекты StreamReader и StreamWriter, которые позволяют взаимодействовать с сокетом.

Например:

...
# открытие соединения
reader, writer = await asyncio.open_connection(...)

Функция asyncio.open_connection() принимает множество аргументов, используемых для настройки соединения сокета.

Два необходимых аргумента — это хост (host) и порт (port).

Хост — это строка, описывающая сервер, к которому нужно подключиться. Она может содержать доменное имя сервера или его IP-адрес.

Порт — это номер порта сокета. Например — 80 для HTTP-серверов, 443 для HTTPS-серверов, 23 для SMTP-серверов и так далее.

Например:

...
# открытие соединения с HTTP-сервером
reader, writer = await asyncio.open_connection('www.google.com', 80)

Система поддерживает работу с зашифрованными соединениями посредством протокола SSL.

Самым распространённым примером использования защищённого соединения может служить соединение, использующее протокол HTTPS, который в последнее время приходит на замену протоколу HTTP.

Создать такое соединение можно, установив аргумент ssl в значение True:

...
открытие соединения с HTTPS-сервером
reader, writer = await asyncio.open_connection('www.google.com', 443, ssl=True)

20.3. Как запустить сервер

Открыть asyncio-сокет, на котором будет основана работа TCP-сервера, можно, воспользовавшись функцией asyncio.start_server().

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Запускает сервер, работа которого основана на сокетах.

Asyncio Streams

Это — корутина, окончания работы которой нужно дождаться.

Функция возвращает объект asyncio.Server, представляющий работающий сервер.

Например:

...
# запуск tcp-сервера
server = await asyncio.start_server(...)

У этой функции имеются три обязательных аргумента: коллбэк (client_connected_cb), хост (host) и порт (port).

Коллбэк, имя которого передаётся в соответствующем аргументе — это функция, которую пишет сам программист. Она будет вызываться каждый раз, когда клиент подключается к серверу.

Коллбэк client_connected_cb вызывается каждый раз, когда устанавливается новое соединение с клиентом. Эта функция принимает пару объектов (reader, writer) в виде двух аргументов. Эти объекты представляют собой экземпляры классов StreamReader и StreamWriter.

Asyncio Streams

Аргумент host — это доменное имя или IP-адрес сервера, которое будут указывать клиенты для подключения к нему. Аргумент port — это номер порта сокета, на котором сервер будет принимать соединения. Например — 21 для FTP-сервера или 80 для HTTP-сервера.

Например:

# функция для обработки клиентских подключений
async def handler(reader, writer):
    # ...
 
...
# запуск сервера, предназначенного для приёма HTTP-соединений
server = await asyncio.start_server(handler, '127.0.0.1', 80)

20.4. Запись данных в сокет с помощью объекта StreamWriter

Данные в сокет можно записывать, пользуясь объектом asyncio.StreamWriter.

class asyncio.StreamWriter

Представляет объект, предназначенный для записи данных, который имеет API для записи данных в потоки ввода/вывода.

Asyncio Streams

Данные записываются в поток в виде байтов. Сделать это можно с помощью метода write():

write(data)

Этот метод немедленно пытается записать в базовый сокет данные, переданные ему в аргументе data. Если запись произвести не удаётся, данные помещают в очередь, во внутренний буфер записи, где они хранятся до тех пор, пока их можно будет отправить.

Asyncio Streams

Например:

...
# запись побайтного представления данных
writer.write(byte_data)

Ещё можно, с помощью метода writelines(), записать в поток несколько «строк» байтовых данных, изначально представленных в виде списка или любого итерируемого объекта:

...
# запись нескольких строк побайтного представления данных
writer.writelines(byte_lines)

Ни один из этих методов записи данных не блокирует и не приостанавливает вызывающую корутину.

После выполнения команды на запись данных нелишним будет дождаться момента, когда эта операция будет завершена, а данные будут переданы по сети. Сделать это можно, прибегнув к методу drain().

coroutine drain()

Ожидание до того момента, когда допустимо будет продолжить запись в поток.

Asyncio Streams

Это — корутина, которая приостанавливает работу вызывающей стороны до тех пор, пока данные будут переданы по сети и сокет будет готов к дальнейшей работе.

Например:

...
# запись байтового представления данных
writer.write(byte_data)
# ожидание завершения передачи данных
await writer.drain()

20.5. Чтение данных из сокета с помощью объекта StreamReader

Читать данные из сокета можно, пользуясь объектом asyncio.StreamReader.

class asyncio.StreamReader

Представляет объект, предназначенный для чтения данных, который обладает API для чтения данных из потока ввода/вывода.

Asyncio Streams

Данные считываются в байтовом формате, в результате строки, перед их использованием, может понадобиться подвергнуть преобразованию в нужную кодировку.

Все методы чтения данных представлены корутинами, завершения работы которых нужно дожидаться.

Посредством метода read() можно прочитать произвольное количество байтов. Он выполняет считывание данных до достижения признака конца файла (EOF, End Of File).

...
# чтение байтового представления данных
byte_data = await reader.read()

В дополнение к этому — количество байтов данных, которые нужно прочитать, можно указать с помощью аргумента n.

Считывает до n байтов. Если аргумент n не предоставлен или установлен в -1 — считывает данные до достижения EOF и возвращает все прочитанные байты.

Asyncio Streams

Это может пригодиться в том случае, если количество байтов, которое ожидается получить в следующем запросе, известно заранее.

Например:

...
# чтение байтового представления данных
byte_data = await reader.read(n=100)

Отдельную строку данных можно прочитать, воспользовавшись методом readline(). Он возвращает байты до тех пор, пока не встретит символ новой строки (\n) или EOF.

Считывает одну строку, где «строка» — это последовательность байтов, заканчивающаяся символом \n. Если будет принят признак конца файла, EOF, а символ \n обнаружен не был, метод возвращает частично прочитанные данные. Если принят EOF, а внутренний буфер пуст, возвращает пустой байтовый объект.

Asyncio Streams

Это способно оказаться полезным при работе со стандартными протоколами, которые оперируют строками текста.

...
# чтение строки данных
byte_line = await reader.readline()

Кроме того, существует метод readexactly(), который считывает точное количество байтов, а если сделать это не удаётся — возбуждает исключение. А метод readuntil() будет считывать байты до тех пор, пока не будет прочитан, в байтовой форме, заданный символ.

20.6. Как закрыть соединение

Сокет можно закрыть, воспользовавшись возможностями объекта asyncio.StreamWriter. А именно, можно вызывать его метод close(), который и закроет сокет.

close()

Метод закрывает поток и лежащий в его основе сокет.

Asyncio Streams

Этот метод не блокирует вызывающую сторону.

Например:

...
# закрытие сокета
writer.close()

Хотя метод close() и неблокирующий, закрывая сокет, можно, перед продолжением работы, дождаться полного его закрытия.

Делается это с помощью метода wait_closed().

coroutine wait_closed()

Ожидает закрытия сокета. Его нужно вызывать после вызова метода close() для того, чтобы подождать закрытия базового соединения.

Asyncio Streams

Этот метод представляет собой корутину, завершения работы которой надо подождать.

Например:

...
# закрытие сокета
writer.close()
# ожидание закрытия сокета
await writer.wait_closed()

Можно проверить — закрыт ли сокет, или он находится в процессе закрытия. Делается это с помощью метода is_closing():

...
# проверка того, закрыт ли сокет, или он находится в процессе закрытия
if writer.is_closing():
    # ...

Теперь, когда мы знаем о том, как пользоваться asyncio-потоками — рассмотрим рабочий пример.

21. Пример проверки состояния веб-сайта

Используя модуль asyncio можно проверить код HTTP-состояния веб-сайта. Делается это посредством открытия потока, с помощью которого отправляют и принимают HTTP-запросы и ответы.

Asyncio позволяет организовать конкурентный опрос множества сайтов для выяснения кодов их состояния. Можно даже организовать динамический вывод результатов таких опросов.

21.1. Как проверить код HTTP-состояния сайта с помощью asyncio

Модуль asyncio позволяет открывать соединения, основанные на сокетах, а так же позволяет читать и записывать данные, пользуясь потоками.

Эти возможности можно использовать для проверки кодов HTTP-состояния веб-страниц.

Решение этой задачи можно разбить на четыре шага:

  1. Открытие соединения.

  2. Запись запроса.

  3. Чтение ответа.

  4. Закрытие соединения.

Рассмотрим каждый из этих шагов.

21.2. Открытие соединения

При использовании asyncio соединение можно открыть, воспользовавшись функцией asyncio.open_connection().

Среди многих других аргументов эта функция принимает строковое имя хоста и целочисленный номер порта.

Это — корутина, завершения работы которой нужно дождаться. Она возвращает объекты StreamReader и StreamWriter, используемые, соответственно, для записи данных в сокет и для чтения данных из сокета.

Мы можем воспользоваться данной функцией для открытия HTTP-соединения на порту 80.

Например:

...
# открытие соединения сокета
reader, writer = await asyncio.open_connection('www.google.com', 80)

Прибегнув к аргументу ssl=True можно открыть SSL-соединение. Этим можно воспользоваться для открытия HTTPS-соединений на порту 443.

Например:

...
# открытие соединения сокета
reader, writer = await asyncio.open_connection('www.google.com', 443, ssl=True)

21.3. Запись HTTP-запроса

После того, как соединение открыто, мы можем обращаться к объекту StreamWriter, что позволяет выполнять HTTP-запросы.

При использовании HTTP 1.1 запрос выглядит как обычный текст. Обращение к файловому пути / выглядит так:

GET / HTTP/1.1
Host: www.google.com

Тут важно обратить внимание на то, что в конце каждой строки должны быть символы возврата каретки и новой строки (\r\n), а в конце текста запроса должна быть пустая строка.

Если описать подобные строки средствами Python, то получится следующее:

'GET / HTTP/1.1\r\n'
'Host: www.google.com\r\n'
'\r\n'

Подробности о текстах запросов, соответствующих стандарту HTTP v1.1, можно найти здесь.

Прежде чем передавать строку объекту StreamWriter, её надо преобразовать в байтовый формат.

Сделать это можно, воспользовавшись методом строки encode().

По умолчанию этот метод обрабатывает строку с использованием кодировки utf-8, нас это устроит.

...
# получение байтового представления строки
byte_data = string.encode()

Список стандартных кодировок, применяемых в Python, можно найти здесь.

Затем полученные байты можно записать в сокет, пользуясь методом write() объекта StreamWriter.

Например:

...
# отправка в сокет запроса на запись данных
writer.write(byte_data)

После выполнения запроса нелишним будет подождать завершения отправки данных, дождаться готовности сокета к дальнейшей работе с ним. Сделать это можно, прибегнув к методу drain(). Это — корутина, завершения работы которой нужно подождать:

...
# ожидание готовности сокета к дальнейшей работе с ним
await writer.drain()

21.4. Чтение HTTP-ответа

После того, как отправлен HTTP-запрос, ожидается поступление ответа, который можно прочитать. Делается это с помощью объекта StreamReader, связанного с сокетом.

Ответ можно прочитать с помощью метода read(), который читает некую последовательность байтов, или с помощью метода readline(), который считывает одну строку байтов.

Тут нас может больше заинтересовать метод readline(), так как мы используем протокол HTTP, использующий обычный текст, при применении этого протокола HTML-данные отправляются построчно.

Метод readline() — это корутина, поэтому при его использовании необходимо дождаться завершения его работы.

Например:

...
# чтение одной строки ответа
line_bytes = await reader.readline()

Ответы HTTP 1.1 состоят из двух частей. Это — заголовок, отделённый от тела ответа пустой строкой, и тело ответа, завершающееся пустой строкой.

В заголовке содержатся сведения о том, был ли запрос успешным, и о том, файл какого типа отправлен в ответе. В теле ответа находится содержимое файла. Например — код HTML-страницы.

Первая строка HTTP-заголовка содержит код состояния HTTP для запрошенной с сервера страницы.

Подробности об ответах HTTP v1.1 можно найти здесь.

Каждую строку ответа нужно перекодировать, преобразовав из байтового представления в строковое. Сделать это можно с помощью метода decode() объекта, содержащего байты данных. Кодировка, применяемая по умолчанию, как и в ранее рассмотренном методе encode(), это — utf8.

Например:

...
# декодирование байтового представления данных в строковое
line_data = line_bytes.decode()

21.5. Закрытие HTTP-соединения

Закрыть соединение сокета можно, закрыв объект StreamWriter. Делается это с помощью метода close():

...
# закрытие соединения
writer.close()

Это — неблокирующий метод, его вызов может не привести к немедленному закрытию сокета.

Теперь, когда мы знаем о том, как делать HTTP-запросы и читать ответы с использованием asyncio — посмотрим на примеры проверки кодов состояний веб-страниц.

21.6. Пример последовательной проверки кодов HTTP-состояний страниц

Можно, используя модуль asyncio, написать код, который будет проверять коды состояний множества сайтов.

В этом примере мы сначала разработаем корутину, которая будет проверять код состояния ресурса, доступного по переданному ей URL. Затем мы будем вызывать эту корутину для каждого из сайтов, входящих в первую десятку самых посещаемых веб-ресурсов.

Определим корутину, которая будет принимать, в виде строки, URL, и возвращать код HTTP-состояния.

# получение кода HTTP/S состояния страницы
async def get_status(url):
    # ...

URL необходимо разобрать, разделив на составные части.

Нам, при выполнении HTTP-запроса, нужно имя хоста и путь к файлу. Нам, кроме того, необходимо знать об используемой схеме URL (HTTP или HTTPS) для того чтобы определить, нужно или нет применять SSL.

Распарсить URL можно с помощью функции urllib.parse.urlsplit(). Она принимает строку, представляющую URL, и возвращает именованный кортеж, содержащий все элементы URL:

...
# разбиение URL на компоненты
url_parsed = urlsplit(url)

После этого, зная схему URL и пользуясь именем хоста, можно открыть HTTP-соединение:

...
# открытие соединения
if url_parsed.scheme == 'https':
    reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
else:
    reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)

Далее — можно, пользуясь именем хоста и путём к файлу, создать запрос HTTP GET. Закодированные должным образом байтовые данные можно записать в сокет с помощью подходящего метода объекта StreamWriter:

...
# отправка GET-запроса
query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
# запись запроса в сокет
writer.write(query.encode())
# ожидание завершения записи байтов в сокет
await writer.drain()

Теперь можно прочитать HTTP-ответ.

Нам нужна лишь первая строка ответа, которая содержит код HTTP-состояния страницы:

...
# чтение одной строки ответа
response = await reader.readline()

На этом этапе работы соединение может быть закрыто:

...
# закрытие соединения
writer.close()

И наконец — мы декодируем байтовые данные, прочитанные из ответа сервера, убираем лишние пробельные символы и возвращаем код состояния HTTP:

...
# декодирование ответа и его очистка от ненужных пробельных символов
status = response.decode().strip()
# возврат результата
return status

Если собрать всё это воедино — получится корутина get_status(), код которой приведён ниже.

Тут нет кода для обработки ошибок, например — таких, которые могут возникнуть в том случае, если хост недоступен, или в случае, если он отвечает на запросы слишком медленно. Добавление в эту корутину системы обработки ошибок может стать неплохим упражнением для читателя.

# получение кода HTTP/S состояния страницы
async def get_status(url):
    # разбиение URL на компоненты
    url_parsed = urlsplit(url)
    # открытие соединения
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    # отправка GET-запроса
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    # запись запроса в сокет
    writer.write(query.encode())
    # ожидание завершения записи байтов в сокет
    await writer.drain()
    # чтение одной строки ответа
    response = await reader.readline()
    # закрытие соединения
    writer.close()
    # декодирование ответа и его очистка от ненужных пробельных символов
    status = response.decode().strip()
    # возврат результата
    return status

Теперь корутину get_status() можно вызвать несколько раз, передавая ей адреса страниц или сайтов, которые нужно проверить.

В данном случае мы определяем список из 10 самых посещаемых сайтов в мире:

...
# список из 10 самых посещаемых сайтов, которые нужно проверить
sites = ['https://www.google.com/',
    'https://www.youtube.com/',
    'https://www.facebook.com/',
    'https://twitter.com/',
    'https://www.instagram.com/',
    'https://www.baidu.com/',
    'https://www.wikipedia.org/',
    'https://yandex.ru/',
    'https://yahoo.com/',
    'https://www.whatsapp.com/'
    ]

С помощью корутины get_status() каждый из этих сайтов можно опросить по очереди.

В данном случае мы опрашиваем сайты последовательно, в цикле, и по очереди выводим сведения о коде HTTP-состояния для каждого из них:

...
# проверка кодов HTTP-состояния для всех сайтов
for url in sites:
    # получение кода состояния для URL
    status = await get_status(url)
    # вывод URL и его кода состояния
    print(f'{url:30}:\t{status}')

Используя asyncio можно написать более совершенный код, не ограничивающийся последовательным опросом сайтов. Но уже написанный код является хорошей отправной точкой, его можно улучшить в будущем.

Теперь создадим корутину main(), в которой собрано всё то, что мы уже сделали. Она занимается опросом интересующих нас сайтов.

# главная корутина
async def main():
    # список из 10 самых посещаемых сайтов, которые нужно проверить
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.facebook.com/',
        'https://twitter.com/',
        'https://www.instagram.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.whatsapp.com/'
        ]
    # проверка кодов HTTP-состояния для всех сайтов
    for url in sites:
        # получение кода состояния для URL
        status = await get_status(url)
        # вывод URL и его кода состояния
        print(f'{url:30}:\t{status}')

Теперь можно создать корутину main() и воспользоваться ей как точкой входа в asyncio-программу:

...
# запуск asyncio-программы
asyncio.run(main())

Соберём всё, что мы написали, и выйдем на полный код примера, который представлен ниже.

# SuperFastPython.com
# проверка кодов состояния множества страниц
import asyncio
from urllib.parse import urlsplit
 
# получение кода HTTP/S состояния страницы
async def get_status(url):
    # разбиение URL на компоненты
    url_parsed = urlsplit(url)
    # открытие соединения
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    # отправка GET-запроса
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    # запись запроса в сокет
    writer.write(query.encode())
    # ожидание завершения записи байтов в сокет
    await writer.drain()
    # чтение одной строки ответа
    response = await reader.readline()
    # закрытие соединения
    writer.close()
    # декодирование ответа и его очистка от ненужных пробельных символов
    status = response.decode().strip()
    # возврат результата
    return status
 
# главная корутина
async def main():
    # список из 10 самых посещаемых сайтов, которые нужно проверить
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.facebook.com/',
        'https://twitter.com/',
        'https://www.instagram.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.whatsapp.com/'
        ]
    # проверка кодов HTTP-состояния для всех сайтов
    for url in sites:
        # получение кода состояния для URL
        status = await get_status(url)
        # вывод URL и его кода состояния
        print(f'{url:30}:\t{status}')
 
# запуск asyncio-программы
asyncio.run(main())

При запуске этого кода сначала создаётся корутина main(), которая используется как точка входа в программу.

Корутина main() выполняется и определяет список, который содержит первую десятку самых посещаемых сайтов.

Затем осуществляется последовательный обход этого списка. Корутина main() приостанавливается и вызывает корутину get_status() для выяснения кода состояния одного сайта.

Корутина get_status() запускается, разбирает переданный ей URL и открывает соединение. Она конструирует HTTP-запрос GET и отправляет его хосту. Далее — она читает ответ, декодирует его и возвращает.

Корутина main() возобновляет работу и выводит код HTTP-состояния обработанного URL.

Этот процесс повторяется для каждого адреса из списка.

На выполнение программы уходит примерно 5,6 секунд, то есть — на обработку одного URL тратится, в среднем, примерно полсекунды.

Здесь продемонстрирован пример использования возможностей asyncio для выяснения кодов HTTP-состояний веб-страниц. Но, несмотря на то, что тут задействован этот модуль, наш код не пользуется его возможностями в полной мере, не выполняет задачи по опросу сайтов в конкурентном режиме.

https://www.google.com/       :    HTTP/1.1 200 OK
https://www.youtube.com/      :    HTTP/1.1 200 OK
https://www.facebook.com/     :    HTTP/1.1 302 Found
https://twitter.com/          :    HTTP/1.1 200 OK
https://www.instagram.com/    :    HTTP/1.1 200 OK
https://www.baidu.com/        :    HTTP/1.1 200 OK
https://www.wikipedia.org/    :    HTTP/1.1 200 OK
https://yandex.ru/            :    HTTP/1.1 302 Moved temporarily
https://yahoo.com/            :    HTTP/1.1 301 Moved Permanently
https://www.whatsapp.com/     :    HTTP/1.1 302 Found

Поговорим теперь о том, как усовершенствовать данный пример, сделать так, чтобы корутины выполнялись бы в конкурентном режиме.

21.7. Пример конкурентной проверки кодов HTTP-состояний страниц

Сила модуля asyncio в том, что он позволяет организовывать конкурентное выполнение множества корутин.

Конкурентные запросы кодов HTTP-состояния веб-сайтов можно выполнять, воспользовавшись функцией asyncio.gather().

Эта функция принимает одну или несколько корутин, дожидается их выполнения и возвращает результаты их работы в виде итерируемого объекта. Далее — можно обойти список URL и возвращённый функцией объект для вывода результатов.

Этот подход может оказаться проще, чем тот, которым мы пользовались.

Сначала создаётся список корутин:

...
# создание всех корутин для выполнения запросов
coros = [get_status(url) for url in sites]

Далее — мы, пользуясь функцией asyncio.gather(), можем выполнить корутины и получить итерируемый объект с возвращёнными ими результатами.

Обратите внимание на то, что мы не можем напрямую передать этой функции список корутин. Вместо этого необходимо распаковать список, добраться до его отдельных элементов, которые будут переданы функции в виде позиционных аргументов.

...
# выполнение всех корутин и ожидание завершения их работы
results = await asyncio.gather(*coros)

Этот код выполнит все корутины в конкурентном режиме и соберёт результаты их работы.

Затем можно обойти список URL и объект с результатами опроса сайтов, после чего всё это можно вывести.

...
# обработка всех результатов
for url, status in zip(sites, results):
    # вывод результатов
    print(f'{url:30}:\t{status}')

Соберём всё, что мы написали, и получим готовый код примера.

# SuperFastPython.com
# проверка кодов состояния множества страниц
import asyncio
from urllib.parse import urlsplit
 
# получение кода HTTP/S состояния страницы
async def get_status(url):
    # разбиение URL на компоненты
    url_parsed = urlsplit(url)
    # открытие соединения
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    # отправка GET-запроса
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    # запись запроса в сокет
    writer.write(query.encode())
    # ожидание завершения записи байтов в сокет
    await writer.drain()
    # чтение одной строки ответа
    response = await reader.readline()
    # закрытие соединения
    writer.close()
    # декодирование ответа и его очистка от ненужных пробельных символов
    status = response.decode().strip()
    # возврат результата
    return status
 
# главная корутина
async def main():
    # список из 10 самых посещаемых сайтов, которые нужно проверить
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.facebook.com/',
        'https://twitter.com/',
        'https://www.instagram.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.whatsapp.com/'
        ]
    # создание всех корутин для выполнения запросов
    coros = [get_status(url) for url in sites]
    # выполнение всех корутин и ожидание завершения их работы
    results = await asyncio.gather(*coros)
    # обработка всех результатов
    for url, status in zip(sites, results):
        # вывод результатов
        print(f'{url:30}:\t{status}')
# запуск asyncio-программы
asyncio.run(main())

При запуске этого кода, как и прежде, выполняется корутина main(). В данном случае список корутин создаётся с использованием механизма спискового включения.

Затем вызывается функция asyncio.gather(). Ей передаются корутины, она приостанавливает выполнение корутины main() до тех пор, пока все переданные ей корутины не вернут результаты.

Корутины выполняются, конкурентно отправляют запросы к веб-сайтам и возвращают коды HTTP-состояния для опрошенных ресурсов.

Корутина main() возобновляет работу и получает итерируемый объект, содержащий сведения о состояниях сайтов. Потом осуществляется обход этого объекта вместе со списком URL. Тут применяется встроенная функция zip(). В процессе обхода этих структур выводятся сведения о состояниях сайтов.

Этот код проще того, который был представлен в предыдущем примере. Здесь организовано конкурентное выполнение корутин. Данный пример, кроме того, быстрее, чем последовательный код, решающий ту же задачу. А именно, на выполнение тех же действий ему, в моей системе, надо примерно 1,4 секунды.

https://www.google.com/       :    HTTP/1.1 200 OK
https://www.youtube.com/      :    HTTP/1.1 200 OK
https://www.facebook.com/     :    HTTP/1.1 302 Found
https://twitter.com/          :    HTTP/1.1 200 OK
https://www.instagram.com/    :    HTTP/1.1 200 OK
https://www.baidu.com/        :    HTTP/1.1 200 OK
https://www.wikipedia.org/    :    HTTP/1.1 200 OK
https://yandex.ru/            :    HTTP/1.1 302 Moved temporarily
https://yahoo.com/            :    HTTP/1.1 301 Moved Permanently
https://www.whatsapp.com/     :    HTTP/1.1 302 Found

Теперь поговорим о распространённых ошибках, возникающих при работе с asyncio.

22. Распространённые ошибки, которые допускают разработчики при работе с asyncio

В этом разделе представлены примеры распространённых ошибок, с которыми сталкиваются разработчики, пользуясь в Python модулем asyncio. А именно, мы обсудим 5 проблемных ситуаций, которые встречаются чаще всего:

  1. Попытка выполнения корутин путём их вызова.

  2. Корутине не позволяют выполниться в цикле событий.

  3. Использование низкоуровневого API модуля asyncio.

  4. Слишком ранний выход из главной корутины.

  5. Ложное предположение о том, что состояние гонок или взаимная блокировка невозможны.

Рассмотрим эти ситуации.

22.1. Ошибка 1: попытка выполнения корутин путём их вызова

Самая распространённая ошибка, которая чаще других встречается у тех, кто только начинает пользоваться asyncio, заключается в вызове корутины так, будто это — обычная функция.

Например, можно определить корутину, воспользовавшись конструкцией async def:

# пользовательская корутина
async def custom_coro():
    print('hi there')

Новичок, имея такое объявление, попытается вызвать корутину как функцию и будет ждать, что она выведет сообщение.

Это может выглядеть так:

...
# некорректная попытка вызова корутины так, будто это функция
custom_coro()

При таком вызове корутины её тело выполнено не будет. Вместо этого будет создан объект корутины.

Затем можно подождать завершения работы этого объекта в среде выполнения asyncio, то есть — в цикле событий.

Запустить цикл событий для выполнения корутин можно, воспользовавшись функцией asyncio.run():

...
# запуск корутины
asyncio.run(custom_coro())

Ещё можно приостановить текущую корутину и запланировать выполнение другой корутины, воспользовавшись выражением await:

...
# планирование выполнения корутины
await custom_coro()

Подробности о выполнении корутин можно найти здесь.

22.2. Ошибка 2: корутине не позволяют выполниться в цикле событий

Если корутина не была запущена — будет выдано предупреждение времени выполнения. Например — такое:

sys:1: RuntimeWarning: coroutine 'custom_coro' was never awaited

Это произойдёт в том случае, если был создан объект корутины, но не было запланировано его выполнение в цикле событий.

Предположим, корутину могут попытаться вызвать из обычного Python-кода:

...
# попытка вызова корутины
custom_coro()

Это, как мы уже видели, не приведёт к вызову корутины. В результате выполнения этой команды будет создан объект корутины.

Эти объекты обычно создают так:

...
# создание объекта корутины
coro = custom_coro()

Если программист не даст этой корутине шанса выполниться — будет выдана ошибка времени выполнения.

Как нам известно из предыдущего раздела, корутине можно позволить выполниться, запустив цикл событий и передав ему объект корутины:

...
# создание объекта корутины
coro = custom_coro()
# запуск корутины
asyncio.run(coro)

То же самое можно сделать и в одну строку:

...
# запуск корутины
asyncio.run(custom_coro())

Здесь можно найти подробности о запуске корутин.

Если при выполнении вашего asyncio-кода выдаётся такая ошибка — её причиной является тот факт, что вы создали корутину, но не запланировали её выполнение.

А запланировать выполнение корутины можно с помощью выражения await:

...
# создание объекта корутины
coro = custom_coro()
# приостанавливаем вызывающую сторону и позволяем корутине выполниться
await coro

Ещё можно запланировать независимый запуск корутины в виде задачи:

...
# создание объекта корутины
coro = custom_coro()
# планирование независимого запуска корутины в виде задачи
task = asyncio.create_task(coro)

В этом учебном руководстве можно найти больше сведений о создании задач.

22.3. Ошибка 3: использование низкоуровневого API модуля asyncio

Большой проблемой новичков может стать применение неподходящего API asyncio.

Эта проблема часто встречается по множеству причин. Среди них:

  • API сильно изменился в свежей версии Python.

  • В документации к API приведены примеры использования и старой версии API, и новой, что сбивает с толку читателя.

  • В примерах, найденных где-то в интернете, смешаны различные API.

Применение неподходящего для решения некоей задачи API приводит к увеличению длины кода, к его усложнению, к тому, что усложняется его восприятие и понимание.

Модуль asyncio предлагает программисту два API:

  1. Высокоуровневый API, рассчитанный на разработчиков приложений (то есть — на нас с вами).

  2. Низкоуровневый API, предназначенный для создателей фреймворков и библиотек (то есть — не на нас с вами).

Низкоуровневый API — это основа высокоуровневого API, в его состав входят внутренние механизмы цикла событий, транспортные протоколы, политики и многое другое.

… существуют низкоуровневые API, предназначенные для разработчиков библиотек и фреймворков.

asyncio — Asynchronous I/O

Мы, разработчики обычных приложений, должны почти всегда ограничиваться высокоуровневым API.

А начинающим совершенно необходимо держаться в рамках высокоуровневого API.

Обращаться к низкоуровневому API можно лишь иногда, для достижений неких особых целей.

Если вы начинаете с того, что получаете ссылку на цикл событий, или с того, что пользуетесь переменной loop для решения различных задач — это значит, что вы что-то делаете не так. При этом я не хочу сказать, что изучать низкоуровневый API не нужно. Это — замечательный инструмент. Интересуйтесь им, осваивайте его. Но не делайте свои первые asyncio-шаги, прибегая к низкоуровневому API.

Некоторое время пользуйтесь возможностями asyncio через средства высокоуровневого API. Создайте несколько программ. Привыкните к асинхронному программированию и к запуску корутин в соответствии со своими планами.

А уже позже присмотритесь к низкоуровневому API.

22.4. Ошибка 4: слишком ранний выход из главной корутины

Очень важный источник проблем asyncio-программ заключается в том, что программисты не дают задачам достаточно времени на то, чтобы они могли бы завершить свою работу.

В asyncio-программе можно, пользуясь методом asyncio.create_task(), запланировать независимое выполнение множества корутин.

Главная корутина, точка входа в программу, после планирования запуска корутин, может продолжать работу, занимаясь другими делами.

Если осуществляется выход из главной корутины — asyncio-программа завершается. Программа завершится даже в том случае, если всё ещё идёт независимое выполнение, в виде задач, одной или нескольких корутин.

Это может застать программиста врасплох.

Можно создать несколько задач и позволить главной корутине продолжить работу, ожидая, что все эти задачи в определённое время завершатся.

Но вместо этого, если главной корутине больше нечего делать, нужно, чтобы она подождала бы завершения оставшихся невыполненными задач.

Сделать это можно так. Сначала, с помощью функции asyncio.all_tasks(), надо получить множество, содержащее все выполняющиеся задачи. Потом из этого множества нужно убрать текущую задачу. А в итоге нужно дождаться выполнения всех оставшихся задач, прибегнув к функции asyncio.wait().

Например:

...
# получение множества, содержащего все выполняющиеся задачи
all_tasks = asyncio.all_tasks()
# получение текущей задачи
current_task = asyncio.current_task()
# удаление текущей задачи из списка текущих задач
all_tasks.remove(current_task)
# приостановка работы до момента, когда все задачи будут завершены
await asyncio.wait(all_tasks)

22.5. Ошибка 5: ложное предположение о том, что состояние гонок или взаимная блокировка невозможны

С конкурентным программированием сопряжены особенные риски и ошибки. Среди них — такие проблемы, как состояния гонок и взаимные блокировки.

Состояние гонки возникает, когда две конкурентных сущности, или их большее количество, одновременно выполняют один и тот же критический участок кода, оставляя при этом ресурсы или данные в неоднородном или непредусмотренном логикой программы состоянии. Это может привести к повреждению или потере данных.

Взаимная блокировка — это когда несколько процессов находятся в состоянии ожидания ресурсов, занятых друг другом, и ни один из них не может продолжать своё выполнение.

Многие Python-разработчики полагают, что подобные проблемы не могут возникнуть при работе с корутинами в asyncio-программах.

Причина этого заблуждения в том, что в некий момент времени в цикле событий может выполняться лишь одна корутина.

То, что в некий момент времени может выполняться лишь одна корутина — правда.

Но проблема заключается в том, что выполнение корутины может приостанавливаться и возобновляться, причём, происходить это может при использовании некоего разделяемого ресурса или некоей разделяемой переменной.

Если не защищать критически важные участки кода — то состояние гонки может возникнуть и в asyncio-программе.

А если не уделять должного внимания примитивам синхронизации — в asyncio-программах могут возникать и взаимные блокировки.

В результате получается, что при создании asyncio-программ важно уделять внимание безопасному использованию корутин, придерживаясь в применении к корутинам концепций, схожих с теми, что применяются при безопасной работе с потоками и процессами.

О, а приходите к нам работать? ???? ????

Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.

Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.

Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.

Присоединяйтесь к нашей команде.

Комментарии (2)


  1. kt97679
    07.02.2023 06:04

    Можно попросить вас привести пример для asyncio неблокирующего чтения из stdin?


  1. valentinmk
    07.02.2023 16:07

    Был проект, конектился по ssh, выполнял команду, и уже в консоли этой команды ждал данных, была беда с readline: ты отправляешь команду ждешь ответа, он даже приходит, но конца строки с той стороны не присылают.
    Ох в итоге там был монстр с тайм-аутами, который принимал решение, что ответ отправлен полностью. Иначе таска "вешалась" в бесконечном ожидании.
    Имело бы смысл добавить о таймаутах инфы, или вынести в отдельнцю статью
    Статья полностью годная! Многие моменты пока читал не находил, но вот в разделе с ошибками все было покрыто. Молодцы