Ранее я представил пару небольших постов о потенциальной роли Spring Boot 2 в реактивном программировании. После этого я получил ряд вопросов о том, как работают асинхронные операции в программировании в целом. Сегодня я хочу разобрать, что такое Non-blocking I/O и как применить это знание для создания небольшого tcp–сервера на python, который сможет обрабатывать множество открытых и тяжелых (долгих) соединений в один поток. Знание python не требуется: все будет предельно просто со множеством комментариев. Приглашаю всех желающих!

Мне, как и многим другим разработчикам, очень нравятся эксперименты, поэтому вся последующая статья будет состоять как раз из серии экспериментов и выводов, которые они несут. Предполагается, что вы недостаточно хорошо знакомы с тематикой, и будете охотно экспериментировать со мной. Исходники примеров можно найти на github.

Начнем с написания очень простого tcp–сервера. Задача сервера будет заключаться в получении и печати данных из сокета и возвращения строки Hello from server!. Примерно так это выглядит:

Синхронный tcp-сервер
import socket

# Задаем адрес сервера
SERVER_ADDRESS = ('localhost', 8686)

# Настраиваем сокет
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(SERVER_ADDRESS)
server_socket.listen(10)
print('server is running, please, press ctrl+c to stop')

# Слушаем запросы
while True:
    connection, address = server_socket.accept()
    print("new connection from {address}".format(address=address))

    data = connection.recv(1024)
    print(str(data))

    connection.send(bytes('Hello from server!', encoding='UTF-8'))

    connection.close()


Здесь все довольно просто. Если вы не знакомы с понятием сокета, то вот очень простая и практическая статья. Мы создаем сокет, ловим входящие соединения и обрабатываем их согласно заданной логике. Здесь стоит обратить внимание на сообщения. При создании нового соединения с клиентом мы пишем об этом в консоль.

Хочу сразу заметить, что не стоит серьезно вникать в листинги программ до полного прочтения статьи. Совершенно нормально, если что-то будет не совсем понятно в самом начале. Просто продолжайте чтение.

Нет особого смысла в сервере без клиента. Поэтому на следующем этапе необходимо написать небольшой клиент для использования сервера:

tcp-клиент
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket

MAX_CONNECTIONS = 20
address_to_server = ('localhost', 8686)

clients = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(MAX_CONNECTIONS)]
for client in clients:
    client.connect(address_to_server)

for i in range(MAX_CONNECTIONS):
    clients[i].send(bytes("hello from client number " + str(i), encoding='UTF-8'))

for client in clients:
    data = client.recv(1024)
    print(str(data))


Важной особенностью здесь является тот факт, что мы в начале устанавливаем максимально возможное количество соединений, а только потом используем их для передачи/хранения данных.

Давайте запустим сервер. Первое, что мы видим:

server is running, please, press ctrl+c to stop

Это означает, что мы успешно запустили наш сервер и он готов принимать входящие запросы. Запустим клиент и сразу увидим в консоли сервера (у вас порты могут быть другими):

server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 39196)
b'hello from client number 0'
new connection from ('127.0.0.1', 39198)
b'hello from client number 1'
...

Что и следовало ожидать. В бесконечном цикле мы получаем новое соединение и сразу же обрабатываем данные из него. В чем тут проблема? Ранее мы использовали опцию server_socket.listen(10) для настройки сервера. Она означает максимальный размер очереди из еще не принятых подключений. Но в этом нет совершенно никакого смысла, ведь мы принимаем по одному соединению. Что предпринять в такой ситуации? На самом деле, существует несколько выходов.

  1. Распараллелить с помощью потоков/процессов (для этого можно, например, использовать fork или пул). Подробнее здесь.
  2. Обрабатывать запросы не по мере их подключения к серверу, а по мере наполнения этих соединений нужным количеством данных. Проще говоря, мы можем открыть сразу максимальное количество ресурсов и читать из них столько, сколько сможем (сколько необходимо на это процессорного времени в идеальном случае).

Вторая идея кажется заманчивой. Всего один поток и обработка множества соединений. Давайте посмотрим, как это будет выглядеть. Не стоит пугаться обилия кода. Если что-то сразу не понятно, то это вполне нормально. Можно попробовать запустить у себя и подебажить:

Асинхронный сервер

import select
import socket

SERVER_ADDRESS = ('localhost', 8686)

# Говорит о том, сколько дескрипторов единовременно могут быть открыты
MAX_CONNECTIONS = 10

# Откуда и куда записывать информацию
INPUTS = list()
OUTPUTS = list()


def get_non_blocking_server_socket():

    # Создаем сокет, который работает без блокирования основного потока
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(0)

    # Биндим сервер на нужный адрес и порт
    server.bind(SERVER_ADDRESS)

    # Установка максимального количество подключений
    server.listen(MAX_CONNECTIONS)

    return server


def handle_readables(readables, server):
    """
    Обработка появления событий на входах
    """
    for resource in readables:

        # Если событие исходит от серверного сокета, то мы получаем новое подключение
        if resource is server:
            connection, client_address = resource.accept()
            connection.setblocking(0)
            INPUTS.append(connection)
            print("new connection from {address}".format(address=client_address))

        # Если событие исходит не от серверного сокета, но сработало прерывание на наполнение входного буффера
        else:
            data = ""
            try:
                data = resource.recv(1024)

            # Если сокет был закрыт на другой стороне
            except ConnectionResetError:
                pass

            if data:

                # Вывод полученных данных на консоль
                print("getting data: {data}".format(data=str(data)))

                # Говорим о том, что мы будем еще и писать в данный сокет
                if resource not in OUTPUTS:
                    OUTPUTS.append(resource)

            # Если данных нет, но событие сработало, то ОС нам отправляет флаг о полном прочтении ресурса и его закрытии
            else:

                # Очищаем данные о ресурсе и закрываем дескриптор
                clear_resource(resource)


def clear_resource(resource):
    """
    Метод очистки ресурсов использования сокета
    """
    if resource in OUTPUTS:
        OUTPUTS.remove(resource)
    if resource in INPUTS:
        INPUTS.remove(resource)
    resource.close()

    print('closing connection ' + str(resource))


def handle_writables(writables):

    # Данное событие возникает когда в буффере на запись освобождается место
    for resource in writables:
        try:
            resource.send(bytes('Hello from server!', encoding='UTF-8'))
        except OSError:
            clear_resource(resource)


if __name__ == '__main__':

    # Создаем серверный сокет без блокирования основного потока в ожидании подключения
    server_socket = get_non_blocking_server_socket()
    INPUTS.append(server_socket)

    print("server is running, please, press ctrl+c to stop")
    try:
        while INPUTS:
            readables, writables, exceptional = select.select(INPUTS, OUTPUTS, INPUTS)
            handle_readables(readables, server_socket)
            handle_writables(writables)
    except KeyboardInterrupt:
        clear_resource(server_socket)
        print("Server stopped! Thank you for using!")


Давайте запустим наш новый сервер и посмотрим на консоль:
Вывод асинхронного сервера
server is running, please, press ctrl+c to stop
new connection from ('127.0.0.1', 56608)
new connection from ('127.0.0.1', 56610)
new connection from ('127.0.0.1', 56612)
new connection from ('127.0.0.1', 56614)
new connection from ('127.0.0.1', 56616)
new connection from ('127.0.0.1', 56618)
new connection from ('127.0.0.1', 56620)
new connection from ('127.0.0.1', 56622)
new connection from ('127.0.0.1', 56624)
getting data: b'hello from client number 0'
new connection from ('127.0.0.1', 56626)
getting data: b'hello from client number 1'
getting data: b'hello from client number 2'


Как можно понять по выводу, мы принимаем новые коннекты и данные почти параллельно. Более того, мы не ждем данных от нового подключения. Вместо этого мы устанавливаем новое.

Как это работает?


Дело в том, что все наши операции с ресурсами (а обращение к сокету относится к этой категории) происходят через системные вызовы операционной системы. Если говорить коротко, то системные вызовы представляют собой обращение к API операционной системы.

Рассмотрим, что происходит в первом случае и во втором.

Синхронный вызов


Давайте разберем рисунок:



Первая стрелка показывает, что наше приложение обращается к операционной системе для получения данных из ресурса. Далее наша программа блокируется до появления нужного события. Минус очевиден: если у нас один поток, то другие пользователи должны ждать обработку текущего.

Асинхронный вызов


Теперь посмотрим на рисунок, который иллюстрирует асинхронный вызов:



Первая стрелка, как и в первом случае, делает запрос к ОС (операционной системе) на получение данных из ресурсов. Но посмотрите, что происходит далее. Мы не ждем данных из ресурса и продолжаем работу. Что же делать нам? Мы отдали распоряжение ОС и не ждем результат сразу. Простейшим ответом будет самостоятельно опрашивать нашу систему на наличие данных. Таким образом, мы сможем утилизировать ресурсы и не блокировать наш поток.

Но самом деле такая система не является практичной. Такое состояние, в котором мы постоянно смотрим на данные и ждем какого-то события, называется активным ожиданием. Минус очевиден: мы впустую тратим процессорное время в случае, когда информация не обновилась. Более правильным решением было бы оставить блокировку, но сделать ее «умной». Наш поток не просто ждет какого-то определенного события. Вместо этого он ожидает любое изменение данных в нашей программе. Именно так и работает функция select в нашем асинхронном сервере:



Теперь можно вернуться к реализации нашего асинхронного сервера и взглянуть на него с новым знанием. Первое, что бросается в глаза, – метод работы. Если в первом случае наша программа выполнялась “сверху вниз”, то во втором случае мы оперируем событиями. Такой подход в разработке ПО называется событийно-ориентированным (event-driven development).

Сразу стоит отметить, что такой подход не является серебряной пулей. У него имеется масса недостатков. Во-первых, такой код сложнее поддерживать и менять. Во-вторых, у нас есть и всегда будут блокирующие вызовы, которые все портят. Например, в программе выше мы воспользовались функцией print. Дело в том, что такая функция тоже обращается к ОС, следовательно, наш поток выполнения блокируется и другие источники данных терпеливо ждут.

Заключение


Выбор подхода напрямую зависит от решаемой нами задачи. Позвольте задаче самой выбрать наиболее продуктивный подход. Например, популярный Javaвеб -сервер Tomcat использует потоки. Не менее популярный сервер Nginx использует асинхронный подход. Создатели популярного веб-сервера gunicorn на python пошли по пути prefork.

Спасибо, что прочитали статью до конца! В следующий раз (уже скоро) я расскажу о прочих возможных неблокирующих ситуациях в жизни наших программ. Буду рад вас видеть и в следующих постах.

Комментарии (13)


  1. andreymal
    14.05.2018 13:23

    Не увидел в посте слова asyncio, а жаль (раз уж речь о python)


    1. TheDeadOne
      14.05.2018 13:59

      Asyncio — лишь один из способов добиться асинхронности.


      1. andreymal
        14.05.2018 14:04

        Конечно, но пост оформлен так, что неопытные читатели могут подумать, что все пишут асинхронный код как в этом посте, и получается не очень хорошо


  1. dev_random
    14.05.2018 16:17

    Ранее мы использовали опцию server_socket.listen(10) для настройки сервера. Она означает ограничение на количество единовременных подключений к серверному сокету.

    Это не совсем так. Этот параметр означает максимальный размер очереди из еще не принятых подлючений (ожидающих соответствующего server_socket.accept()). Количество одновременно-активных подключений ограничено совсем другими факторами.
    docs.python.org/3/library/socket.html#socket.socket.listen


    1. faoxy Автор
      14.05.2018 16:17
      +2

      Отличное замечание! Спасибо, сейчас поправлю.


  1. klirichek
    14.05.2018 16:40

    Select?
    А разве для реальной работы (помимо учебных примеров) его ещё используют?
    На крайний случай сразу бы уж poll. Оно хоть тоже не без недостатков, но с него уже проще двигаться в сторону современных epoll/kqueue или iocp (если под виндой).


    1. faoxy Автор
      15.05.2018 09:42

      Спасибо! Хорошее замечание. Действительно, select уже устарел. Но он до сих пор часто используется в качестве примеров в других источниках (вероятно из-за своей простоты). Я не стал отходить от традиций.


      1. TheDeadOne
        15.05.2018 14:58

        Используется он из-за переносимости, select есть во всех операционных системах. Если бы ваш пример использовал poll, запустить его в Windows было бы невозможно.


  1. WitNt
    14.05.2018 16:43

    Я не понял, в чём здесь новизна? Что здесь такого особенного? Ну сервер, ну асинхронный…
    Такого добра здесь уже написано много.


    1. faoxy Автор
      15.05.2018 09:38

      Как и многие другие источники(книги, статьи, туториалы и т.д.) данная статья представляет собой компиляцию уже существующих идей и собственного видения с опытом. Я не ставил целью открыть колесо. Мне просто хотелось поделиться существующими знаниями на случай, если кому-то это пригодится.
      Мне кажется большая вариация статей на одну тему — не так уж и плохо. Посмотрите какое количество учебников, статей, видео курсов по математическому анализу существует. Тем не менее до сих есть люди, которые пишут новые. Мне кажется, это отлично.


      1. WitNt
        15.05.2018 10:17

        Не буду спорить. Соглашусь.


  1. x86128
    14.05.2018 17:08

    Если запросы к асинхронному серверу не сильно CPU-тяжелые то такую задачу можно сделать на nodejs.

    На CoffeScript будет выглядеть так:

    net = require 'net'
    
    server = net.createServer (socket) ->
        [addr, port] = [socket.remoteAddress, socket.remotePort]
        console.log "Client from #{addr}:#{port} connected."
    
        talk = ->
            socket.end('Bye!')
    
        socket.on "data", (data) ->
            console.log "Received from #{addr}: #{data.toString()}"
            setTimeout talk, Math.random() * 2000
        
        socket.on "end", ->
            console.log "Client from #{addr}:#{port} disconnected"
        
        socket.on "error", (e) ->
            console.log e.toString()
    
    server.listen 8080, ->
        console.log "Listening on #{server.address().port} ..."
    



    1. faoxy Автор
      14.05.2018 17:08
      +1

      Спасибо!