image

▍ Введение


При создании нового приложения с нуля, где основным ограничивающим фактором является производительность ввода-вывода, использование asyncio может быть вполне оправданным выбором. С самого начала можно интегрировать неблокирующие библиотеки, совместимые с asyncio, такие как asyncpg и aiohttp. Однако большая часть работы может быть связана с поддержкой уже существующего кода, который использует блокирующие библиотеки ввода-вывода, например, requests для HTTP-запросов или psycopg для взаимодействия с базами данных Postgres. Кроме того, возможно, что подходящих библиотек, совместимых с asyncio, просто не существует. В таком случае, как можно извлечь выгоду из конкурентных преимуществ, которые предлагает asyncio?

Многопоточность предоставляет решение для данной задачи. Поскольку блокирующие операции ввода-вывода освобождают глобальную блокировку интерпретатора, мы можем выполнять ввод-вывод параллельно в различных потоках. Аналогично библиотеке multiprocessing, asyncio позволяет использовать пулы потоков, что даёт нам все преимущества многопоточности, не отказываясь от таких API, как gather и wait.

▍ Введение в модуль threading


Для создания и управления потоками в Python используется модуль threading. Он предоставляет класс Thread, экземпляр которого принимает функцию, которая будет выполняться в отдельном потоке. Интерпретатор Python является однопоточным, что означает, что в любой момент времени может выполняться только один участок байт-кода, даже если в системе работает несколько потоков. Глобальная блокировка интерпретатора (GIL) не позволяет запускать несколько потоков одновременно.

На первый взгляд может показаться, что Python не позволяет нам извлечь выгоду из многопоточности, однако в некоторых случаях GIL освобождается, и одним из таких случаев являются операции ввода-вывода. Python может освободить GIL на время выполнения этих операций, поскольку они требуют вызова низкоуровневых функций операционной системы, которые работают за пределами интерпретатора и не могут повредить его внутренним структурам — именно для этого и предназначен GIL.

Чтобы лучше понять, как создавать и использовать потоки в контексте блокирующего ввода-вывода, давайте посмотрим, как работает многопоточный эхо-сервер. Если кто-то впервые слышит про сокеты, то советую прочитать вот эту статью. Для того, чтобы сокет обрабатывал несколько подключений, нам нужно было перевести сокеты в неблокирующий режим и использовать модуль select для отслеживания событий на сокетах. Но что делать, если мы работаем с устаревшим кодом, который не позволяет использовать неблокирующие сокеты? Можно ли тогда создать эхо-сервер, способный одновременно обрабатывать более одного клиента?

Поскольку методы сокета recv и sendall — это операции ввода-вывода, ничего не мешает выполнять их в разных потоках конкурентно. А это значит, что мы можем создать по одному потоку для каждого подключившегося клиента, и в этом потоке читать и записывать данные. Чтобы проверить эту идею, будем ждать подключения в главном потоке, а затем создавать отдельный поток для эхо-копирования данных от клиента.

#  Многопоточный эхо-сервер
from threading import Thread
import socket

def echo(client: socket):
    while True:
        data = client.recv(2048)
        print(f'Получено {data}, отправляю!')
        client.sendall(data)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    # Создаём TCP-сервер
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Прослушиваем запросы на подключение
    server.bind(('127.0.0.1', 8000))
    server.listen()
    while True:
        # Блокируется в ожидании подключения клиентов
        connection, client_address = server.accept()
        # Как только клиент подключился, создать поток для выполнения функции echo
        thread = Thread(target=echo, args=(connection,))
        # Начать выполнение потока
        thread.start()

В этом фрагменте кода мы находимся в бесконечном цикле, ожидая подключений к серверному сокету. Когда клиент устанавливает соединение, мы создаём новый поток для выполнения функции echo. В параметре target потоку передаётся функция echo, а в параметре args — кортеж аргументов, которые должны быть переданы этой функции. Таким образом, в потоке будет вызвана echo(connection). После этого мы запускаем поток и переходим к следующей итерации цикла, продолжая ожидать новых подключений. В то же время в только что созданном потоке выполняется цикл, который ждёт данные от клиента и отправляет их обратно.

Мы можем запустить любое количество конкурентных telnet-клиентов, и все их сообщения будут корректно обрабатываться. Поскольку методы recv и sendall работают в отдельных потоках, они не блокируют друг друга. Блокировка происходит только в том потоке, где они выполняются. Это позволяет решить проблему одновременного подключения нескольких клиентов при использовании блокирующих сокетов, хотя у данного подхода есть свои сложности, связанные с потоками. Например, что произойдёт, если мы попытаемся завершить процесс с помощью CTRL+C в момент, когда клиенты подключены? Будут ли корректно остановлены созданные потоки?

Выясняется, что это не так. При вызове метода server.accept() возникает исключение KeyboardInterrupt, однако приложение не завершится, поскольку продолжают работать фоновые потоки. Более того, подключённые клиенты всё ещё могут отправлять и получать данные. К сожалению, пользовательские потоки не обрабатывают исключение KeyboardInterrupt, так как оно возникает только в главном потоке. Поэтому наши потоки продолжают считывать данные от клиентов, что мешает завершению приложения.

Существует два способа решения этой проблемы. Во-первых, можно использовать потоки-демоны, а во-вторых, разработать собственный механизм прерывания работающего потока. Потоки-демоны предназначены для выполнения длительных фоновых задач и не препятствуют завершению приложения. Если в системе работают только потоки-демоны, приложение завершится автоматически. Главный поток Python не является демоном, но если все потоки, обслуживающие подключения, сделать демонами, приложение корректно завершится при возникновении KeyboardInterrupt. Чтобы сделать потоки демонами, достаточно установить thread.daemon = True перед вызовом thread.start(). После этого изменения приложение будет правильно завершаться при нажатии CTRL+C.

Однако у этого подхода есть недостаток: потоки-демоны завершаются без уведомления, и в этот момент мы не можем выполнить код для очистки. Например, если мы хотим отправить клиенту сообщение о том, что сервер будет остановлен, возникает вопрос: можно ли как-то прервать поток и корректно закрыть сокет? Если вызвать метод shutdown у сокета, все текущие вызовы recv вернут 0, а вызовы sendall вызовут исключение. Если мы вызовем shutdown из главного потока, это прервёт обслуживающие потоки клиентов, которые заблокированы на вызовах recv или sendall. Затем мы сможем обработать исключение в клиентском потоке и выполнить очистку.
Для этого мы будем создавать потоки немного иначе, унаследовав их от класса Thread. Это позволит нам определить поток с методом close, который остановит клиентский сокет. Таким образом, методы recv и sendall будут прерваны, и мы сможем выйти из цикла while и завершить поток.

Класс Thread имеет метод run, который можно переопределить. В нашем подклассе Thread мы реализуем этот метод, поместив в него код, который должен выполняться в потоке после его запуска. В данном случае это будет цикл эхо-копирования с вызовами методов recv и sendall.

# Создание подкласса Thread для чистой остановки
from threading import Thread
import socket

class ClientEchoThread(Thread):
    def __init__(self, client):
        super().__init__()
        self.client = client
    def run(self):
        try:
            while True:
                data = self.client.recv(2048)
                # Если нет данных, возбудить исключение. 
                # Это бывает, когда подключение было закрыто клиентом или остановлено сервером
                if not data:
                    raise BrokenPipeError('Подключение закрыто!')
                print(f'Получено {data}, отправляю!')
                self.client.sendall(data)
        # В случае исключения выйти из метода run. 
        # При этом поток завершается
        except OSError as e:
                print(f'Поток прерван исключением {e}, производится остановка!')

    def close(self):
        # Разомкнуть подключение, если поток ещё активен; 
        # Поток может быть неактивен, если клиент закрыл подключения
        if self.is_alive():
            self.client.sendall(bytes('Останавливаюсь!', encoding='utf-8'))
            # Разомкнуть подключение клиента, остановив чтение и запись
            self.client.shutdown(socket.SHUT_RDWR)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('127.0.0.1', 8000))
    server.listen()
    connection_threads = []
    try:
        while True:
            connection, addr = server.accept()
            thread = ClientEchoThread(connection)
            connection_threads.append(thread)
            thread.run()
    except KeyboardInterrupt:
        print('Останавливаюсь!')
        # Вызвать метод close созданных потоков, чтобы разомкнуть все клиентские подключения в случае прерывания с клавиатуры
        [thread.close() for thread in connection_threads]

Вначале мы создадим класс, названный ClientEchoThread, который наследует от класса Thread. В этом классе мы переопределяем метод run, в который интегрирован код из функции echo, но с рядом важных модификаций. Прежде всего, весь код обернут в блок try/except, что позволяет нам перехватывать исключения типа OSError. Это исключение может возникать, когда метод sendall пытается отправить данные через сокет, который был закрыт клиентом.

Дополнительно мы проверяем, не возвращает ли метод recv значение 0. Это происходит в двух ситуациях: когда клиент закрыл соединение (например, если пользователь завершил сеанс telnet) или когда сервер закрыл соединение с клиентом. В любом из этих случаев мы генерируем исключение BrokenPipeError, которое является подклассом OSError. Затем мы выводим соответствующее сообщение в блоке except и завершаем выполнение метода run, что приводит к остановке потока.

Кроме того, в классе ClientEchoThread реализован метод close. Прежде чем закрыть клиентское соединение, он проверяет, активен ли поток. Но что именно мы понимаем под «активным» потоком? Поток считается активным, если выполняется его метод run. Это важно, поскольку поток может оказаться неактивным в случае, если клиент закрыл соединение.
Если происходит ситуация, когда нет данных для чтения или записи, это может быть связано с тем, что подключение было закрыто клиентом или остановлено сервером. В случае возникновения исключения поток завершает своё выполнение и выходит из метода run.
Если поток всё ещё активен, мы можем безопасно закрыть подключение. Однако важно учитывать, что поток может быть неактивен, если клиент сам закрыл соединение. В этом случае попытка разорвать соединение приведёт к исключению BrokenPipeError в методе run, ещё до вызова метода close. Поэтому при вызове метода sendall также может возникнуть исключение, так как подключение уже не функционирует.

В главном цикле нашего приложения мы прослушиваем порт в ожидании входящих подключений и обрабатываем исключение KeyboardInterrupt. Как только это исключение возникает, мы вызываем метод close для каждого созданного потока. Если подключение клиента всё ещё активно, ему отправляется уведомление перед тем, как разорвать соединение.
В общем, управление работающими потоками в Python — это сложная задача, решение которой зависит от конкретного контекста. Необходимо принимать специальные меры для того, чтобы потоки не мешали завершению приложения, и правильно размещать точки прерывания для выхода из потоков.

Мы рассмотрели два метода ручного управления потоками: создание объекта потока с заданной целевой функцией и создание подкласса Thread с переопределением метода run. Теперь, разобравшись с основами многопоточности, мы можем перейти к изучению того, как использовать её в сочетании с библиотекой asyncio, чтобы эффективно работать с популярными блокирующими библиотеками.

▍ Совместное использование потоков и asyncio


Теперь мы знаем, как создать несколько потоков и управлять ими для обработки блокирования. Недостаток в том, что потоки нужно создавать индивидуально и отслеживать их. А хотелось бы использовать API, основанные на asyncio, которые умеют дожидаться результата от потока, а не управлять потоками самостоятельно. Для этого можно использовать пулы потоков. В этом примере мы будем использовать популярную блокирующую клиентскую библиотеку HTTP и посмотрим, как использовать библиотеки threads и asyncio для конкурентного выполнения веб-запросов.

Библиотека requests — популярная клиентская библиотека HTTP, которую автор описывает словами «HTTP для людей». Библиотека позволяет отправлять HTTP-запросы веб-серверу, как и aiohttp.

После установки всё готово для отправки простых HTTP-запросов. Начнём с парочки запросов к сайту example.com с целью получить код состояния.

# Базовое использование requests
import requests

def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code

url = 'https://www.example.com'
print(get_status_code(url))
print(get_status_code(url))

Здесь мы выполняем два запроса HTTP GET подряд. В ответ должны получить ответы 200. Мы не стали создавать HTTP-сеанс, хотя библиотека их поддерживает, чтобы сохранять куки между запросами. Библиотека requests блокирующая, т. е. каждый запрос requests.get прекращает выполнение Python-кода в потоке до завершения запроса. Это влияет на способы её использования совместно с asyncio. Если попытаться включить функции из неё в сопрограмму или в задачу, то весь цикл событий будет заблокирован до завершения запроса. Если запрос занимает 2 с, то в течение этого времени наше приложение ничего не сможет делать. Поэтому блокирующие операции следует выполнять в отдельном потоке.

Как и для исполнителей пула процессов, библиотека concurrent.futures содержит реализацию абстрактного класса Executor для работы с потоками — ThreadPoolExecutor. Вместо пула рабочих процессов, исполнитель пула потоков создаёт и обслуживает пул потоков, которому можно передавать задания. Если пул процессов по умолчанию создаёт по одному рабочему процессу для каждого имеющегося процессорного ядра, то определение количества рабочих потоков несколько сложнее. Начиная с версии 3.8 и далее, значением по умолчанию является min(32, os.cpu_count() + 4). Из них 5 потоков решают задачи, регулирующие ввод-вывод. Это значит, что максимальное число потоков в пуле равно 32, а минимальное — 5. Верхняя граница задаётся равной 32, чтобы избежать создания неожиданно большого числа потоков на машинах с большим количеством ядер (напомним, что создавать и обслуживать потоки дорого). А нижняя граница равна 5, потому что на машинах с одним-двумя ядрами запуск всего двух потоков вряд ли как-то улучшит производительность. Часто имеет смысл создать немного больше потоков, чем имеется ядер, если предполагается использовать их для ввода-вывода. Например, на 6-ядерной машине, согласно приведённой выше формуле, будет создано 16 потоков (Так как на 1 ядро выделяется 2 потока, в итоге на в процессоре 12 логических процессов + 4 процесса по формуле). Модифицируем прошлый пример так, чтобы он конкурентно выполнял 100 HTTP-запросов, пользуясь пулом потоков. И будем вести хронометраж, чтобы понять, получили ли мы какой-то выигрыш.

import time
import requests
from concurrent.futures import ThreadPoolExecutor

def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code

start = time.time()
with ThreadPoolExecutor() as pool:
    urls = ['https://www.example.com' for _ in range(100)]
    results = pool.map(get_status_code, urls)
    for result in results:
        print(result)
    end = time.time()
    print(f'Выполнение запросов завершено за {end - start:.4f} с')

На 6-ядерной машине с быстрым интернетом этот код выполняется за 9-10 секунд, когда число потоков выбрано по умолчанию. Легко написать синхронную версию и выяснить, есть ли какой-то выигрыш от многопоточности.

start = time.time()
urls = ['https://www.example.com' for _ in range(100)]
for url in urls:
    print(get_status_code(url))
end = time.time()
print(f'Выполнение запросов завершено за {end - start:.4f} с')

Выполнение этого кода заняло 62 с! То есть многопоточный код оказался в 6 раз быстрее синхронного — неплохая прибавка к производительности! Конечно, это улучшение, но посмотрим, как справится с этой задачей aiohttp?

как справиться с этой задачей aiohttp?
import asyncio
import aiohttp
from aiohttp import ClientSession
import time

async def fetch_status(session: ClientSession, url: str) -> int:
    ten_millis = aiohttp.ClientTimeout(total=1)
    async with session.get(url, timeout=ten_millis) as result:
        return result.status

async def main():
    start = time.time()
 
    async with aiohttp.ClientSession() as session:
        urls = ['https://www.example.com' for _ in range(100)]
        requests = [fetch_status(session, url) for url in urls]
        status_codes = await asyncio.gather(*requests)
        end = time.time()
        print(f'Выполнение запросов завершено за {end - start:.4f} с')
        print(status_codes)

asyncio.run(main())

Данный код выполняется примерно за 1 с. Почему же многопоточная версия оказалась настолько медленнее? Напомним, что максимальное число потоков ограничено числом 32. Это ограничение можно обойти, задав max_workers=100 при создании пула потоков.

with ThreadPoolExecutor(max_workers=100) as pool:
    urls = ['https://www.example.com' for _ in range(100)]
    results = pool.map(get_status_code, urls)
    for result in results:
        print(result)
    end = time.time()
    print(f'Выполнение запросов завершено за {end - start:.4f} с')

Это может дать некоторое улучшение, потому что теперь потоков столько, сколько запросов. Но всё равно мы даже не приблизимся к производительности кода, основанного на сопрограммах. Всё дело в накладных расходах, связанных с потоками. Потоки создаются на уровне операционной системы и обходятся дороже сопрограмм. К тому же у контекстного переключения потоков на уровне ОС тоже есть цена. Сохранение и восстановление состояния потока при контекстном переключении съедает часть выигрыша, полученного от использования потоков.

Решая, сколько потоков нужно в приложении, лучше начать с малого (число ядер плюс ещё немного — хорошая отправная точка), протестировать и постепенно увеличивать количество. Обычно удаётся найти оптимальное число, после которого время работы выходит на плато и может даже уменьшиться, сколько бы потоков ни добавлять. Как правило, это число невелико по сравнению с числом запросов (короче — создавать 100 потоков для отправки 100 запросов, скорее всего, не стоит, только зря потратите ресурсы).

▍ Исполнители пула потоков и asyncio


Использование исполнителей пула потоков в цикле событий asyncio мало чем отличается от использования класса ProcessPoolExecutor. В том-то и заключается прелесть абстрактного базового класса Executor, что мы можем выполнять один и тот же код с процессами или потоками, но только изменить одну строчку. Модифицируем приведённый выше пример выполнения 100 запросов, заменив pool.map на asyncio.gather.

import functools
import requests
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code

async def main():
    start = time.time()
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        urls = ['https://www.example.com' for _ in range(100)]
        tasks = [loop.run_in_executor(pool, functools.partial(get_status_code, url)) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)
    end = time.time()
    print(f'Выполнение запросов завершено за {end - start:.4f} с')
asyncio.run(main())

Мы создаём пул потоков, как и раньше, но вместо использования map строим список задач, вызывая функцию get_status_code из loop. run_in_executor. Получив список задач, мы можем ждать их завершения с помощью asyncio.gather или любой другой из уже знакомых нам функций asyncio. Под капотом loop.run_in_executor вызывает метод submit исполнителя пула потоков. Это ставит все переданные задачи в очередь. Затем рабочие потоки в пуле могут выбирать задачи из очереди и выполнять их до завершения. Этот подход не даёт никакого выигрыша по сравнению с использованием пула потоков без asyncio, но, пока мы ждём await asyncio.gather, может выполняться другой код.

В документации по asyncio сказано, что параметр executor метода run_in_executor может быть равен None. В этом случае используется исполнитель по умолчанию, ассоциированный с циклом событий. Можно считать, что это допускающий повторное использование синглтонный исполнитель для всего приложения. Исполнитель по умолчанию всегда имеет тип ThreadPoolExecutor, если с помощью метода loop.set_default_executor не было задано иное. Следовательно, мы можем упростить код в прошом примере, как показано ниже.


import functools
import requests
import asyncio
import time

def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code

async def main():
    start = time.time()
    loop = asyncio.get_running_loop()
    urls = ['https://www.example.com' for _ in range(100)]
    tasks = [loop.run_in_executor(None, functools.partial(get_status_code, url)) for url in urls]
    results = await asyncio.gather(*tasks)
    print(results)
    end = time.time()
    print(f'Выполнение запросов завершено за {end - start:.4f} с')

asyncio.run(main())

Здесь мы не создаём собственный экземпляр ThreadPoolExecutor для использования в качестве контекстного менеджера, как раньше, а передаём в качестве исполнителя None. При первом вызове run_in_ executor asyncio создаёт и кеширует исполнитель пула потоков по умолчанию. При последующих вызовах используется уже созданный исполнитель, т. е. он оказывается глобальным относительно цикла событий. Остановка такого пула также отличается от того, что мы видели раньше. Ранее созданный исполнитель пула потоков останавливался в момент выхода из блока with, управляемого контекстным менеджером. Что до исполнителя по умолчанию, то он существует до момента выхода из цикла событий, а это обычно происходит при завершении приложения. Использование исполнителя пула потоков по умолчанию упрощает программу, но нельзя ли пойти ещё дальше?

В версии Python 3.9 появилась сопрограмма asyncio.to_thread, которая ещё больше упрощает передачу работы исполнителю пула потоков по умолчанию. Она принимает функцию, подлежащую выполнению в потоке, и её аргументы. Раньше для передачи аргументов нужно было использовать функцию functools.partial, так что теперь код стал немного чище. Затем эта функция выполняется с переданными аргументами в исполнителе по умолчанию и текущем цикле событий. Это позволяет ещё упростить код. Сопрограмма to_thread устраняет необходимость в functools.partial и asyncio.get_running_ loop, что уменьшает число строк кода.

import time
import requests
import asyncio

def get_status_code(url: str) -> int:
    response = requests.get(url)
    return response.status_code

async def main():
    start = time.time()
    urls = ['https://www.example.com' for _ in range(100)]
    tasks = [asyncio.to_thread(get_status_code, url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(results)
    end = time.time()
    print(f'Выполнение запросов завершено за {end - start:.4f} ')

asyncio.run(main())

▍ Итоги


В этой статье мы погрузились в мир многопоточности и асинхронного программирования на Python, изучая, как модуль threading может значительно улучшить работу с вводом-выводом. Мы рассмотрели, как правильно завершать потоки при остановке приложения, чтобы избежать утечек ресурсов. Кроме того, мы научились использовать пул потоков для распределения задач, что откроет доступ к мощным методам asyncio API, таким как gather, позволяющим дождаться результатов выполнения всех потоков. Наконец, мы увидели, как эффективно интегрировать блокирующие API ввода-вывода, например, из библиотеки requests, в пул потоков, чтобы повысить производительность и сделать наше приложение более отзывчивым. Надеюсь, что эта статья будет полезна не только новичкам, но и продвинутым программистам на Python. Это только первая часть, где было очень много теории. В следующей статье я хотел бы показать на примерах GUI, hashlib и numpy, как можно использовать многопоточность и как избежать состояний гонки с помощью блокировок.

Вдохновлялся книгой: Asyncio и конкурентное программирование на Python

Telegram-канал со скидками, розыгрышами призов и новостями IT ?

Комментарии (2)


  1. Andrey_Solomatin
    14.08.2024 09:48
    +1

    который использует блокирующие библиотеки ввода-вывода, например, requests для HTTP-запросов

    Можно попробовать requests поменять на httpx который поддерживает совместимое API https://www.python-httpx.org/compatibility/, а потом переделывать по асинхронные вызовы.

    У нас в команде мы сразу httpx используем, даже для синхронного кода.


  1. 9982th
    14.08.2024 09:48

    Вдохновлялся книгой: Asyncio и конкурентное программирование на Python

    Честнее было бы повесить плашку "Перевод".