В прошлый раз мы рассмотрели, что такое синхронное программирование, и с какими проблемами с ним сталкивается разработчик. На примере простого сервера с блокирующими сокетами мы увидели, что в синхронно выполняющейся программе все инструкции выполняются строго по очереди, и если встречается системный вызов ввода-вывода, то он может полностью остановить выполнение программы на довольно продолжительное время — пока не завершится. Весь этот период процессор простаивает в ожидании, хотя мог бы выполнять другие задачи, которых накапливается немало. В результате сервер одновременно может обрабатывать только одно подключение. Чтобы перейти ко второму, предыдущее должно быть закрыто.

Решить проблему многозадачности можно стандартными средствами вытесняющей многозадачности: процессами или потоками. Но тут разработчик сталкивается с достаточно серьезными трудностями. Процессы требуют дополнительных ресурсов на свое обслуживание, а потому невыгодны. А потоки влекут за собой множество трудноотлавливаемых и сложновоспроизводимых багов, из-за чего требуется долгая и кропотливая дополнительная работа по синхронизации потоков. В результате мы становимся перед выбором: или дополнительные расходы на железо, или дополнительные расходы на программистов.

Но, к счастью, существует и третий вариант — кооперативная многозадачность с помощью системного вызова select и его аналогов (poll, epoll и других). Он позволяет мультеплексировать несколько задач в одном потоке выполнения и в сущности является обычной синхронной программой. А потому никаких дополнительных трат процессорного времени и времени разработчиков не требуется.

I. Вытесняющая многозадачность:

1. Процессы (выполняются в одной ОС).

2. Потоки (выполняются в одном процессе).

II. Кооперативная многозадачность:

3. Сопрограммы (выполняются в одном потоке).

Кооперативная многозадачность для сокетов
Кооперативная многозадачность для сокетов

Суть данного способа в том, что когда синхронная программа достигает запроса системы ввода-вывода, мы не ждем, когда данный запрос завершится, а спрашиваем у системы: а какие есть другие уже завершившиеся запросы? Системный вызов select как раз и возвращает список готовых к обработке дескрипторов ввода-вывода (файлов, сокетов и других). Поэтому вместо пустого ожидания мы просто переключаемся на выполнение другой подпрограммы.

Со временем поверх вызова select программисты для собственного удобства сооружали все более сложные системы. Так постепенно они дошли до колбек-функций (callback), а потом и до сопрограмм (coroutine) и асинхронного (asynchronous) программирования. Всю эту эволюцию нам и предстоит проследить в данном материале. Ведь чтобы писать грамотные программы, нужно хорошо себе представлять, как все там внутри устроено, и как получается вся эта магия.

Но перед тем рассмотрим еще более простой способ, который позволяет не применять select. Начнем разбор с использования неблокирующих сокетов.

Неблокирующие сокеты

Как известно, когда у сокетов вызывается метод accept() или recv(), выполнение программы останавливается до тех пор, пока в систему ввода-вывода не придут необходимые данные из сети (пока не подсоединится новый клиент — в случае accept(), или не придут новые пакеты с данными для recv()). А все потому, что вызовы этих методов блокирующие.

Но, к счастью, сокеты допускают переключение в неблокирующий режим работы. Для этого нужно перед первым использованием сокета вызвать у него метод setblocking(False). Тогда ранее блокирующие методы будут сразу или возвращать данные, если они есть в наличии, или генерировать исключение BlockingIOError, если их нет. Поэтому мы в бесконечном цикле можем опрашивать все соединения по очереди на предмет готовности данных. Если возникает исключение BlockingIOError, мы просто переходим к следующему сокету:

import socket

HOST, PORT = "", 50007
connections = []
if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        serv_sock.setblocking(False)  # Important!
        while True:
            try:
                # print("Try to accept a new connection...")
                sock, addr = serv_sock.accept()
                sock.setblocking(False)
                print("Connected by", addr)
                connections.append((sock, addr))
            except BlockingIOError:
                # print("No connections are waiting to be accepted")
                pass
            for sock, addr in connections.copy():
                print("Try to receive data from:", sock, addr)
                try:
                    data = sock.recv(1024)
                except ConnectionError:
                    print(f"Client suddenly closed while receiving from {addr}")
                    connections.remove((sock, addr))
                    sock.close()
                    continue
                except BlockingIOError:
                    # No data received
                    continue
                print(f"Received: {data} from: {addr}")
                if not data:
                    connections.remove((sock, addr))
                    sock.close()
                    print("Disconnected by", addr)
                    continue
                data = data.upper()
                print(f"Send: {data} to: {addr}")
                try:
                    sock.sendall(data)
                except ConnectionError:
                    print(f"Client suddenly closed, cannot send to {addr}")
                    connections.remove((sock, addr))
                    sock.close()
                    continue

Не трудно догадаться, что такое решение будет полностью загружать процессор, как и всякий другой бесконечный цикл, вне зависимости от того, есть там что обрабатывать или нет. При этом чаще всего данные будут поступать медленнее, чем будет проходить полный цикл опроса всех клиентских сокетов. А значит, большая часть кода будет исполняться впустую — sock.recv()BlockingIOErrorsock.recv()BlockingIOError и т.д.

Также, если первый сокет в списке готов, а мы сейчас опрашиваем только второй, то придется пройтись по всему списку неготовых сокетов, пока мы дойдем до первого — реально готового соединения.

Я уже не говорю про затраты времени на обработку исключений, которые являются неотъемлемой составляющей данного способа. Даже если один из сокетов был бы всегда готов, когда бы его не спросили, то прежде, чем к нему попасть, пришлось бы поймать и обработать множество исключений BlockingIOError от других сокетов.

Вот бы каким-нибудь образом опрашивать только те сокеты, про которых достоверно известно, что данные для них уже пришли и готовы для обработки, чтобы не перебирать все сокеты подряд. Это сэкономило бы нам немало процессорного времени. Специально для таких случаев в ОС и создали системный вызов select.

Системный вызов select

Чтобы не вызывать sock.accept() и sock.recv() наудачу в надежде, что они вернут данные, был придуман системный вызов select. Он всегда возвращает список только готовых сокетов, а потому, когда мы вызываем sock.accept() или sock.recv() мы точно знаем, что выполнение программы не остановится, и результат будет возвращен сразу.

Метод не заблокирует выполнение, даже если setblocking(True), и не возбудит исключение BlockingIOError, если setblocking(False). Поэтому нам вообще не важно, блокирующие или неблокирующие сокеты мы используем.

О системных вызовах

Что такое select? Select, сокеты, процессы, потоки, получение текущего времени, вывод на экран и многое другое — все это части операционной системы (ОС). ОС предоставляет своим прикладным программам API (программный интерфейс) для вызова этих функций. Он представляет собой совокупность системных вызовов (system calls, или syscalls). Если программе нужно что-то помимо процессора или оперативной памяти (послать данные по сети или отобразить что-то на экране), то она всегда за этим должна обращаться к ОС. Да и многие сложные операции по управлению памятью и процессором также требуют обращения к системным вызовам.

Чтобы лучше себе представлять как они работают, приведем один пример на языке ассемблера и проследим, как он эволюционирует до вызова функции в Python.

Каждому системному вызову присвоен уникальный код, по которому их можно отличать друг от друга. Также у них может быть несколько параметров. Код обычно помещается в аккумулятор процессора, а для параметров отводятся другие заранее условленные регистры. После того, как все регистры установлены осуществляется вызов прерывания 80h (int 0x80), инструкции sysenter (для архитектуры i386) или инструкции syscall (архитектура x86-64). Ассемблерный код для x86-64 выглядит примерно так (подробнее):

  mov rax,function_number
  mov rdi,param_1  ; если есть
  mov rsi,param_2  ; если есть
  mov rdx,param_3  ; если есть
  mov r10,param_4  ; если есть
  mov r8,param_5   ; если есть
  mov r9,param_6   ; если есть
  syscall  ; вызов! (регистры RCX и R11 будут уничтожены)

Набор кодов системных вызовов и их параметров (каким регистрам процессора они соответствуют и какие значения могут принимать) вместе образуют программный интерфейс (API) операционной системы. Чтобы не запоминать всех этих деталей, которые к тому же могут отличаться для разных архитектур и систем, для каждого системного вызова пишется отдельная функция на C. C является кроссплатформенным языком. А это значит, что на нем можно написать программу один раз, а потом компилировать ее под разные платформы. Для каждой платформы функция для системного вызова может иметь разную внутреннюю реализацию, но сигнатура функции, ее внешний интерфейс будет для всех система одинакова.

Набор таких функций объединяются в библиотеку, которая становится частью ОС. Так появляется стандартная библиотека C, служащая программным интерфейсом к ОС. А так как Python и большинство его модулей написаны на C, то весь питоновский код в конце концов заканчивается вызовом си-шных функций. В том числе и функций стандартной библиотеки. Поэтому, когда мы в Python вызываем sock.recv() или select(), то это значит, что будет вызвана соответствующая функция стандартной библиотеки C. Она, в свою очередь, заполнит нужные регистры подходящими значениями и вызовет соответствующую инструкцию процессора для данной архитектуры (int 0x80, sysenter, syscall). Результаты вызова будут помещены операционной системой также в регистры.

Вот как работает механизм системных вызовов. Теперь вы имеете общее представление о том, что происходит при вызове из Python функции select(). Теперь рассмотрим подробнее саму эту функцию.

Системный вызов select — это такой вызов, которые принимает в качестве параметров список дескрипторов потоков ввода-вывода, которые нужно проанализировать, а на выходе возвращает список тех из них, которые готовы к работе.

Каждый дескриптор — это уникальное целое число, которое возвращает система при открытии файла, сокета или другого потока ввода-вывода. С помощью этого числа потоки можно отличать друг от друга. Класс socket в Python — это просто удобная обертка вокруг такого числа-дескриптора.

Если выразиться проще, то передавая в select() список всех сокетов, мы на выходе получим список тех из них, которые готовы к обработке. Вот и все. Вот как изменится наша программа, если мы добавим в нее select():

import select
import socket

def handle(sock, addr):
    try:
        data = sock.recv(1024)  # Should be ready
    except ConnectionError:
        print(f"Client suddenly closed while receiving")
        return False
    print(f"Received {data} from: {addr}")
    if not data:
        print("Disconnected by", addr)
        return False
    data = data.upper()
    print(f"Send: {data} to: {addr}")
    try:
        sock.send(data)  # Hope it won't block
    except ConnectionError:
        print(f"Client suddenly closed, cannot send")
        return False
    return True

HOST, PORT = "", 50007
if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        # serv_sock.setblocking(False)
        inputs = [serv_sock]
        outputs = []
        while True:
            print("Waiting for connections or data...")
            readable, writeable, exceptional = select.select(inputs, outputs, inputs)
            for sock in readable:
                if sock == serv_sock:
                    sock, addr = serv_sock.accept()  # Should be ready
                    print("Connected by", addr)
                    # sock.setblocking(False)
                    inputs.append(sock)
                else:
                	addr = sock.getpeername()
                    if not handle(sock, addr):
                    	# Disconnected
                        inputs.remove(sock)
                        if sock in outputs:
                            outputs.remove(sock)
                        sock.close()

Когда выполнение программы достигается вызова select, программа останавливается до тех пор, пока один из сокетов в списке не получит данные и не перейдет в состояние готовности. Тогда select() возвратит этот сокет и он поступит на обработку. Если этот сокет серверный, то вызывается метод accept(), если клиентский — то recv(). Обработка клиентского сокета была вынесена для большей ясности и чистоты кода в функцию handle(). Данный способ еще допускает написание всего сервера без единой функции, но позже без функций будет уже не обойтись.

Для простоты и понятности примеров мы не будем проверять сокеты на готовность к записи (writeable). Выходной сетевой буфер обычно всегда свободен, и любой здоровый сокет вернется как готовый к записи. Хотя в реальном production-коде, конечно же, необходимо предусмотреть и такой случай.

Таким образом, множество блокирующих вызовов сокетов мы заменили лишь одним блокирующим вызовом — select. В результате этого процессорное время равномерно распределяется между всеми соединениями. Это и называется многозадачность — многозадачность через мультиплексирование. При этом все выполняется в одном потоке и абсолютно синхронным образом. Поэтому никакой дополнительной синхронизации, как в случае разных потоков, не требуется.

Использование колбек-функций

На разных системах существуют и другие альтернативы select — poll, epoll, devpoll, kqueue — использование которых для тех или иных случаев может быть выгоднее, чем select. Но API и способ вызова у них у всех может отличаться. Поэтому, чтобы унифицировать процесс получения готовых к работе дескрипторов ввода-вывода в Python был создан модуль selectors, в котором содержатся разные имплементации единого интерфейса селекторов.

Каждый класс селектора имеет три основных метода. С помощью методов register() и unregister() можно добавлять или удалять дескриптор (в том числе и сокет) из списка активных, а метод select() выбирает из этого списка массив готовых к обработке:

if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        # serv_sock.setblocking(False)
        sel = selectors.DefaultSelector()
        sel.register(serv_sock, selectors.EVENT_READ)
        while True:
            print("Waiting for connections or data...")
            events = sel.select()
            for key, mask in events:
                sock = key.fileobj
                if sock == serv_sock:
                    sock, addr = serv_sock.accept()  # Should be ready
                    print("Connected by", addr)
                    # sock.setblocking(False)
                    sel.register(sock, selectors.EVENT_READ)
                else:
                	addr = sock.getpeername()
                    if not handle(sock, addr):
                    	# Disconnected
                        sel.unregister(sock)
                        sock.close()
                        continue

На первый взгляд все то же самое, и мы не получили ни одного преимущества от рефакторинга кроме того, что можно перейти к другому методу выбора (poll, epoll) заменой всего одной строчки: sel = selectors.DefaultSelector().

Но это не совсем так. Дело в том, что у метода register() есть еще третий параметр — data — который передается вместе с сокетом в объекте key. И сюда можно поместить любое значение, в том числе и функцию. Так мы, регистрируя сокет, можем тут же задать и колбек, который его будет обрабатывать:

def on_accept_ready(sel, serv_sock, mask):
    sock, addr = serv_sock.accept()  # Should be ready
    print("Connected by", addr)
    # sock.setblocking(False)
    sel.register(sock, selectors.EVENT_READ, on_read_ready)

def on_read_ready(sel, sock, mask):
	addr = sock.getpeername()
    if not handle(sock, addr):
        sel.unregister(sock)
        sock.close()

if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((HOST, PORT))
        serv_sock.listen(1)
        # sock.setblocking(False)
        sel = selectors.DefaultSelector()
        sel.register(serv_sock, selectors.EVENT_READ, on_accept_ready)
        while True:
            print("Waiting for connections or data...")
            events = sel.select()
            for key, mask in events:
                callback = key.data
                callback(sel, key.fileobj, mask)

Тут мы уже вынуждены вынести код обработки нового соединения из общего цикла в отдельную функцию: on_read_ready(). В результате код основного цикла работы приложения становится полностью стандартным. Как и код создания серверного сокета. Их можно объединить в функцию вроде run_server(host, port, on_connect, on_read), вынести ее в библиотеку и использовать в других проектах. Весь кастомный код приложения выносится в колбеки, а в run_server() остается только стандартная функциональность, которая всегда одинакова:

def on_connect(sock, addr):
    print("Connected by", addr)

def on_disconnect(sock, addr):
    print("Disconnected by", addr)

def run_server(host, port, on_connect, on_read, on_disconnect):
    def on_accept_ready(sel, serv_sock, mask):
        sock, addr = serv_sock.accept()  # Should be ready
        # sock.setblocking(False)
        sel.register(sock, selectors.EVENT_READ, on_read_ready)
        if on_connect:
            on_connect(sock, addr)

    def on_read_ready(sel, sock, mask):
        addr = sock.getpeername()
        if not on_read or not on_read(sock, addr):
            if on_disconnect:
                on_disconnect(sock, addr)
            sel.unregister(sock)
            sock.close()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv_sock:
        serv_sock.bind((host, port))
        serv_sock.listen(1)
        # sock.setblocking(False)
        sel = selectors.DefaultSelector()
        sel.register(serv_sock, selectors.EVENT_READ, on_accept_ready)
        while True:
            print("Waiting for connections or data...")
            events = sel.select()
            for key, mask in events:
                callback = key.data
                callback(sel, key.fileobj, mask)

HOST, PORT = "", 50007

if __name__ == "__main__":
    run_server(HOST, PORT, on_connect, handle, on_disconnect)

Вот так нами был создан простой асинхронный серверный движок на колбеках. Данный движок позволяет нам вообще абстрагироваться от того, как реализована асинхронность. Мы просто пишем функции-обработчики для событий: connect, read, disconnect.

Хотя данная система — уже большой прогресс, по сравнению с предыдущими версиями, но она все же не лишена серьезных недостатков. Во-первых, из колбеков нельзя вызывать блокирующие функции, кроме единственного вызова recv() в on_read, иначе все выполнение программы остановится.

Во-вторых, если мы начинаем использовать колбеки, то рано или поздно мы начинаем использовать вложенные колбеки, а для них еще вложенные и так далее. Рано или поздно это приводит к такому явлению, как callback hell. Это когда код становится таким запутанным из-за многочисленных вложений, что проще застрелится, чем его поддерживать.

Обе проблемы решаются переходом к сопрограммам (coroutines). Именно сопрограммы позволяют писать асинхронный код так, как будто он синхронный. То есть без колбеков и вложенных функций. Но прежде, чем начать, нелишним будет узнать, как в Python появились сопрограммы, и из чего они эволюционировали. Чем мы и займемся в следующий раз.

Исходники

< Назад | Начало | Вперед >

Комментарии (4)


  1. Sabirman
    11.07.2022 23:28

    Мне интересно, насколько имеет смысл использовать асинхронность в Python ? - большинство библиотек здесь синхронны. Кроме того, не являются асинхронными ни СУБД, ни "жёсткие" диски.


    1. kai3341
      12.07.2022 00:28

      Вопрос из разряда "есть ли смысл дышать?". Вас никто не заставляет, не дышите. А грамотное использование asyncio даёт дикий буст производительности приложения, работающего с IO, будь то приём или отправка любых сетевых запросов (включая БД) и остальное по мелочи


    1. Yuribtr
      12.07.2022 10:04

      Асинхронность удобна прежде всего для работы с сетью. Например если вам надо в какой то момент времени послать несколько независимых друг от друга запросов - конкурентное выполнение кода ускорит выполнение запросов в разы. Такое часто применяется в микросервисных архитектурах, при скраппинге сайтов.
      Более того, в SqlAlchemy уже имеется поддержка асинхронных драйверов PostgreSQL, MySQL, SQLite. Так что все зависит от характера задачи и ее масштаба.


    1. Yuribtr
      12.07.2022 11:51

      Кстати уже есть попытки организации асинхронного доступа к файлам. Хотя действительно стоит признать что готового решения еще нет.