class MyCounter(object):
def __init__(self):
self.__counter = 0
def incCounter(self):
self.__counter += 1
def getCounter(self):
return self.__counter
И вы хотите сделать его распределённым. Просто наследуете его от SyncObj (передав ему список серверов, с которыми нужно синхронизироваться) и отмечаете декоратором @replicated все методы, которые изменяют внутреннее состояние класса:
class MyCounter(SyncObj):
def __init__(self):
super(MyCounter, self).__init__('serverA:4321', ['serverB:4321', 'serverC:4321'])
self.__counter = 0
@replicated
def incCounter(self):
self.__counter += 1
def getCounter(self):
return self.__counter
PySyncObj автоматически обеспечит репликацию вашего класса между серверами, отказоустойчивость (всё будет работать до тех пор, пока живо больше половины серверов), а также (при необходимости) асинхронный дамп содержимого на диск.
На базе PySyncObj можно строить различные распределенные системы, например распределенный мьютекс, децентрализованные базы данных, биллинговые системы и другие подобные штуки. Все те, где на первом месте стоит надёжность и отказоустойчивость.
Общее описание
Для репликации PySyncObj использует алгоритм Raft. Raft — это простой алгоритм достижения консенсуса в распределённой системе. Raft разрабатывался в качестве более простой замены алгоритма Paxos. Вкратце алгоритм raft работает следующим образом. Среди всех узлов выбирается лидер, который пингует остальные узлы через определенный промежуток времени. Каждый узел выбирает случайный промежуток времени, который он будет ждать получение пинга от лидера. Когда время ожидания заканчивается, а пинг от лидера не пришел — узел считает, что лидер упал и посылает остальным узлам сообщение, в котором говорит, что он сам стал лидером. При удачном стечении обстоятельств на этом всё и заканчивается (остальные узлы соглашаются). А в случае, если два узла захотели стать лидерами одновременно, процедура выбора лидера повторяется (но уже с другими случайными значениями времени ожидания). Подробнее о выборе лидера вы можете узнать посмотрев визуализацию, либо почитав научную статью.
После того как определён лидер, он отвечает за поддержание распределённого журнала. В распределённый журнал пишутся все действия, изменяющие состояние системы. Действие применяется к системе только в том случае, если большинство узлов подтверждает получение записи — это обеспечивает консистетность. Для того чтобы количество записей в распределенном логе не росло до бесконечности, периодически происходит операция под названием log compaction. Текущий лог выкидывается, а вместо него начинает хранится сериализованное состояние системы на текущий момент.
Чтобы не потерять содержимое (например, при выключении вообще всех серверов), его нужно периодически сохранять на диск. Так как количество данных может быть очень большим, содержимое сохраняется асинхронно. Чтобы одновременно иметь возможность работать с данными и параллельно сохранять их же на диск, PySyncObj использует CopyOnWrite через fork процесса. После fork-а процесс родитель и дочерний процесс имеют общую память. Копирование данных осуществляется операционной системой лишь в случае попытки перезаписи этих данных.
PySyncObj реализован целиком на Python (поддерживается Python 2 и Python 3) и не использует каких-либо внешних библиотек. Работа с сетью происходит при помощи select или poll, в зависимости от платформы.
Примеры
А теперь несколько примеров.
Key-value storage
class KVStorage(SyncObj):
def __init__(self, selfAddress, partnerAddrs, dumpFile):
conf = SyncObjConf(
fullDumpFile=dumpFile,
)
super(KVStorage, self).__init__(selfAddress, partnerAddrs, conf)
self.__data = {}
@replicated
def set(self, key, value):
self.__data[key] = value
@replicated
def pop(self, key):
self.__data.pop(key, None)
def get(self, key):
return self.__data.get(key, None)
Вообщем-то всё то же самое что и со счетчиком. Для того чтобы периодически сохранять данные на диск создаём SyncObjConf и передаём ему fullDumpFile.
Callback
PySyncObj поддерживает callback-и — вы можете создавать методы, возвращающие какие-то значения, они автоматически будут прокинуты в callback:
class Counter(SyncObj):
def __init__(self):
super(Counter, self).__init__('localhost:1234', ['localhost:1235', 'localhost:1236'])
self.__counter = 0
@replicated
def incCounter(self):
self.__counter += 1
return self.__counter
def onAdd(res, err):
print 'OnAdd: counter = %d:' % res
counter = Counter()
counter.incCounter(callback=onAdd)
Distributed lock
Пример чуть сложнее — распределенный лок. Весь код можете посмотреть на github, а здесь я просто опишу основные аспекты его работы.
Начнём с интерфейса. Лок поддерживает следующие операции:
- tryAcquireLock — попытка взять лок
- isAcquired — проверка, взят ли лок или отпущен
- release — отпустить лок
Первый возможный вариант реализации лока — аналогичный key-value хранилищу. Если по ключу lockA что-то есть, значит лок взят, иначе он свободен, и мы можем сами его взять. Но не всё так просто.
Во-первых, если мы просто воспользуемся kv-хранилищем из примера выше без всяких модификаций, то операции проверки наличия элемента (проверки взят ли лок) и записи элемента (взятие лока) будут не атомарны (то есть мы можем перезаписать чей-то другой лок). Поэтому проверка и взятие лока должны быть одной операцией, реализуемой внутри реплицируемого класса (в данном случае в tryAcquireLock).
Во-вторых, в случае если какой-то из клиентов, взявших лок упадёт, лок останется висеть навсегда (ну или пока клиент не переподнимется и не отпустит его). В большинстве случаев это нежелательное поведение. Поэтому мы введём timeout, после которого lock будет считаться отпущенным. Также придется добавить операцию, подтверждающую взятие лока (назовём её ping), которая будет вызываться с интервалом timeout / 4, и которая будет продлевать жизнь взятым локам.
Третья особенность — реплицируемые классы должны обеспечивать идентичное поведение на всех серверах. Это значит, что они не должны использовать внутри себя никаких данных, которые могут отличаться. Например, список процессов на сервере, значение random-а или время. Поэтому если мы всё же хотим использовать время — придется передавать его в качестве параметра всем методам класса, в которых оно используется.
С учетом этого, получившаяся реализации состоит из двух классов — LockImpl, являющийся реплицируемым объектом а так же Lock, обертка над ним. Внутри Lock мы автоматически добавляем текущее время ко всем операциям над LockImpl а так же осуществляем периодический ping с целью подтвердить взятые локи. Получившийся лок — всего лишь минимальный пример, который можно дорабатывать с учетом требуемой функциональности. Например, добавить колбэки, информирующие нас о взятии и отпускании лока.
Заключение
Мы используем PySyncObj в проекте WOT Blitz для синхронизации данных между серверами в разных регионах. Например, для счетчика оставшихся танках во время ивента ИС-3 Защитник. PySyncObj является неплохой альтернативной существующим механизмам хранения данных в распределенных системах. Основные аналоги – различные распределённые БД, например Apache Zookeeper, etcd и прочие. В отличие от них PySyncObj не является БД. Он является инструментом более низкого уровня и позволяет реплицировать сложные конечные автоматы. Кроме того он не требует внешних серверов и легко интегрируется в python приложения. Из недостатков текущей версии – потенциально не самая высокая производительность (сейчас это полностью python код, есть планы попробовать переписать в виде c++ экстеншена) а так же отсутствие разделение на серверную / клиентскую часть – иногда может возникнуть необходимость иметь большое количество нод-клиентов (часто подключающихся / отключающихся) и лишь несколько постоянно работающих серверов.
Ссылки
- github.com/bakwc/PySyncObj — исходники проекта
- pip install pysyncobj — установка через pypi
- raft.github.io — сайт протокола raft (описание и визуализация)
- ramcloud.stanford.edu/raft.pdf — оригинальная публикация raft (с подробным описанием деталей реализации)
- habrahabr.ru/post/222825 — Консенсус в распределенных системах. Paxos
Комментарии (24)
YourChief
28.06.2016 14:11Скажите, пожалуйста, какие ограничения этот класс накладывает на экземпляры своих классов в плане сериализации? Все ли данные, которые содержатся в экземпляре, должны быть сериализуемы pickle?
Верно ли, что изменения передаются в виде вызовов таких же методов на остальных нодах? Если какой-либо вызов какого-либо replicated метода инициализирует свойство экземпляра из внешнего объекта или из результата вызова системной функции, то содержимое на каждой ноде может получиться разным? К примеру, если метод записывает в свойство текущий timestamp.bak
28.06.2016 14:34+1Все ли данные, которые содержатся в экземпляре, должны быть сериализуемы pickle?
Именно так. Аргументы методов так же должны быть сериализуемы.
Верно ли, что изменения передаются в виде вызовов таких же методов на остальных нодах? Если какой-либо вызов какого-либо replicated метода инициализирует свойство экземпляра из внешнего объекта или из результата вызова системной функции, то содержимое на каждой ноде может получиться разным? К примеру, если метод записывает в свойство текущий timestamp.
Совершенно верно. В статье в примере distributed lock описана эта особенность. Timestamp-ы необходимо передавать в виде аргументов методам, точно так же как и сиды для рандома и прочее.
tumikosha
28.06.2016 14:16А zero-deployment есть?
Или после изменения кода класса каждый раз нужно рестартовать все кластера и деплоить его на них?
Если нету, то в топку. Лучше на GRID GAIN сидеть буду.
А вот если есть…bak
28.06.2016 14:58+1zero-deployment нет, но идея интересная. Хотя я не очень представляю как именно это реализовать. Если на части машин будет одна версия класса, а на части машин — другая — во время применения журнала операций состояния классов разойдутся. Возможно у вас есть какие-то идеи?
trapwalker
29.06.2016 00:22+1Можно попробовать сперва раздать всем нодам новую версию класса, а затем в едином локе перейти на нее. Это, наверно, наложит ещё ряд ограничений на логику, которую можно размещать в методах таких классов, но, если подумать… Вот у нас есть объект. Он постоянно синхронизируется со своими зеркалами. В какой-то момент прилетела новая реализация, она откомпилировалась (да, надо делать какую-то систему передачи кода и да, это, конечно, небезопасно без должного шифрования и подписей) и наше локальное зеркало отправило всем сигнал готовности. Когда достаточная часть сети будет готова перейти на новую версию, проходит единый импульс, который заставит пиры десериализовать уже объекты свежей версии.
Да, довольно много интересных проблем выплывает. Тут и систему зависимостей надо какую-то, хотя, возможно, и опциональную, поскольку в некоторых тривиальных случаях можно делать деплой «без оглядки». Чертовски интересная тема. Хочу поучаствовать.
tumikosha
29.06.2016 08:34Насколько я понимаю тему, есть 2 концепта:
- Erlang, Akka — distributed actors (для realtime)
- Apache Ignite/GridGain — distributed closures (для map reduce)
zero deployment жизненно необходим для map reduce
тогда программист всегда может спросить у кластера сколько есть машин, как они загружены и т.д., а потом сказать на какой машине какой класс должен быть выполнен. При этом класс автоматически деплоится на одну или несколько машин выполняется и высылает результаты обратно.
При этом он может общаться с другими классами посылкой сообщений.
Все это программист может делать вручную, а может переложить на фреймворк.
Это все шикарно реализовано у Grid Gain. Причем они внедрили так называемые
SPI — интерфейсы позволяющие заменять например транспорт между нодами
tumikosha
29.06.2016 08:52Distributed closures дает возможность писать такой вот код:
Ignite ignite = Ignition.ignite();
// Print out hello message on all cluster nodes.
ignite.compute().broadcast(() -> System.out.println(«Hello Node!»));
в результате Hello Node! бедет напечатано на всех нодах
или скажем
ignite.compute().broadcastToLessLoaded(() -> System.out.println(«Hello Node!»));
напечатает Hello Node! на наименее загруженной ноде
trapwalker
29.06.2016 00:26В некоторых приложениях и такая (с рестартами) схема вполне подойдёт. Если наши воркеры мелкие и достаточно изолированные, хотя и требуют немного распределенных данных; если воркеры получают небольшие задания от балансера и система толерантна к падению воркеров, то можно поступить так:
— отправка свежей версии воркерам происходит такой же репликацией как и все остальное;
— получив свежую версию и проверив все подписи воркер тупо падает, запустив деплой свежей версии.
— обновленные воркеры не могут получать задания, пока их не станет большинство
— как-то так=)bak
29.06.2016 00:34обновленные воркеры не могут получать задания, пока их не станет большинство
В текущем виде они вполне себе успешно получат задания, но из за того что логика старых и новых воркеров разная — в лучшем случае старые воркеры покрешатся из за того что дернется к примеру не существующий метод — в худшем — разъедутся данные из за отличий в реализации таких-же методов.trapwalker
29.06.2016 02:14Ну строго говоря да. Однако, можно придумать ситуации и задачи, когда гетерогенные по версии воркеры вполне могли бы сосуществовать. Например, если это, скажем, мессенджер, то, если не затронуты какие-то методы ядра, вполне можно пережить вызовы несуществующих методов. Просто сработает соответствующая деградация с сообщением, что, мол, ваш собеседник использует более свежую версию и вы не можете видеть некоторые его сообщения (например с волшебными стикерами).
Хотя я, всё же, считаю все эти деградации от лукавого. Надо просто запретить работу нового кода, пока не сложатся обстоятельства для консистентного и штатного перехода всей сетью.
afiskon
28.06.2016 15:49+2Супер интересная библиотека, спасибо! Пара вопросов, если позволите:
- Raft-кластер имеет одного лидера, в который идет вся запись. Верно ли я понимаю, что для распределения нагрузки предлагается создать несколько кластеров на разных портах?
- Расскажите, пожалуйста, как вы тестировали вашу реализацию Raft? Доступны ли где-то исходники этих тестов?
bak
28.06.2016 16:35+1Raft-кластер имеет одного лидера, в который идет вся запись. Верно ли я понимаю, что для распределения нагрузки предлагается создать несколько кластеров на разных портах?
Да, хороший вариант.
Расскажите, пожалуйста, как вы тестировали вашу реализацию Raft? Доступны ли где-то исходники этих тестов?
У нас есть несколько основных сценариев, среди которых падение и выбор лидера, log compaction, передача большого лога (50мб) и прочие. Исходники тестов здесь. Так же есть планы тестировать саму логику raft отдельно от сети и остальной обвязки, но для этого нужен небольшой рефакторинг.
gridem
28.06.2016 18:07+2Это очень похоже на то, что описано в статье: Реплицируемый объект. Только там я использовал С++ и другой алгоритм консенсуса.
Хотел спросить: а отличаются ли операции чтения от операций записи? Все ли операции проходят через raft?bak
28.06.2016 18:48+1Это очень похоже на то, что описано в статье: Реплицируемый объект. Только там я использовал С++ и другой алгоритм консенсуса.
Отчасти я вашей статьёй и вдохновился :) Начал искать нечто подобное для питона и не нашёл. Кстати, вы уже дописали replobj? Опубликовали исходники? Бегло глянул вашу статью про masterless консенсус — выглядит интересно. Это ваш собственный алогритм? Какая у него лицензия?
а отличаются ли операции чтения от операций записи? Все ли операции проходят через raft?
Через raft проходят только операции записи, чтение происходит напрямую. Все методы изменяющие состояние помечаются декоратором replicated.gridem
28.06.2016 19:48+2Отчасти я вашей статьёй и вдохновился
На мой взгляд, это самый правильный подход для построения надежных распределенных систем.
Начал искать нечто подобное для питона и не нашёл.
Предлагаю поискать для других языков, чтобы убедиться, что такой подход никто не использует. По крайней мере, я ничего такого не нашел, когда начал этим заниматься.
Кстати, вы уже дописали replobj?
Вот здесь описан алгоритм:
Masterless Consensus Algorithm
Здесь код:
https://github.com/gridem/ReplobPrototype
Для верификации консенсуса использовал DAVE:
https://github.com/gridem/DAVE
Бегло глянул вашу статью про masterless консенсус — выглядит интересно. Это ваш собственный алогритм? Какая у него лицензия?
Да, собственный. Лицензия — Apache 2.0
onegreyonewhite
29.06.2016 03:12+1Интересный продукт получился.
Правда я бегло просмотрел код и не нашёл намёка на шифрование.Планируете ли добавить?bak
29.06.2016 09:51Да, шифрование и авторизация — нужная фича, в одной из следующих версий обязательно добавим. Сейчас у нас оно запускается во внутренней сети, поэтому сразу и не сделали.
estin
29.06.2016 11:39Очень интересно!
А двойное подчеркивание__counter
и__data
это корпоративная культура кода?
Почему одного подчеркивания не хватило что бы просто намекнуть?bak
29.06.2016 11:58+1Это общепринятый способ создания private членов класса в питоне. К таким полям не получится обратится извне на прямую.
estin
29.06.2016 12:32Ясно.
Мы же все знаем, что можно к ним обратиться если захотеть и бывает что они да же мешает )
Отсюда и появился вопрос из-за любопытства, зачем использовать двойное подчеркивание и добиваться мнимой инкапсуляции.
Думал что это корпоративный стиль.pahaz
07.07.2016 18:15мы же все понимаем, что абсолютной приватности не существует ни в одном языке. Можно пошариться по памяти и поисправлять байтики, но это не мешает нам использовать private в java :)
estin
07.07.2016 21:45Это верно )
Вот тут BDFL раскрывает тему немного "we're all adults here"
Просто давно не видел двойного подчеркивания, ковыряясь в исходниках крупных и не очень python open source проектов.
Вот интересно было применяется ли в wargaming двойное подчеркивание и это как то обоснованно.
ik62
Спасибо! Интересная библиотека, нужно будет попробовать.
Можно ли динамически добавлять новые узлы в кластер? И что происходит при split brain?
bak
Динамическое добавление пока не поддерживается. Планирую добавить в следующей версии, благо сам протокол raft это позволяет. При разделении кластера — в случае если в одном из подкластеров больше половины нод — этот подкластер будет работать. Если во всех подкластерах меньше половины машин — коммиты транзакций происходить не будут (хотя они по прежнему будут добавлятся в лог старого лидера). Как только кластер восстановится — произойдет выбор нового лидера (предпочтение будет отдано ноде с логом наибольшей длины) и попытка закоммитить имеющиеся транзакции.