Меня удивило, что я нашел всего одно упоминание про эту разработку на хабре, да и то в скользь, поэтому решил написать эту заметку.
Как пишут на официальном сайте разработчики самой библиотеки, ZeroRPC – это легкая, надежная, языконезависимая библиотека для распределенной связи между серверами.
ZeroRPC построена на вершине стека ZeroMQ и MessagePack, о которых уже не раз говорилось на хабре. И из коробки поддерживает stream responses, heartbeat'ы, определение таймаутов и даже автоматическое переподключение после какого-нибудь фейла.
Звучало хорошо и я решил попробовать. По большому счету я еще не изучил все возможности библиотеки и расскажу только о том, как применил ее в своем случае. Но, возможно, уже это заинтересует Вас почитать и, возможно, попробовать ZeroMQ в своих проектах.
Установка проста и понятна, как и у большинства python библиотек:
pip install zerorpc
за собой она притянет еще парочку либ, среди которых:
- gevent,
- msgpack-python,
- pyzmq
Ставится не за пару секунд, поэтому если установка вдруг остановилась на одном месте, не бейте панику – скоро все продолжится.
Сообственно согласно документации ей уже можно начать пользоваться без написания какого-либо кода.
zerorpc --server --bind tcp://*:1234 time
Охотно верю, но меня это не сильно интересовало. Более интересными выглядели примеры на главной странице сайта.
Сервер
import zerorpc
class HelloRPC(object):
def hello(self, name):
return "Hello, %s" % name
s = zerorpc.Server(HelloRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()
Клиент
import zerorpc
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
print c.hello("RPC")
Копи-паста, запуск, работает.
Круто, но hello world'ы всегда работают, поэтому немного усложним первичные тесты
Сервер
import zerorpc, time
class StreamingRPC(object):
count = 0
def commit(self, cid):
self.count += 1
print(str(self.count))
return str(cid)+'_done'
s = zerorpc.Server(StreamingRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()
Клиент
import zerorpc
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
for i in range(0,290000):
print c.commit(i)
И запустим несколько клиентов дабы посмотреть, что будет с пакетами.
Меня больше всего волновал этот вопрос, ибо в моей реализации я уперся в то, что при большой нагрузке я терял больше 40% сообщений на udp сокетах, что собственно не очень-то удивительно…
Тест конечно показал, что все совсем не так быстро как было на сокетах, но зато не единой потери пакета. Чуть позже я провел еще парочку тестов с тредами и записью в базу данных и остался вполне доволен. Через несколько дней тестов, я переписал код сокет сервера и клиента для своей метрики, и вот спустя две недели метрика собирает кучу данных и “ни единого разрыва”. Более того, повысив скорость сбора данных, просадки в производительности я не увидел. В данный момент метрика собирает в пике до 100к сообщений в секунду и замечательно себя чувствует.
Конечно в моем случае ZeroRPC скорее всего избыточна и я уверен, что будь на моем месте более сведущий программист, он бы смог победить потери пакетов, зависание сессий и обрывы коннектов сокетов и реализовал бы более производительное решение.
Но тогда я бы не познакомился с ZeroRPC и не рассказал бы о ней здесь.
И быть может другие возможности этой библиотеки очень могут пригодиться в рабочих буднях читающих эту заметку разработчиков.
Рекомендую посмотреть-почитать доклад Jerome Petazzoni из dotCloud о том, зачем и для чего они ее сделали
pycon-2012-notes.readthedocs.org/en/latest/dotcloud_zerorpc.html
Сайт проекта — www.zerorpc.io
GitHub — github.com/etsy/statsd
Комментарии (12)
Elvish
28.07.2015 10:51Меня больше всего волновал этот вопрос, ибо в моей реализации я уперся в то, что при большой нагрузке я терял больше 40% сообщений на udp сокетах, что собственно не очень-то удивительно…
А вы пробовали увеличить буфер сокета?
Не знаю как в питоне, но в C это выглядит как-то так:
int sock = /* инициализация сокета, бинд */; int new_size = 1024*1024; setsockopt(sock,SOL_SOCKET,SO_RCVBUF,&new_size,sizeof(new_size));
или в QT:
QUdpServer *udp = new QUdpServer(); udp->bind(....); udp->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, 1048576);
Мне очень интересно проблема только в этом была или еще в чем-то.alexrett Автор
28.07.2015 17:16Проблем было несколько.
С изменением буфера самого сокета, честно говоря, не пробовал.Возможно это бы решило проблему, но сеть мне не подсказала о таком решении в тот момент. Постараюсь в ближайшие дни попробовать проверить.
А основные проблемы были связаны с потерей пакетов и разрывом соединения (в случае tcp). Также несколько раз упирался в перемешивание сообщений, вот тут уж с чем это связанно, я честно признаться, совсем не знаю.
pavelsh
28.07.2015 21:54+1Если перемешивание сообщений было в udp то это нормально. Связано чаще всего с одной вещью — параллельные пути внутри сети. Часто бывает так, что пакет, который отправили позднее — пришел раньше.
Или же QoS (что редко встречается в современных сетях) — один пакет по какой-либо причине попридержали в буфере перед отправкой.alexrett Автор
28.07.2015 22:34Понятно. Да именно с udp такое и происходило…
А случайно не подскажите, что почитать по теме, дабы таки разобраться в вопросе как следует?pavelsh
28.07.2015 22:55+1По теме сетей? Тут очень обширно. Когда то (в 2000 году кажется) мне очень понравились Олиферы. Но вряд ли там будет объяснено про параллельные пути.
Давайте я коротко вам тут попытаюсь объяснить.
1. Как работает роутинг в Интернет: много много сетевых устройств соединены друг с другом. Объединение этих устройств образует граф. Чтобы дойти из одной точки графа в другую точку графа надо пройти какой-то путь в этом графе. Чаще всего таких путей может быть несколько. Но не все пути будут одинаковыми, один путь будет короче, другой длинее (как физически, так и по задержкам). Понятно, что чем короче путь — тем быстрее пакет дойдет до адресата. И было бы выгодно использовать этот маршрут. Но когда стали использовать этот принцип, быстро выяснилось, что некоторые дуги в этом графе (физические линки, оптика) не загружены трафиком, а некоторые перегружены трафиком. Поэтому с помощью некоторых технолочиских ухищрений трафик в этом графе научились отправлять по паралельным линкам. И иногда так получалось, что один пакет из tcp сессии шел через одного провайдера, а второй пакет через другого (обычно пытаются сделать так, чтобы одна tcp сессия шла всегда одинакова, но я видел огромное количество противоположенных примеров). Понятно, что сети у разных провайдеров разные, и часто так получалось, что у одного провайдера пакет подзадерживался то тут, то там (особенно если сеть у провайдера перегружена), а у другого провайдера пакет пролетал со свистом через всю сеть. И получалось, что пакеты приходили в разное время, хотя, казалось бы начинали они идти по сети почти одновренно.
2. Что касается QoS — Quality of Service то тут все просто. QoS обычно настраивают для того, чтобы обеспечить заданное качество обслуживание на сети. Одна из политик QoS — буферизация трафика. Как оно работает — когда видим, что линк у нас на 100Mb/s, а в него пытается проползти 1Gb/s, мы засовываем в линк столько, сколько влезет, а остальное складываем в буфер с мыслью: «а вдруг этот 1Gb/s станет жалким 1Mb/s, а мы потом успеем все в линк покидать». Буферизация приводит к тому, что один пакет задержался в буфере, а второй под шумок пролез параллельным путем.
Или был такой механизм как WFQ — когда маленькие пакеты получали приоритет перед большими при отправке через низкоскоростные интерфейсы. На более высокоскоростных интерфейсах это стало неважно.
Все что я попытался объяснить тут справедливо для Интернета и мало справедливо для обычной локальной сети (хотя для локальной сети большого предприятия это тоже может быть справедливо).
P.S.: Кстати еще по сетям очень рекомендую цикл статей на хабре — Сети для самых маленьких. Начало я не читал, честно говоря, но вот часть про MPLS написана просто очень отлично habrahabr.ru/post/246425alexrett Автор
28.07.2015 23:04Спасибо, за такое развернутое объяснение! С циклом статей ознакомлюсь обязательно.
TrueMaker
Легкая, надёжная, но с Python 3 не совместимая из-за свой зависимости на gevent. Жаль.
lega
С недавних пор gevent работает под py3.3+
TrueMaker
С каких пор? На сколько мне известно это еще в девелопмент состоянии.
Да и эта ошибка как бы намекает на синтаксис питона 2.x
File "/private/var/folders/5m/hhwdehdewhdwed/T/pip-build-bp461rpq/gevent/setup.py", line 111, in make_universal_header
print >>f, line
TypeError: unsupported operand type(s) for >>: 'builtin_function_or_method' and '_io.TextIOWrapper'
поставить попробовал только что.
lega
У меня собралось и запустилось (собрал из github).
Так же поддержка python3 заявлена, да видимо оно ещё в бете, но это дело времени…
TrueMaker
Дело времени не дело продакшен :)
lega
Кроме того под asyncio есть aiozmq в которой есть аналогичный RPC через эти же — zmq и msgpack.