image
Структурно-функциональная схема модуля

Хочу рассказать о разработанном и используемом в продакшне модуле Publisher Pulsar (github), который позволяет динамически синхронизировать действия процессов.

Например, есть множество (десятки или сотни) процессов, независимо друг от друга обращающихся к API Google Analytics с одного IP.

При этом, GA установлен лимит в 10 queries per second с одного IP.

Если не регулировать обращения к API, то постоянно будет возвращаться ошибка превышения лимита некоторыми процессами, и либо они не смогут выполнить свою задачу без данных API, либо будут снова и снова пытаться получить данные в цикле, создавая проблему для других процессов, увеличивая количество ошибок. То есть будет хаос и отсутствие прогнозируемости в части процента корректно получаемых данных и процента ошибок при обращениях к API.



И постольку возникает задача избежать превышения лимита RPS (QPS), чтобы все процессы могли корректно получать данные.

Данный модуль как раз справляется с этим, динамически корректируя параметры своей активности (возможно существуют другие реализации сходной логики для PHP, но они не были найдены).

Система призвана функционировать подобно пульсару — совершать регулярные («пульсирующие») рассылки подписчикам.

Общую структуру действий по использованию модуля можно описать так:

1. Указать параметры и запустить Пульсар как демон.

2. Настроить код процесса (Сервиса), обращающегося к API (прим. — выполняющего любое действие, которое необходимо синхронизировать), для коннекта к Пульсару, чтобы прежде выполнения действия (например, совершения запроса к API) процесс обращался бы к Пульсару и ждал разрешения на выполнение действия. И только после получения разрешения выполнял бы его.

В результате Пульсар согласно настройкам одновременно разрешает быть подписчиками только [например] 10 процессам (которые вышли из FIFO стека; т.е. 10-ти разрешили стать подписчиками, а остальные N находятся в ZMQ очереди).

И после того, как необходимое количество процессов стало подписчиками, им высылается разрешение, после которого они могут совершить свое действие (например, обратиться к API).

Таким образом, лимит будет соблюдаться независимо от количества параллельно работающих процессов (в пределах возможностей стека ZMQ).

3. После этого подписчик (исполнитель) должен послать в Пульсар сообщение о выполненном действии — присутствуют ли ошибки или все в порядке.

Т.к. если при выполнении действия присутствуют ошибки, связанные с количеством одновременно выполненных действий, то Пульсар может скорректировать свое поведение — временно, до нормализации ситуации (исчезновения ошибок) уменьшить число подписчиков, увеличить интервал между публикациями (разрешениями действий), или даже на время прекратить работу (в случае ошибки, требующей перерыва в действиях; например, превышения суточного лимита 403 DailyLimitExceeded).

1) Настройка и запуск Пульсара:

Есть режим из коробки и режим настройки. Из коробки предполагает в том числе дефолтные сокетные адреса, и это может подойти только в случае, если Пульсар и воркеры запускаются на одной машине.

Из коробки:
$pulsar = new \React\PublisherPulsar\Pulsar();
        
$publisherPulsarDto = new \React\PublisherPulsar\Inventory\PublisherPulsarDto();              
$publisherPulsarDto->setModuleName('react:pulsar'); //произвольное имя, характеризующее назначение Пульсара
$publisherPulsarDto->setReplyStackCommandName('php artisan react:pulsar-reply-stack'); // Вызов субсидиарного скрипта, выполняющего роль стека для исполнителей. Код этого скрипта не требует настроек, он приведен чуть ниже. В данном случае указан путь вызова консольной команды Laravel
$publisherPulsarDto->initDefaultPulsarSocketsParams();
        
$pulsar->setPublisherPulsarDto($publisherPulsarDto);
$pulsar->manage();

И опции настройки:
$publisherPulsarDto->setPulsationIterationPeriod(1); // количество секунд между публикациями (в результате размер будет не меньшим, чем указанный в этом параметре; и может быть большим при некоторых условиях)
$publisherPulsarDto->setSubscribersPerIteration(10); // количество подписчиков, которым высылается разрешение на действие (в т.ч. одновременное; и это одновременность или не одновременность зависит уже от кода процесса-исполнителя/подписчика)
$publisherPulsarDto->setPerformerContainerActionMaxExecutionTime(7); // количество секунд ожидания результирующих сообщений от исполнителей для возможной коррекции поведения
$publisherPulsarDto->setLogger(\Log::getMonolog());  // чтобы использовать имеющиеся StreamHandlers. Если не сделать set, то создаст новый Logger с выводом информации в STDOUT
$publisherPulsarDto->setMaxWaitReplyStackResult(7); // количество секунд ожидания подключения нужного количества подписчиков, указанного в свойстве subscribersPerIteration выше. Если за это время нужное количество не подключится к Стеку, то Пульсар запустит процесс имитации подключения исполнителей, чтобы добрать нужное количество в виде "фантомов" и продолжить работу

$pulsarSocketsParams = new \React\PublisherPulsar\Inventory\PulsarSocketsParamsDto();

//могут быть любые свободные порты
$pulsarSocketsParams->setReplyToReplyStackSocketAddress('tcp://127.0.0.1:6271');
$pulsarSocketsParams->setPushToReplyStackSocketAddress('tcp://127.0.0.1:6272');
$pulsarSocketsParams->setPublishSocketAddress('tcp://127.0.0.1:6273');
$pulsarSocketsParams->setPullSocketAddress('tcp://127.0.0.1:6274');
$pulsarSocketsParams->setReplyStackSocketAddress('tcp://127.0.0.1:6275');

$publisherPulsarDto->setPulsarSocketsParams($pulsarSocketsParams);
$pulsar->setPublisherPulsarDto($publisherPulsarDto);

$pulsar->manage();

Код скрипта ReplyStack:

$replyStack = new  \React\PublisherPulsar\ReplyStack();
$replyStack->startCommunication();

Note: важно, чтобы Пульсар был запущен раньше процессов, подключающихся к нему, иначе процессы будут стучаться в пустоту по адресам, которые еще не связаны с Пульсаром, и просто зависнут в ожидании ответа, который не придет никогда.

2) Настройка кода исполнителя (подписчика):

Включаем объект Performer пакета модуля в код процесса:

Из коробки:
$performer = new \React\PublisherPulsar\Performer();
 
$performerDto = new \React\PublisherPulsar\Inventory\PerformerDto();
$performerDto->setModuleName("YourServiceNameContainingPerformer"); // для понимания в логах какой тип исполнителей выполняет действие 

$performer->setPerformerDto($performerDto);
$performer->initDefaultPerformerSocketsParams();
 
$this->zmqPerformer = $performer;  

И опции настройки:

$performerDto->setLogger(\Log::getMonolog()); 
 
$performerSocketParams = new \React\PublisherPulsar\Inventory\PerformerSocketsParamsDto();

//эти адреса должны совпадать с адресами Пульсара в рамках ZMQ-парности (Publish/Subscribe, Push/Pull, Request/Reply)
$performerSocketParams->setPublisherPulsarSocketAddress('tcp://127.0.0.1:6273');
$performerSocketParams->setPushPulsarSocketAddress('tcp://127.0.0.1:6274');
$performerSocketParams->setRequestPulsarRsSocketAddress('tcp://127.0.0.1:6275');

$performerDto->setSocketsParams($performerSocketParams);

$performer->setPerformerDto($performerDto);

$this->zmqPerformer = $performer; 

И далее в необходимом месте, перед вызовом целевого действия, требующего синхронизации/координации, вызываем метод, отвечающий за получение разрешения от Пульсара:

$this->zmqPerformer->connectToPulsarAndWaitPermissionToAct();


3) После выполнения целевого действия необходимо отправить результирующее сообщение о том, возникли ли ошибки. Например в таком виде:


//имеет место ошибка превышения 10 QPS
if (strpos($e->getMessage(), GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED) !== false) {

    $actionResultWithError = new ActionResultingPushDto();

    $actionResultWithError->setActionCompleteCorrectly(false); 
    $actionResultWithError->setSlowDown(true);

    $actionResultWithError->setErrorMessage($e->getMessage());
    $actionResultWithError->setErrorReason(GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED);

    $this->zmqPerformer->pushActionResultInfo($actionResultWithError);

// превышен дневной лимит и необходимо на время уснуть
} elseif (strpos($e->getMessage(), GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED) !== false) {

    $actionResultWithError = new ActionResultingPushDto();

    $actionResultWithError->setActionCompleteCorrectly(false);

    $sleepForPeriod = new ErrorSleepForPeriod();
    $sleepForPeriod->setSleepPeriod((60 * 60 * 1000000)); //на час, в микросекундах
    $actionResultWithError->setSleepForPeriod($sleepForPeriod);

    $actionResultWithError->setErrorMessage($e->getMessage());
    $actionResultWithError->setErrorReason(GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED);

    $this->zmqPerformer->pushActionResultInfo($actionResultWithError);

//всё корректно
} else {

    $this->zmqPerformer->pushActionResultInfoWithoutPulsarCorrectionBehavior();

}

Это актуально также для случая, если к Пульсару подключена только часть процессов, а другая работает в произвольном хаотичном виде. И такая механика позволит снижать количество ошибок, создаваемых совместной активностью упорядоченных и неупорядоченных процессов.

***

При этом, как уже было сказано, модуль можно использовать для любой периодической передачи информации процессам. Для этого достаточно при инициализации засетить свой класс, отнаследованный от PublisherToSubscribersDto, содержащий логику управления процессами, которые его получат.

То есть при инициализации демона в пункте 1) добавить:

$publisherToSubscribersDto = new YourNameExtendedByPublisherToSubscribersDto(); 
$publisherToSubscribersDto->setYourProperty();

$publisherPulsarDto->setPublisherToSubscribersDto($publisherToSubscribersDto);

И этот объект будет передаваться процессам.

***

Основная часть кода была написана на прошлой работе в компании Adventum в рамках решения коммерческих задач и публикуется с ее разрешения.
Поделиться с друзьями
-->

Комментарии (11)


  1. msvn
    01.07.2016 15:07

    А есть ли что-то подобное для Node.js?


    1. jamset
      01.07.2016 16:42

      Если гуглить Token Bucket algorithm node.js, то находит

      https://github.com/jhurliman/node-rate-limiter

      и еще ряд других.


      1. msvn
        01.07.2016 16:44

        Да, спасибо!


  1. mrsoul
    01.07.2016 15:36

    Для решения похожей задачи в php есть библиотека https://github.com/bandwidth-throttle/token-bucket. Позволяет реализовать алгоритм «текущего ведра» (равная длительность между выдачей токенов). Есть два режима работы — когда подписчик завершает работу при отсутствии свободного токена, либо ждёт до появления следюущего.
    Правда в некоторых, редких, случаях модуль выдаёт больше токенов в секунду чем запрашивалось, победить так и не удалось. Проблему решаем возвращением отклонённого запроса в очередь.


    1. jamset
      01.07.2016 19:19

      Интересная библиотека!

      «Правда в некоторых, редких, случаях модуль выдаёт больше токенов в секунду чем запрашивалось, победить так и не удалось.»

      Хм… Интересно, почему так…

      Вот тут, кажется, центральная логика
      https://github.com/bandwidth-throttle/token-bucket/blob/master/classes/TokenBucket.php#L166

      И раз он выдает токен, когда не должен, значит $delta становится равной нулю или положительной тогда, когда должна быть отрицательной.

      Там перед этим есть вызов bcsub() [пусть и до 8-го знака] и округление до int(). Возможно, какие-то маленькие положительные float значения приводятся к нулю.

      Может быть можно было бы попробовать изменить условие:

      if ($delta < 0) {

      false

      }

      if ($delta <= 0){

      false

      }

      «Для решения похожей задачи»

      Да, можно было бы задействовать эту библиотеку. Но в ней нет динамического изменения количества пропускаемых «токенов» и динамического изменения периодичности (Rate) или засыпания всех процессов в зависимости от ответов API.

      Как нет и возможности посылать произвольные управляющие сигналы в процессы.

      И получается, описанный модуль шире по функциям и отличается внутренней механикой.

      Тут конечно интересный вопрос максимальной нагрузки на модуль: zmq укладывает запросы в стек, и по их документации, может быть более 2-х млн. сообщений в секунду (http://zeromq.org/results:more-precise-0mq-tests, с необходимостью сделать поправку на влияние скорости движения по Сети и разные типы обмена сообщениями).

      И тогда вопрос упрется только в оперативную память для большинства задач [ну, просто вообще редко когда нужны десятки и выше миллионов такого рода сообщений в секунду, ставящие под вопрос поведение zmq, и в этом случае будут использованы другие языки для реализации подобной системы.)]

      А в модуле token-bucket для глобального доступа (с разных хостов) используется Redis или Memcache, и обращение через, например, Predis->set().

      И насколько это загрузит сервер, где крутится хранилище, какое значение будет записано — тут могут быть подводные камни, которые тоже будут влиять на устойчивость/корректность системы.


      1. GoldJee
        10.11.2016 16:46

        Этот вывод можно сделать и из специальной теории относительности, вспомнив концепцию пространства Минковского, на которую автор неявно ссылается по ходу статьи.
        Мое сомнение заключается в том, что этот вывод подается как следствие мысленного эксперимента с двумя зеркалами. Но для меня такая причинно-следственная связь неочевидна.

        Будет здорово, если автор прокомментирует этот параграф, дав ссылку и пояснив свои логические заключения.


        1. KarimSI
          02.07.2016 07:11

          В RabbitMQ есть очередь со списком индетификаторов. Так же есть главный процесс php. Он получает пачку заданий (10 шт., ограничение через prefetch) и под каждое запускает свой форк.

          При этом вы запускаете форки на одной машине. Если же запросом не 10, а например 10 тыс, эффективнее их уже запускать параллельно на нескольких машинах, что и призван решить инструмент автора.

          Что касается количества воркеров — тут уже влияет скорость выполнения заданий. Если вам разрешено 10 запросов в секунду, а после получения ответа воркер производит, например, тяжелую аналитику, так что время выполнения запроса и последующей его обработки составит 2 секунды, можно использовать уже 20 воркеров.


        1. jamset
          02.07.2016 16:24

          Допустим у нас ограничение на 10 запросов в секунду, сколько воркеров в вашей системе нужно запустить? Просто зачем 2 млн., если ограничение на вызовы api гораздо меньше?


          Тут немножко другая логика.

          Из-за ограничения, мы не породим сразу 100500 процессов, а будем их поразжать по мере выполнения.


          Действительно, как и в комментарии KarimSI: представьте, что вы хотите еще более распараллелить выполнение заданий.

          Чтобы одновременно работали бы, например, 100 или 1000 процессов.

          а после получения ответа воркер производит, например, тяжелую аналитику

          Или просто какие-то более-менее затратные действия, коннекты к БД, обработку.

          И таким образом пока один процесс выполняет обработку другой, в следующую секунду, уже может обращаться к API. И получается, что нет простоя.

          При этом, распараллеливание возможно даже на одной машине. Просто нужно скорректировать механику получения данных (например id) нужных для работы воркера.

          Пульсар — это часть более крупной системы, которая позволяет создавать параллельные процессы через proc_open (а не fork), передавать им нужные данные, и при этом количество создаваемых процессов ограничивается количеством заданий из очереди и количеством выделенной для этих целей оперативной памяти.

          Эта система уже работает, код выложен, но релиз еще не сделан (нужно сделать небольшие правки и тесты).

          Она состоит из модуля Process&Load Manager и связки с Gearman (модуль Gearman Conveyor).

          В целом всё выглядит вот так:
          image

          В ее рамках можно в запускаемом по крону Клиенте добавить в очередь Gearman нужные задания. После чего по Крону запустить процесс, запускающий Process&Load Manager (Pm&Lm), который в свою очередь начинает создавать воркеров в пределах отведенного процента свободной оперативной памяти и количества заданий.

          Затем каждый из создаваемых воркеров коннектится к Gearman, получает задачу. После чего коннектится к Пульсару и выполняет ее в описанной в статье логике, с разрешения Пульсара.

          При этом, процесс может быть поставлен на паузу или прекращен Pm&Lm, и в таком случае сведения об этом отправляются в Клиент, чтобы задача была пересоздана и выполнена при следующей итерации запуска Pm&Lm.

          Gearman хорош тем, что это не просто очередь, но и система call-бэков. Что позволяет очень быстро реагировать на изменения в процессах.

          Например, процесс-воркер посылает сигнал о каком-либо состоянии выполнения/не выполнения на другую ноду, где крутится Gearman, там в обработчике колбэка происходит анализ и принимается решение: пересоздать, отправить уведомление, отправить в другую очередь (например, RabbitMQ) и соответственно, в другую систему обработки и так далее. То есть не нужно делать переодические запросы о проверке состояния.

          Но при этом очередь Gearman в таком использовании не сохраняема (т.е. упадет клиент, создавший задачи — все задачи будут удалены из очереди), и ее нельзя администрировать и просматривать как Rabbit, например.

          ***

          Если же использовать только модуль Пульсар, например, в форке, то получается, что можно взять не 10 id за раз, а столько, сколько позволяет оперативка (тут просто вопрос, сколько можно создать процессов, когда каждый из них может потреблять по разному, в зависимости от ответа API, например. И заранее это не предугадаешь. Поэтому и был написан Pm&Lm, чтобы защитить ноду от падения в связи с перегрузом оперативки).

          Так, наверное, можно сделать и сейчас, взяв не 10, а [условно] 100 id, если есть понимание, что памяти хватит. Т.к. token-bucket будет делать ограничение в 10 обращений в секунду.


        1. jamset
          03.07.2016 18:27

          Эта система уже работает, код выложен, но релиз еще не сделан

          Соответственно, до релиза, в продакшне не стоило бы использовать, т.к. могут вноситься изменения в названия методов и какие-то небольшие, но влияющие на API вещи.


    1. vinograd19
      10.11.2016 19:57

      А в данном случае не существует самого того пространства, по которому мы собираемся перемещаться

      Существует. Это обычное пространство. Примеры использования такого путешествия во времени: наблюдение света отдаленных галактик, годичные кольца. В обоих случаях есть значения dt/dx, то есть той самой скорости перемещения во времени, о которой вы писали.


  1. GerrAlt
    10.11.2016 16:15

    Возьмем два зеркала и расположим их противоположно одну над другой. Допустим, что луч света будет многократно отражаться между этими двумя зеркалами. Движение луча света будет происходить по вертикальной оси, при каждом отражении отмеряя время как метроном. Теперь начнем двигать наши зеркала по горизонтальной оси


    луч света двигается со скоростью света от зеркала к зеркалу, зеркала двигаются со скоростью света ортогонально направлению движения луча (иначе не понятно как должна бы получаться картинка похожая на пружину)

    Из описания эксперимента я так понял что луч должен в процессе эксперимента набирать скорость в том же направлении что и зеркала, иначе как мне кажется мы таки выдернем из под него зеркала и луч просто улетит, но за счет чего луч будет приобретать эту скорость?

    Т.е. за счет чего луч света приобретает скорость позволяющую ему менять свое положение в пространстве относительно зеркала (любого из двух) только по 1 оси?