Всем привет!
Меня зовут Тарас, я автор библиотеки picows — ультрабыстрых вебсокетов для asyncio. В этой статье я расскажу, почему вообще появилась ещё одна библиотека для веб-сокетов, покажу результаты бенчмарков и заодно порассуждаю о производительности в asyncio.
Предыстория
В далёком-предалёком 2021 году мне довелось поучаствовать в разработке алготрейдинг-платформы для криптовалютных бирж. Выбор языка пал на Python из-за разнообразия ML-библиотек, возможность быстро собирать прототипы и проверять идеи, отсутствия этапа компиляции и в целом наличия богатой экосистемы. Если какая-то идея взлетит, критичный участок всегда можно оптимизировать, хотя бы частично переписав его на C/C++/Cython.
Одна из первых задач — подключиться к WebSocket API крипто-биржи и подписаться на рыночные данные в реальном времени: обычно это сделки и обновления торговой книги.
Когда рынок тихий, таких обновлений приходит немного: скажем, до 20 в секунду на одну торговую пару. Но когда начинается движение, счёт уже идёт на тысячи. Часто, чтобы принимать более точные решения в торговой стратегии, нужно получать обновления сразу по нескольким инструментам и с нескольких бирж. В таком режиме количество входящих сообщений, которые нужно обработать, запросто переваливает за 10000, а то и за 20000 в секунду. Сами сообщения при этом обычно небольшие: JSON размером порядка 200 байт.
Я взял вебсокеты из aiohttp и довольно быстро понял, что библиотека не справляется. Даже если вообще ничего не делать со входящими сообщениями, они скапливаются во внутренней очереди aiohttp, а в тяжёлых случаях RCVBUF сокета успевает переполниться и возникает back-pressure. Задержка между приёмом данных из сети и доставкой их в логику приложения быстро растёт. А ведь эти данные с биржи потом ещё нужно разобрать и обработать.
Код выглядел примерно так:
from aiohttp import ClientSession async def run(): async with ClientSession() as session: async with session.ws_connect("wss://какая-то.биржа/btc_usd", ssl_context=ssl_context) as ws: async for msg in ws: if msg.type == WSMsgType.TEXT: # Разбор и обработка содержимого сообщения # msg.data: str ... # Когда нужно что-то отправить await ws.send_bytes(data)
Каких-то улучшений и оптимизации тут особо и не сделаешь.
Задержки в алготрейдинге — это очень плохо. Нет ничего хуже торговой стратегии, которая лагает. Даже 10 миллисекунд — это уже серьёзный лаг и, как следствие, быстро потерянные деньги. С aiohttp задержка вполне доходила и до 100 мс.
Другая популярная библиотека, websockets, имея похожий async интерфейс, оказалась ещё медленнее. При этом именно websockets на тот момент активно рекламировала себя как самую быструю современную async-библиотеку, но никаких конкретных бенчмарков и сравнений не предоставлялось.
Понять причину задержек можно погрузившись в детали реализации этих библиотек. Но прежде чем это сделать, напомню как вообще устроен протокол вебсокетов. Он довольно простой.
Протокол вебсокетов (совсем коротко)
На сетевом уровне отправляются вебсокет-фреймы (frame). Каждый фрейм состоит из заголовка (header) и полезной нагрузки (payload). Длина заголовка — от 2 до 14 байт. В заголовке указаны:
тип фрейма (TEXT | BINARY | PING | PONG | CLOSE | CONTINUATION)
несколько флагов (fin, rsv1, rsv2, rsv3)
длина payload в байтах
опциональная xor-маска для payload
Пользовательские сообщения (message) могут состоять из нескольких фреймов. Чтобы получить сообщение, payload фреймов конкатенируется. Контрольные сообщения PING/PONG/CLOSE всегда отправляются одним фреймом.
В подавляющем большинстве случаев серверы и клиенты отправляют пользовательские сообщения одним фреймом: так проще и эффективнее. Максимальный размер фрейма — 2^63 байт, то есть, по сути, ограничений на размер нет. Флаг fin из заголовка сигнализирует об окончании сообщения:
WSFrame(frame_type=<TEXT|BINARY|PING|PONG|CLOSE>, fin=True, <msg>)
Если же сообщение разбивается на несколько фреймов, то порядок выглядит так:
WSFrame(frame_type=<TEXT|BINARY>, fin=False, <msg part1>) WSFrame(frame_type=CONTINUATION, fin=False, <msg part2>) WSFrame(frame_type=CONTINUATION, fin=True, <msg part3>)
Детали реализации aiohttp и websockets
При приёме данных aiohttp и websockets делают примерно следующее:
сырые данные приходят в виде объектов bytes в asyncio.Protocol.data_received
парсер фреймов копирует их, накапливая в своём буфере
парсер разбирает заголовок фрейма
если фрейм приехал полностью, то payload копируется из буфера, а буфер очищается
фреймы объединяются в сообщения, и данные могут копироваться повторно
если тип сообщения TEXT, данные могут дополнительно копироваться и конвертироваться в объект str
сообщения добавляются во внутреннюю очередь (аналог asyncio.Queue)
asyncio, когда больше не остаётся IO-операций с сокетами, пробуждает пользовательскую корутину и передаёт ей готовое сообщение
Проблем тут несколько:
пользовательская корутина просыпается через неопределённо долгое количество времени
после чтения из сокета данные копируются несколько раз
парсер фреймов и другой код, связанный с конкатенацией фреймов, очередью и контролем потока (flow control), написаны на Python и тоже не слишком эффективны. Хотя конкретно в aiohttp парсер в какой-то момент переписали на C
Отправка данных и построение фреймов тоже не обходятся без копирования.
Например, вы хотите отправить сообщение размером в 1 МБ. И aiohttp, и websockets в лучшем случае делают следующее:
строят заголовок фрейма на чистом Python с помощью модуля struct
чтобы дописать заголовок спереди (2-14 байт), копируют весь мегабайт
отправляют полученный объект bytes в asyncio.Transport.write
Маршрут, по которому проходят данные и при чтении, и при записи, сопровождается созданием и разрушением большого количества мелких объектов, а также поиском функций по имени в модулях. Это плохо с точки зрения memory locality и в целом даёт неоправданные накладные расходы для таких базовых операций.
А как хотелось бы?
Получение данных
В asyncio есть два типа протоколов: asyncio.Protocol и asyncio.BufferedProtocol. При получении новых данных из сети asyncio.Protocol передаёт пользовательскому коду новый объект bytes с копией данных. asyncio.BufferedProtocol, в свою очередь, позволяет передать в asyncio свой собственный буфер и избежать лишнего копирования.
чтобы минимизировать задержки, библиотека вебсокетов должна использовать asyncio.BufferedProtocol; ни aiohttp, ни websockets этого не делают
парсер на C разбирает заголовок фрейма и определяет, доставлен ли фрейм полностью
как только фрейм полностью доставлен, границы payload в буфере передаются в пользовательский код сразу же
пользовательский код может читать и разбирать содержимое payload в буфере, не копируя его
для TLS ситуация сложнее, но количество копирований всё равно сильно уменьшается. Пользователь работает с участком памяти, куда OpenSSL записывает расшифрованные данные
Отправка данных
Для минимизации задержек при отправке нужно:
подготовить заголовок
как можно быстрее вызвать системный вызов send/writev
если это клиент, придётся скопировать данные, чтобы наложить маску
но можно дать пользователю API, в котором он сам предоставит bytearray буфер с payload и зарезервированными 16 байтами впереди, куда мы запишем заголовок фрейма, и наложить маску прямо в нём, без копирования.
сделать всё максимально на C, минимизировав создание и взаимодействие с Python-объектами
Именно на таком подходе и построен picows. От некоторых высокоуровневых фич пришлось отказаться ради производительности:
получение данных осуществляется через обычный обратный вызов, а не через async for
picows предоставляет пользователю фреймы, а не сообщения. Подавляющее большинство серверов и клиентов никогда не разбивают сообщения на несколько фреймов. В таком случае фрейм сам по себе уже является готовым к дальнейшей обработке сообщением
если же сообщение разбито на несколько фреймов, пользователь может выбрать наиболее оптимальный способ конкатенации фреймов
в picows нет расширения permessage-deflate из коробки, поскольку компрессия работает на уровне сообщений, а не фреймов
picows WSTransport.send не является async методом. Если запись в сокет прямо сейчас невозможна (системный вызов возвращает EAGAIN), send добавляет данные в очередь и завершается.
есть zero-copy версия send, использующая пользовательский буфер: WSTransport.send_reuse_external_bytearray.
Простой клиент выглядит так:
import asyncio from picows import ws_connect, WSFrame, WSTransport, WSListener, WSMsgType, WSCloseCode class ClientListener(WSListener): def on_ws_connected(self, transport: WSTransport): transport.send(WSMsgType.TEXT, b"Hello world") def on_ws_frame(self, transport: WSTransport, frame: WSFrame): print(f"Echo reply: {frame.get_payload_as_ascii_text()}") transport.send_close(WSCloseCode.OK) transport.disconnect() async def main(): transport, client = await ws_connect(ClientListener, "ws://127.0.0.1:9001") await transport.wait_disconnected() if __name__ == '__main__': asyncio.run(main())
Может показаться громоздко, но именно такой интерфейс позволяет достичь максимальной производительности. Дизайн интерфейса полностью повторяет transport/protocol модель в asyncio. Установка, и ожидание разрыва соединения - это async функции. Получение и отправка данных - полностью не async.
Эхо-сервер выглядит похоже: ws_connect заменяется на ws_create_server, объекты ServerClientListener создаются для входящих соединения, а реализация отвечает на входящие сообщения.
import asyncio from picows import ws_create_server, WSFrame, WSTransport, WSListener, WSMsgType, WSUpgradeRequest class ServerClientListener(WSListener): def on_ws_connected(self, transport: WSTransport): print("New client connected") def on_ws_frame(self, transport: WSTransport, frame: WSFrame): if frame.msg_type == WSMsgType.CLOSE: transport.send_close(frame.get_close_code(), frame.get_close_message()) transport.disconnect() else: transport.send(frame.msg_type, frame.get_payload_as_memoryview()) async def main(): def listener_factory(r: WSUpgradeRequest): # Routing can be implemented here by analyzing request content return ServerClientListener() server: asyncio.Server = await ws_create_server(listener_factory, "127.0.0.1", 9001) for s in server.sockets: print(f"Server started on {s.getsockname()}") await server.serve_forever() if __name__ == "__main__": asyncio.run(main())
Если нужно перейти от обыкновенных методов в ClientListener к async, это всегда можно сделать либо через asyncio.create_task, либо через asyncio.Queue. Пользователь может выбирать наиболее подходящий в его случае механизм. Однако делать это по умолчанию для всех сообщений — плохая идея из-за значительных задержек, которые эти механизмы создают. Для максимальной производительности я рекомендую по возможности обрабатывать входящие сообщения вообще без async.
Сравнение производительности
Бенчмарк измеряет RPS (request per second) клиентов с использованием различных библиотек. Клиент отправляет сообщение определённого размера, сервер отправляет сообщение обратно. После этого клиент отправляет сообщение повторно.
Клиенты подсоединяются по очереди через loopback-интерфейс к одному и тому же серверу, написанному на C++ boost.beast. Этот цикл повторяется в течение заданного промежутка времени, и в конце вычисляется RPS. Тестируются TCP- и TLS-соединения, asyncio и uvloop, а также различные размеры сообщений.
Я также добавил boost.beast C++ клиент, чтобы иметь baseline и понять, насколько вообще Python и asyncio loop уступают “чистому” С++ клиенту.
256 байт

8 КБ

100 КБ

2 МБ

Выводы
Конечно каждый кулик хвалит своё болото, но я не смог найти аналогичных, независимых бенчмарков, что бы в них поучаствовать. В основном, делают бенчмарки для серверов и сравнивают их пропускную способность. Моя же изначальная задача была именно сделать клиента с минимальными задержками.
В моих тестах picows показывает RPS в 2-2.5 раза выше, чем websockets и aiohttp. aiohttp немного быстрее websockets за счёт того, что у неё более эффективный парсер фреймов, переписанный на C. Отрыв picows особенно велик для небольших сообщений, когда задержки библиотек доминируют над временем, проведённым в системных вызовах read/send.
В некоторых случаях picows даже умудряется немного обогнать C+±клиент boost.beast. Когда я впервые увидел эти результаты, то подумал, что с тестом что-то не так. Но потом выяснилась интересная деталь реализации beast. После того как epoll_wait сигнализирует о готовности сокета на чтение, boost.beast вызывает read с буфером фиксированного размера 1536 байт. Если read полностью заполняет буфер, то затем вызывается ещё один read, но уже с пользовательским буфером большего размера. Это интересная попытка оптимизировать доставку маленьких фреймов, но для фреймов размером больше 1.5 КБ boost.beast всегда делает как минимум 2 системных вызова, что и сказывается на производительности.
uvloop по-прежнему немного быстрее asyncio на реальных тестах, но разница уже не очень велика.
Добавлять ws4py было не совсем честно, так как клиент у этой библиотеки синхронный, сокет работает в блокирующем режиме, async loop не используется и в нём нет вызова epoll_wait. Отсюда и лучшие результаты. Я добавил эту библиотеку, потому что у этого проекта больше 1000 звёзд на GitHub.
Несколько слов о производительности asyncio и uvloop
Реализация EventLoop в asyncio в последних версиях приближается к производительности uvloop в основном за счёт того, что реализации asyncio.Task и asyncio.Future переписали на C. Однако основной цикл в base_events.py остаётся по-прежнему на Python. Взаимодействие с модулем selectors тоже осуществляется через обычные Python-вызовы. asyncio и selectors не интегрированы друг с другом на уровне C. Поэтому чтение данных в uvloop по-прежнему немного быстрее, чем в asyncio.
И в asyncio, и в uvloop производительность TLS-слоя остаётся, пожалуй, самым неэффективным местом из-за того, что OpenSSL используется не напрямую, а через обёртки в модуле ssl. В связи с этим возникают несколько ненужных копирований данных и на чтение, и на запись.
Последние версии picows используют высокоэффективную замену методов asyncio.loop.create_connection и asyncio.loop.create_server, предоставляемую пакетом aiofastnet. aiofastnet реализован на C/Cython и взаимодействует с OpenSSL напрямую. Это даёт дополнительный прирост производительности при использовании защищённых соединений.