Всем привет!

Меня зовут Тарас, я автор библиотеки picows — ультрабыстрых вебсокетов для asyncio. В этой статье я расскажу, почему вообще появилась ещё одна библиотека для веб-сокетов, покажу результаты бенчмарков и заодно порассуждаю о производительности в asyncio.

Предыстория

В далёком-предалёком 2021 году мне довелось поучаствовать в разработке алготрейдинг-платформы для криптовалютных бирж. Выбор языка пал на Python из-за разнообразия ML-библиотек, возможность быстро собирать прототипы и проверять идеи, отсутствия этапа компиляции и в целом наличия богатой экосистемы. Если какая-то идея взлетит, критичный участок всегда можно оптимизировать, хотя бы частично переписав его на C/C++/Cython.

Одна из первых задач — подключиться к WebSocket API крипто-биржи и подписаться на рыночные данные в реальном времени: обычно это сделки и обновления торговой книги.

Когда рынок тихий, таких обновлений приходит немного: скажем, до 20 в секунду на одну торговую пару. Но когда начинается движение, счёт уже идёт на тысячи. Часто, чтобы принимать более точные решения в торговой стратегии, нужно получать обновления сразу по нескольким инструментам и с нескольких бирж. В таком режиме количество входящих сообщений, которые нужно обработать, запросто переваливает за 10000, а то и за 20000 в секунду. Сами сообщения при этом обычно небольшие: JSON размером порядка 200 байт.

Я взял вебсокеты из aiohttp и довольно быстро понял, что библиотека не справляется. Даже если вообще ничего не делать со входящими сообщениями, они скапливаются во внутренней очереди aiohttp, а в тяжёлых случаях RCVBUF сокета успевает переполниться и возникает back-pressure. Задержка между приёмом данных из сети и доставкой их в логику приложения быстро растёт. А ведь эти данные с биржи потом ещё нужно разобрать и обработать.

Код выглядел примерно так:

from aiohttp import ClientSession

async def run():
    async with ClientSession() as session:
        async with session.ws_connect("wss://какая-то.биржа/btc_usd", ssl_context=ssl_context) as ws:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    # Разбор и обработка содержимого сообщения
                    # msg.data: str
                    ...

                    # Когда нужно что-то отправить
                    await ws.send_bytes(data)

Каких-то улучшений и оптимизации тут особо и не сделаешь.

Задержки в алготрейдинге — это очень плохо. Нет ничего хуже торговой стратегии, которая лагает. Даже 10 миллисекунд — это уже серьёзный лаг и, как следствие, быстро потерянные деньги. С aiohttp задержка вполне доходила и до 100 мс.

Другая популярная библиотека, websockets, имея похожий async интерфейс, оказалась ещё медленнее. При этом именно websockets на тот момент активно рекламировала себя как самую быструю современную async-библиотеку, но никаких конкретных бенчмарков и сравнений не предоставлялось.

Понять причину задержек можно погрузившись в детали реализации этих библиотек. Но прежде чем это сделать, напомню как вообще устроен протокол вебсокетов. Он довольно простой.

Протокол вебсокетов (совсем коротко)

На сетевом уровне отправляются вебсокет-фреймы (frame). Каждый фрейм состоит из заголовка (header) и полезной нагрузки (payload). Длина заголовка — от 2 до 14 байт. В заголовке указаны:

  • тип фрейма (TEXT | BINARY | PING | PONG | CLOSE | CONTINUATION)

  • несколько флагов (fin, rsv1, rsv2, rsv3)

  • длина payload в байтах

  • опциональная xor-маска для payload

Пользовательские сообщения (message) могут состоять из нескольких фреймов. Чтобы получить сообщение, payload фреймов конкатенируется. Контрольные сообщения PING/PONG/CLOSE всегда отправляются одним фреймом.

В подавляющем большинстве случаев серверы и клиенты отправляют пользовательские сообщения одним фреймом: так проще и эффективнее. Максимальный размер фрейма — 2^63 байт, то есть, по сути, ограничений на размер нет. Флаг fin из заголовка сигнализирует об окончании сообщения:

WSFrame(frame_type=<TEXT|BINARY|PING|PONG|CLOSE>, fin=True, <msg>)

Если же сообщение разбивается на несколько фреймов, то порядок выглядит так:

WSFrame(frame_type=<TEXT|BINARY>, fin=False, <msg part1>)
WSFrame(frame_type=CONTINUATION, fin=False, <msg part2>)
WSFrame(frame_type=CONTINUATION, fin=True, <msg part3>)

Детали реализации aiohttp и websockets

При приёме данных aiohttp и websockets делают примерно следующее:

  • сырые данные приходят в виде объектов bytes в asyncio.Protocol.data_received

  • парсер фреймов копирует их, накапливая в своём буфере

  • парсер разбирает заголовок фрейма

  • если фрейм приехал полностью, то payload копируется из буфера, а буфер очищается

  • фреймы объединяются в сообщения, и данные могут копироваться повторно

  • если тип сообщения TEXT, данные могут дополнительно копироваться и конвертироваться в объект str

  • сообщения добавляются во внутреннюю очередь (аналог asyncio.Queue)

  • asyncio, когда больше не остаётся IO-операций с сокетами, пробуждает пользовательскую корутину и передаёт ей готовое сообщение

Проблем тут несколько:

  • пользовательская корутина просыпается через неопределённо долгое количество времени

  • после чтения из сокета данные копируются несколько раз

  • парсер фреймов и другой код, связанный с конкатенацией фреймов, очередью и контролем потока (flow control), написаны на Python и тоже не слишком эффективны. Хотя конкретно в aiohttp парсер в какой-то момент переписали на C

Отправка данных и построение фреймов тоже не обходятся без копирования.

Например, вы хотите отправить сообщение размером в 1 МБ. И aiohttp, и websockets в лучшем случае делают следующее:

  • строят заголовок фрейма на чистом Python с помощью модуля struct

  • чтобы дописать заголовок спереди (2-14 байт), копируют весь мегабайт

  • отправляют полученный объект bytes в asyncio.Transport.write

Маршрут, по которому проходят данные и при чтении, и при записи, сопровождается созданием и разрушением большого количества мелких объектов, а также поиском функций по имени в модулях. Это плохо с точки зрения memory locality и в целом даёт неоправданные накладные расходы для таких базовых операций.

А как хотелось бы?

Получение данных

В asyncio есть два типа протоколов: asyncio.Protocol и asyncio.BufferedProtocol. При получении новых данных из сети asyncio.Protocol передаёт пользовательскому коду новый объект bytes с копией данных. asyncio.BufferedProtocol, в свою очередь, позволяет передать в asyncio свой собственный буфер и избежать лишнего копирования.

  • чтобы минимизировать задержки, библиотека вебсокетов должна использовать asyncio.BufferedProtocol; ни aiohttp, ни websockets этого не делают

  • парсер на C разбирает заголовок фрейма и определяет, доставлен ли фрейм полностью

  • как только фрейм полностью доставлен, границы payload в буфере передаются в пользовательский код сразу же

  • пользовательский код может читать и разбирать содержимое payload в буфере, не копируя его

  • для TLS ситуация сложнее, но количество копирований всё равно сильно уменьшается. Пользователь работает с участком памяти, куда OpenSSL записывает расшифрованные данные

Отправка данных

Для минимизации задержек при отправке нужно:

  • подготовить заголовок

  • как можно быстрее вызвать системный вызов send/writev

  • если это клиент, придётся скопировать данные, чтобы наложить маску

  • но можно дать пользователю API, в котором он сам предоставит bytearray буфер с payload и зарезервированными 16 байтами впереди, куда мы запишем заголовок фрейма, и наложить маску прямо в нём, без копирования.

  • сделать всё максимально на C, минимизировав создание и взаимодействие с Python-объектами

Именно на таком подходе и построен picows. От некоторых высокоуровневых фич пришлось отказаться ради производительности:

  • получение данных осуществляется через обычный обратный вызов, а не через async for

  • picows предоставляет пользователю фреймы, а не сообщения. Подавляющее большинство серверов и клиентов никогда не разбивают сообщения на несколько фреймов. В таком случае фрейм сам по себе уже является готовым к дальнейшей обработке сообщением

  • если же сообщение разбито на несколько фреймов, пользователь может выбрать наиболее оптимальный способ конкатенации фреймов

  • в picows нет расширения permessage-deflate из коробки, поскольку компрессия работает на уровне сообщений, а не фреймов

  • picows WSTransport.send не является async методом. Если запись в сокет прямо сейчас невозможна (системный вызов возвращает EAGAIN), send добавляет данные в очередь и завершается.

  • есть zero-copy версия send, использующая пользовательский буфер: WSTransport.send_reuse_external_bytearray.

Простой клиент выглядит так:

import asyncio

from picows import ws_connect, WSFrame, WSTransport, WSListener, WSMsgType, WSCloseCode


class ClientListener(WSListener):
    def on_ws_connected(self, transport: WSTransport):
        transport.send(WSMsgType.TEXT, b"Hello world")

    def on_ws_frame(self, transport: WSTransport, frame: WSFrame):
        print(f"Echo reply: {frame.get_payload_as_ascii_text()}")
        transport.send_close(WSCloseCode.OK)
        transport.disconnect()


async def main():
    transport, client = await ws_connect(ClientListener, "ws://127.0.0.1:9001")
    await transport.wait_disconnected()


if __name__ == '__main__':
    asyncio.run(main())

Может показаться громоздко, но именно такой интерфейс позволяет достичь максимальной производительности. Дизайн интерфейса полностью повторяет transport/protocol модель в asyncio. Установка, и ожидание разрыва соединения - это async функции. Получение и отправка данных - полностью не async.

Эхо-сервер выглядит похоже: ws_connect заменяется на ws_create_server, объекты ServerClientListener создаются для входящих соединения, а реализация отвечает на входящие сообщения.

import asyncio
from picows import ws_create_server, WSFrame, WSTransport, WSListener, WSMsgType, WSUpgradeRequest


class ServerClientListener(WSListener):
    def on_ws_connected(self, transport: WSTransport):
        print("New client connected")

    def on_ws_frame(self, transport: WSTransport, frame: WSFrame):
        if frame.msg_type == WSMsgType.CLOSE:
            transport.send_close(frame.get_close_code(), frame.get_close_message())
            transport.disconnect()
        else:
            transport.send(frame.msg_type, frame.get_payload_as_memoryview())


async def main():
    def listener_factory(r: WSUpgradeRequest):
        # Routing can be implemented here by analyzing request content
        return ServerClientListener()

    server: asyncio.Server = await ws_create_server(listener_factory, "127.0.0.1", 9001)
    for s in server.sockets:
        print(f"Server started on {s.getsockname()}")

    await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

Если нужно перейти от обыкновенных методов в ClientListener к async, это всегда можно сделать либо через asyncio.create_task, либо через asyncio.Queue. Пользователь может выбирать наиболее подходящий в его случае механизм. Однако делать это по умолчанию для всех сообщений — плохая идея из-за значительных задержек, которые эти механизмы создают. Для максимальной производительности я рекомендую по возможности обрабатывать входящие сообщения вообще без async.

Сравнение производительности

Бенчмарк измеряет RPS (request per second) клиентов с использованием различных библиотек. Клиент отправляет сообщение определённого размера, сервер отправляет сообщение обратно. После этого клиент отправляет сообщение повторно.

Клиенты подсоединяются по очереди через loopback-интерфейс к одному и тому же серверу, написанному на C++ boost.beast. Этот цикл повторяется в течение заданного промежутка времени, и в конце вычисляется RPS. Тестируются TCP- и TLS-соединения, asyncio и uvloop, а также различные размеры сообщений.

Я также добавил boost.beast C++ клиент, чтобы иметь baseline и понять, насколько вообще Python и asyncio loop уступают “чистому” С++ клиенту.

Ссылка на бенчмарк

256 байт

256 bytes
256 bytes

8 КБ

8 Kb
8 Kb

100 КБ

100 Kb
100 Kb

2 МБ

2 Mb
2 Mb

Выводы

Конечно каждый кулик хвалит своё болото, но я не смог найти аналогичных, независимых бенчмарков, что бы в них поучаствовать. В основном, делают бенчмарки для серверов и сравнивают их пропускную способность. Моя же изначальная задача была именно сделать клиента с минимальными задержками.

В моих тестах picows показывает RPS в 2-2.5 раза выше, чем websockets и aiohttp. aiohttp немного быстрее websockets за счёт того, что у неё более эффективный парсер фреймов, переписанный на C. Отрыв picows особенно велик для небольших сообщений, когда задержки библиотек доминируют над временем, проведённым в системных вызовах read/send.

В некоторых случаях picows даже умудряется немного обогнать C+±клиент boost.beast. Когда я впервые увидел эти результаты, то подумал, что с тестом что-то не так. Но потом выяснилась интересная деталь реализации beast. После того как epoll_wait сигнализирует о готовности сокета на чтение, boost.beast вызывает read с буфером фиксированного размера 1536 байт. Если read полностью заполняет буфер, то затем вызывается ещё один read, но уже с пользовательским буфером большего размера. Это интересная попытка оптимизировать доставку маленьких фреймов, но для фреймов размером больше 1.5 КБ boost.beast всегда делает как минимум 2 системных вызова, что и сказывается на производительности.

uvloop по-прежнему немного быстрее asyncio на реальных тестах, но разница уже не очень велика.

Добавлять ws4py было не совсем честно, так как клиент у этой библиотеки синхронный, сокет работает в блокирующем режиме, async loop не используется и в нём нет вызова epoll_wait. Отсюда и лучшие результаты. Я добавил эту библиотеку, потому что у этого проекта больше 1000 звёзд на GitHub.

Несколько слов о производительности asyncio и uvloop

Реализация EventLoop в asyncio в последних версиях приближается к производительности uvloop в основном за счёт того, что реализации asyncio.Task и asyncio.Future переписали на C. Однако основной цикл в base_events.py остаётся по-прежнему на Python. Взаимодействие с модулем selectors тоже осуществляется через обычные Python-вызовы. asyncio и selectors не интегрированы друг с другом на уровне C. Поэтому чтение данных в uvloop по-прежнему немного быстрее, чем в asyncio.

И в asyncio, и в uvloop производительность TLS-слоя остаётся, пожалуй, самым неэффективным местом из-за того, что OpenSSL используется не напрямую, а через обёртки в модуле ssl. В связи с этим возникают несколько ненужных копирований данных и на чтение, и на запись.

Последние версии picows используют высокоэффективную замену методов asyncio.loop.create_connection и asyncio.loop.create_server, предоставляемую пакетом aiofastnet. aiofastnet реализован на C/Cython и взаимодействует с OpenSSL напрямую. Это даёт дополнительный прирост производительности при использовании защищённых соединений.

Комментарии (0)