В данной статье постараюсь раскрыть основные нюансы реализации системы демонов для PHP и научить консольные команды Yii2 демонизироваться.

Последние 3 года я занимаюсь разработкой и развитием достаточно большого корпоративного портала для одной группы компаний. Я, какие и многие, столкнулся с проблемой, когда решение задачи, которую требует бизнес, не укладывается ни в какие таймауты. Сделайте отчетик в excel на 300 тыс. строк, отправьте рассылку на 1500 писем и так далее. Естественно, такие задачи должны решаться фоновыми заданиями, демонами и crontab-ами. В рамках статьи я не буду приводить сравнение кронов и демонов, мы для решения подобных задач выбрали демонов. При этом важным требованием для нас стала возможность иметь доступ ко всему, что уже написано для бэкенда, соответственно, демоны должны быть продолжением фрейворка Yii2. По этой же причине нам не подошли уже готовые решения типа phpDaemon.

Под катом готовое решение для реализации демонов на Yii2, которое у меня вышло.

Тема демонов на PHP поднимается с завидной регулярностью (раз, два, три, а ребята из badoo даже перезапускают их без потери соединений). Возможно мой велосипед быстрый способ запустить демоны на популярном фреймворке будет полезен.

Немного основ


Для того, чтобы процесс стал демоном, нужно:
  1. Отвязать скрипт от консоли и стандартных потоков ввода-вывода;
  2. Завернуть исполнения основного кода в бесконечный цикл;
  3. Реализовать механизмы контроля над процессом.

Отвязываемся от консоли

Для начала закрываем стандартные потоки STDIN, STOUT, STDERR. Но PHP без них не может, поэтому первый открытый поток он сделает стандартным, так что откроем их в /dev/null.

if (is_resource(STDIN)) {
   fclose(STDIN);
   $stdIn = fopen('/dev/null', 'r');
}
if (is_resource(STDOUT)) {
    fclose(STDOUT);
    $stdOut = fopen('/dev/null', 'ab');
}
if (is_resource(STDERR)) {
    fclose(STDERR);
    $stdErr = fopen('/dev/null', 'ab');
}

Далее форкаем процесс и делаем форк основным процессом. Процесс донор — завершаем.
$pid = pcntl_fork();
if ($pid == -1) {
   $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
} elseif ($pid) {
   $this->halt(self::EXIT_CODE_NORMAL);
} else {
   posix_setsid();
}

Бесконечный цикл и контроль

Я думаю, с циклом все понято. А вот необходимые механизмы контроля стоит рассмотреть подробнее.

Фиксация уже запущенных процессов

Тут все просто — после запуска демон кладет в файл со своим названием свой PID, а при завершении своей работы этот файл сносит.

Обработка POSIX сигналов

Демон должен корректно обрабатывать сигналы от операционной системы, т.е. при получении сигнала SIGTERM должен плавно завершать свою работу. Достигается это несколькими вещами: первое, определяем функцию, которая будет обрабатывать полученные сигналы:

pcntl_signal(SIGTERM, ['MyClassName', 'mySignalHandlerFunction']);

Второе, в функцию обработки сигналов ставим присвоение некоторому статическому свойству класса значение true.
static function signalHandler($signo, $pid = null, $status = null)
{
   self::$stopFlag = true;
}

Ну и третье, наш бесконечный цикл теперь должен быть не такой уж бесконечный:
while (!self::$stopFlag) {
   pcntl_signal_dispatch();
}

Особенности обработки сигналов в разных версиях PHP
В PHP < 5.3.0 для распределения сигналов использовалась специальная директива declare(ticks = N). Где тик — это событие, которое случается каждые N низкоуровневых операций, выполненных парсером внутри блока declare. Распределение сигналов осуществлялось в соответствии с настройкой. Слишком маленькое значение приводило к провалу в производительности, а слишком большое — к несвоевременной обработке сигналов.

В PHP >= 5.3.0 появилась функция pcntl_signal_dispatch(), которую можно вызывать для ручного распределения сигналов, что мы и делаем после каждой итерации.
Ну и наконец, в PHP 7.1 станет доступно асинхронное распределение сигналов, что позволит почти мгновенно получать сигналы без оверхеда и ручного вызова функций.

Теперь при получении команды от операционной системы скрипт спокойно завершит текущую итерацию и выйдет из цикла.

Контроль за утечками памяти

К сожалению, если демон долго трудится без перезапуска — у него начинает течь память. Интенсивность утечки зависит от того, какие функции вы используете. Из нашей практики — наиболее сильно «текли» демоны, которые работают с удаленными SOAP-сервисами через стандартный класс SoapClient. Так что за этим нужно следить и периодически их перезапускать. Дополним наш цикл условием контроля за утечками:

while (!self::$stopFlag) {
   if (memory_get_usage() > $this->memoryLimit) {
      break;
   }
   pcntl_signal_dispatch();
}

Где же код для Yii?


Исходники выложены на Github — yii2-daemon, пакет также доступен для установки через composer.

Пакет состоит всего из 2-х абстрактных классов — базовый класс DaemonController и класс WatcherDaemonController.

DaemonController
<?php

namespace vyants\daemon;

use yii\base\NotSupportedException;
use yii\console\Controller;
use yii\helpers\Console;

/**
 * Class DaemonController
 *
 * @author Vladimir Yants <vladimir.yants@gmail.com>
 */
abstract class DaemonController extends Controller
{

    const EVENT_BEFORE_JOB = "beforeJob";
    const EVENT_AFTER_JOB = "afterJob";

    const EVENT_BEFORE_ITERATION = "beforeIteration";
    const EVENT_AFTER_ITERATION = "afterIteration";

    /**
     * @var $demonize boolean Run controller as Daemon
     * @default false
     */
    public $demonize = false;

    /**
     * @var $isMultiInstance boolean allow daemon create a few instances
     * @see $maxChildProcesses
     * @default false
     */
    public $isMultiInstance = false;

    /**
     * @var $parentPID int main procces pid
     */
    protected $parentPID;

    /**
     * @var $maxChildProcesses int max daemon instances
     * @default 10
     */
    public $maxChildProcesses = 10;

    /**
     * @var $currentJobs [] array of running instances
     */
    protected static $currentJobs = [];

    /**
     * @var int Memory limit for daemon, must bee less than php memory_limit
     * @default 32M
     */
    protected $memoryLimit = 268435456;

    /**
     * @var boolean used for soft daemon stop, set 1 to stop
     */
    private static $stopFlag = false;

    /**
     * @var int Delay between task list checking
     * @default 5sec
     */
    protected $sleep = 5;

    protected $pidDir = "@runtime/daemons/pids";

    protected $logDir = "@runtime/daemons/logs";

    private $stdIn;
    private $stdOut;
    private $stdErr;

    /**
     * Init function
     */
    public function init()
    {
        parent::init();

        //set PCNTL signal handlers
        pcntl_signal(SIGTERM, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGINT, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGHUP, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGUSR1, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGCHLD, ['vyants\daemon\DaemonController', 'signalHandler']);
    }

    function __destruct()
    {
        $this->deletePid();
    }

    /**
     * Adjusting logger. You can override it.
     */
    protected function initLogger()
    {
        $targets = \Yii::$app->getLog()->targets;
        foreach ($targets as $name => $target) {
            $target->enabled = false;
        }
        $config = [
            'levels' => ['error', 'warning', 'trace', 'info'],
            'logFile' => \Yii::getAlias($this->logDir) . DIRECTORY_SEPARATOR . $this->getProcessName() . '.log',
            'logVars' => [],
            'except' => [
                'yii\db\*', // Don't include messages from db
            ],
        ];
        $targets['daemon'] = new \yii\log\FileTarget($config);
        \Yii::$app->getLog()->targets = $targets;
        \Yii::$app->getLog()->init();
    }

    /**
     * Daemon worker body
     *
     * @param $job
     *
     * @return boolean
     */
    abstract protected function doJob($job);

    /**
     * Base action, you can\t override or create another actions
     * @return bool
     * @throws NotSupportedException
     */
    final public function actionIndex()
    {
        if ($this->demonize) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
            } elseif ($pid) {
                $this->cleanLog();
                $this->halt(self::EXIT_CODE_NORMAL);
            } else {
                posix_setsid();
                $this->closeStdStreams();
            }
        }
        $this->changeProcessName();

        //run loop
        return $this->loop();
    }

    /**
     * Set new process name
     */
    protected function changeProcessName()
    {
        //rename process
        if (version_compare(PHP_VERSION, '5.5.0') >= 0) {
            cli_set_process_title($this->getProcessName());
        } else {
            if (function_exists('setproctitle')) {
                setproctitle($this->getProcessName());
            } else {
                \Yii::error('Can\'t find cli_set_process_title or setproctitle function');
            }
        }
    }

    /**
     * Close std streams and open to /dev/null
     * need some class properties
     */
    protected function closeStdStreams()
    {
        if (is_resource(STDIN)) {
            fclose(STDIN);
            $this->stdIn = fopen('/dev/null', 'r');
        }
        if (is_resource(STDOUT)) {
            fclose(STDOUT);
            $this->stdOut = fopen('/dev/null', 'ab');
        }
        if (is_resource(STDERR)) {
            fclose(STDERR);
            $this->stdErr = fopen('/dev/null', 'ab');
        }
    }

    /**
     * Prevent non index action running
     *
     * @param \yii\base\Action $action
     *
     * @return bool
     * @throws NotSupportedException
     */
    public function beforeAction($action)
    {
        if (parent::beforeAction($action)) {
            $this->initLogger();
            if ($action->id != "index") {
                throw new NotSupportedException(
                    "Only index action allowed in daemons. So, don't create and call another"
                );
            }

            return true;
        } else {
            return false;
        }
    }

    /**
     * Возвращает доступные опции
     *
     * @param string $actionID
     *
     * @return array
     */
    public function options($actionID)
    {
        return [
            'demonize',
            'taskLimit',
            'isMultiInstance',
            'maxChildProcesses',
        ];
    }

    /**
     * Extract current unprocessed jobs
     * You can extract jobs from DB (DataProvider will be great), queue managers (ZMQ, RabbiMQ etc), redis and so on
     *
     * @return array with jobs
     */
    abstract protected function defineJobs();

    /**
     * Fetch one task from array of tasks
     *
     * @param Array
     *
     * @return mixed one task
     */
    protected function defineJobExtractor(&$jobs)
    {
        return array_shift($jobs);
    }

    /**
     * Main Loop
     *
     * * @return boolean 0|1
     */
    final private function loop()
    {
        if (file_put_contents($this->getPidPath(), getmypid())) {
            $this->parentPID = getmypid();
            \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' started.');
            while (!self::$stopFlag) {
                if (memory_get_usage() > $this->memoryLimit) {
                    \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' .
                        getmypid() . ' used ' . memory_get_usage() . ' bytes on ' . $this->memoryLimit .
                        ' bytes allowed by memory limit');
                    break;
                }
                $this->trigger(self::EVENT_BEFORE_ITERATION);
                $this->renewConnections();
                $jobs = $this->defineJobs();
                if ($jobs && !empty($jobs)) {
                    while (($job = $this->defineJobExtractor($jobs)) !== null) {
                        //if no free workers, wait
                        if ($this->isMultiInstance && (count(static::$currentJobs) >= $this->maxChildProcesses)) {
                            \Yii::trace('Reached maximum number of child processes. Waiting...');
                            while (count(static::$currentJobs) >= $this->maxChildProcesses) {
                                sleep(1);
                                pcntl_signal_dispatch();
                            }
                            \Yii::trace(
                                'Free workers found: ' .
                                ($this->maxChildProcesses - count(static::$currentJobs)) .
                                ' worker(s). Delegate tasks.'
                            );
                        }
                        pcntl_signal_dispatch();
                        $this->runDaemon($job);
                    }
                } else {
                    sleep($this->sleep);
                }
                pcntl_signal_dispatch();
                $this->trigger(self::EVENT_AFTER_ITERATION);
            }

            \Yii::info('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' is stopped.');

            return self::EXIT_CODE_NORMAL;
        }
        $this->halt(self::EXIT_CODE_ERROR, 'Can\'t create pid file ' . $this->getPidPath());
    }

    /**
     * Delete pid file
     */
    protected function deletePid()
    {
        $pid = $this->getPidPath();
        if (file_exists($pid)) {
            if (file_get_contents($pid) == getmypid()) {
                unlink($this->getPidPath());
            }
        } else {
            \Yii::error('Can\'t unlink pid file ' . $this->getPidPath());
        }
    }

    /**
     * PCNTL signals handler
     *
     * @param $signo
     * @param null $pid
     * @param null $status
     */
    final static function signalHandler($signo, $pid = null, $status = null)
    {
        switch ($signo) {
            case SIGINT:
            case SIGTERM:
                //shutdown
                self::$stopFlag = true;
                break;
            case SIGHUP:
                //restart, not implemented
                break;
            case SIGUSR1:
                //user signal, not implemented
                break;
            case SIGCHLD:
                if (!$pid) {
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                while ($pid > 0) {
                    if ($pid && isset(static::$currentJobs[$pid])) {
                        unset(static::$currentJobs[$pid]);
                    }
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                break;
        }
    }

    /**
     * Tasks runner
     *
     * @param string $job
     *
     * @return boolean
     */
    final public function runDaemon($job)
    {
        if ($this->isMultiInstance) {
            $this->flushLog();
            $pid = pcntl_fork();
            if ($pid == -1) {
                return false;
            } elseif ($pid !== 0) {
                static::$currentJobs[$pid] = true;

                return true;
            } else {
                $this->cleanLog();
                $this->renewConnections();
                //child process must die
                $this->trigger(self::EVENT_BEFORE_JOB);
                $status = $this->doJob($job);
                $this->trigger(self::EVENT_AFTER_JOB);
                if ($status) {
                    $this->halt(self::EXIT_CODE_NORMAL);
                } else {
                    $this->halt(self::EXIT_CODE_ERROR, 'Child process #' . $pid . ' return error.');
                }
            }
        } else {
            $this->trigger(self::EVENT_BEFORE_JOB);
            $status = $this->doJob($job);
            $this->trigger(self::EVENT_AFTER_JOB);

            return $status;
        }
    }

    /**
     * Stop process and show or write message
     *
     * @param $code int -1|0|1
     * @param $message string
     */
    protected function halt($code, $message = null)
    {
        if ($message !== null) {
            if ($code == self::EXIT_CODE_ERROR) {
                \Yii::error($message);
                if (!$this->demonize) {
                    $message = Console::ansiFormat($message, [Console::FG_RED]);
                }
            } else {
                \Yii::trace($message);
            }
            if (!$this->demonize) {
                $this->writeConsole($message);
            }
        }
        if ($code !== -1) {
            \Yii::$app->end($code);
        }
    }

    /**
     * Renew connections
     * @throws \yii\base\InvalidConfigException
     * @throws \yii\db\Exception
     */
    protected function renewConnections()
    {
        if (isset(\Yii::$app->db)) {
            \Yii::$app->db->close();
            \Yii::$app->db->open();
        }
    }

    /**
     * Show message in console
     *
     * @param $message
     */
    private function writeConsole($message)
    {
        $out = Console::ansiFormat('[' . date('d.m.Y H:i:s') . '] ', [Console::BOLD]);
        $this->stdout($out . $message . "\n");
    }

    /**
     * @param string $daemon
     *
     * @return string
     */
    public function getPidPath($daemon = null)
    {
        $dir = \Yii::getAlias($this->pidDir);
        if (!file_exists($dir)) {
            mkdir($dir, 0744, true);
        }
        $daemon = $this->getProcessName($daemon);

        return $dir . DIRECTORY_SEPARATOR . $daemon;
    }

    /**
     * @return string
     */
    public function getProcessName($route = null)
    {
        if (is_null($route)) {
            $route = \Yii::$app->requestedRoute;
        }

        return str_replace(['/index', '/'], ['', '.'], $route);
    }

    /**
     *  If in daemon mode - no write to console
     *
     * @param string $string
     *
     * @return bool|int
     */
    public function stdout($string)
    {
        if (!$this->demonize && is_resource(STDOUT)) {
            return parent::stdout($string);
        } else {
            return false;
        }
    }

    /**
     * If in daemon mode - no write to console
     *
     * @param string $string
     *
     * @return int
     */
    public function stderr($string)
    {
        if (!$this->demonize && is_resource(\STDERR)) {
            return parent::stderr($string);
        } else {
            return false;
        }
    }

    /**
     * Empty log queue
     */
    protected function cleanLog()
    {
        \Yii::$app->log->logger->messages = [];
    }

    /**
     * Empty log queue
     */
    protected function flushLog($final = false)
    {
        \Yii::$app->log->logger->flush($final);
    }
}


WatcherDaemonController
<?php

namespace vyants\daemon\controllers;

use vyants\daemon\DaemonController;

/**
 * watcher-daemon - check another daemons and run it if need
 *
 * @author Vladimir Yants <vladimir.yants@gmail.com>
 */
abstract class WatcherDaemonController extends DaemonController
{
    /**
     * @var string subfolder in console/controllers
     */
    public $daemonFolder = 'daemons';

    /**
     * @var boolean flag for first iteration
     */
    protected $firstIteration = true;

    /**
     * Prevent double start
     */
    public function init()
    {
        $pid_file = $this->getPidPath();
        if (file_exists($pid_file) && ($pid = file_get_contents($pid_file)) && file_exists("/proc/$pid")) {
            $this->halt(self::EXIT_CODE_ERROR, 'Another Watcher is already running.');
        }
        parent::init();
    }

    /**
     * Job processing body
     *
     * @param $job array
     *
     * @return boolean
     */
    protected function doJob($job)
    {
        $pid_file = $this->getPidPath($job['daemon']);

        \Yii::trace('Check daemon ' . $job['daemon']);
        if (file_exists($pid_file)) {
            $pid = file_get_contents($pid_file);
            if ($this->isProcessRunning($pid)) {
                if ($job['enabled']) {
                    \Yii::trace('Daemon ' . $job['daemon'] . ' running and working fine');

                    return true;
                } else {
                    \Yii::warning('Daemon ' . $job['daemon'] . ' running, but disabled in config. Send SIGTERM signal.');
                    if (isset($job['hardKill']) && $job['hardKill']) {
                        posix_kill($pid, SIGKILL);
                    } else {
                        posix_kill($pid, SIGTERM);
                    }

                    return true;
                }
            }
        }
        \Yii::error('Daemon pid not found.');
        if ($job['enabled']) {
            \Yii::trace('Try to run daemon ' . $job['daemon'] . '.');
            $command_name = $job['daemon'] . DIRECTORY_SEPARATOR . 'index';
            //flush log before fork
            $this->flushLog(true);
            //run daemon
            $pid = pcntl_fork();
            if ($pid === -1) {
                $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() returned error');
            } elseif ($pid === 0) {
                $this->cleanLog();
                \Yii::$app->requestedRoute = $command_name;
                \Yii::$app->runAction("$command_name", ['demonize' => 1]);
                $this->halt(0);
            } else {
                $this->initLogger();
                \Yii::trace('Daemon ' . $job['daemon'] . ' is running with pid ' . $pid);
            }
        }
        \Yii::trace('Daemon ' . $job['daemon'] . ' is checked.');

        return true;
    }

    /**
     * @return array
     */
    protected function defineJobs()
    {
        if ($this->firstIteration) {
            $this->firstIteration = false;
        } else {
            sleep($this->sleep);
        }

        return $this->getDaemonsList();
    }

    /**
     * Daemons for check. Better way - get it from database
     * [
     *  ['daemon' => 'one-daemon', 'enabled' => true]
     *  ...
     *  ['daemon' => 'another-daemon', 'enabled' => false]
     * ]
     * @return array
     */
    abstract protected function getDaemonsList();

    /**
     * @param $pid
     *
     * @return bool
     */
    public function isProcessRunning($pid)
    {
        return file_exists("/proc/$pid");
    }
}


DaemonController

Это родительский класс для всех демонов. Вот минимальный пример демона:

<?php

namespace console\controllers\daemons;

use vyants\daemon\DaemonController;

class TestController extends DaemonController
{
    /**
     * @param $job
     *
     * @return boolean
     */
    protected function doJob($job)
    {
        //do some job
        return true;
    }

    /**
     * @return array
     */
    protected function defineJobs()
    {
       return [];
    }
}

Функция defineJobs() должна возвращать набор задач для выполнения. По-умолчанию ожидается, что она будет возвращать массив. Если вы хотите возвращать, скажем MongoCursor, потребуется еще переопределить defineJobExtractor(). Функция doJob() должна получать на вход одну задачу для выполнения, производить с ней необходимые операции и помечать данную задачу в источнике как отработанную, чтобы она не упала второй раз.

Возможные параметры и настройки:
  • demonize — данный параметр определяет будет ли скрипт демонизироваться или работать как консольное приложение. Параметр доступен для задания из консоли: --demonize=1
  • isMultiInstance и maxChildProcesses — определяет можно ли демону создавать свои собственные копии и какое их максимальное количество может одновременно работать. Данная функция позволяет выполнять несколько задач параллельно. doJob будет выполняться в дочерних процессах, а родительский процесс будет только делегировать задачи своим потомкам и следить, чтобы их количество не превышало допустимый максимум. Весьма полезно, если ресурсов сервера хватает для того, чтобы выполнять несколько достаточно продолжительных по времени задач. По-умолчанию такое поведение выключено. Параметры так же доступны из консоли: --isMultiInstance=1 --maxChildProcesses=2
  • memoryLimit — порог потребления демоном памяти, если демон в режиме ожидания превысит данный порог, то он благородно совершить сиппоку. Как уже было обозначено ранее, для уменьшения размера потребляемой демонами памяти в результате утечек.
  • sleep — время в секундах, на которое демон будет засыпать между проверками наличия задач. Демон отправится спать только если defineJob вернет empty и пока есть задачи демон спать не будет. Поэтому defineJobs не должна возвращать статический список задач, иначе демон будет молотить их без конца и отдыха.
  • pidDir и logDir — пути для хранения логов и pid-ов, поддерживают алиасы Yii. По-умолчанию "@runtime/daemons/pids" и "@runtime/daemons/logs"


Проблема потери соединений

При осуществлении операции fork() установленные в родительском процессе соединения перестают работать в дочерних процессах. Для того, чтобы избежать этой проблемы, после всех форков проставлен вызов функции renewConnections(). По-умолчанию, данная функция переподключает только Yii::$app->db, но вы можете переопределить ее, и добавить прочие источники, соединение с которыми нужно поддерживать в дочерних процессах.

Логгирование

Демоны перенастраивают стандартный логгер Yii под себя. Если вас не устраивает поведение по-умолчанию — переопределите функцию initLogger().

WatcherDaemonController

Это почти готовый демон-наблюдатель. Задача данного демона следить за другими демонами, запускать и останавливать их при необходимости. Он не может стартовать дважды, поэтому можно смело поставить его запуск в crontab. Для того, чтобы начать его использовать, нужно в console/controllers создать папку daemons и положить класс вида:

<?php

namespace console\controllers\daemons;

use vyants\daemon\controllers\WatcherDaemonController;

/**
 * Class WatcherController
 */
class WatcherController extends WatcherDaemonController
{

    protected $sleep = 10;

    /**
     * @return array
     */
    protected function getDaemonsList()
    {
        return [
            ['daemon' => 'daemons/test', 'enabled' => true]
        ];
    }
}

Требуется определить лишь одну функцию — getDaemonsList(), которая вернет список демонов за которыми нужно следить. В самом простом виде — это зашитый в код массив, но в таком случае вы не будете иметь возможности менять список «на лету». Положите список демонов в базу или отдельный файлик и получайте его каждый раз оттуда. В таком случае, watcher сможет включить или выключить демон без собственного перезапуска.

Заключение


В данный момент у нас более 50-ти демонов, выполняющих разнообразные задачи, начиная от отправки почтовых сообщений и заканчивая генерацией отчетов и актуализацией данных между разными системами.

Демоны работают с разными источниками задач — MySQL, RabbitMQ и даже удаленными веб-сервисами. Полет нормальный.
Безусловно, демоны на php не сравнятся с теми же демонами на Go. Но высокая скорость разработки, возможность повторного использования уже написанного кода и отсутствие необходимости учить команду другому языку перевешивают минусы.
Поделиться с друзьями
-->

Комментарии (38)


  1. oxidmod
    09.08.2016 19:01
    +4

    а чем не угодил супервизор и обычные команды yii-шные?


    1. vyants
      10.08.2016 09:00

      «Не угодили» не совсем та фраза. Это хорошее решение. Но я бы не сказал, что это сильно проще — консольный команды для правильной работы с супервизором все равно нужно «допилить» — ну как минимум они должны хэндлить PCNTLсигналы и корректно завершать текущую задачу.
      Для нас была важна потребность в параллельной обработке задач одним демоном. Да, супервизор может запускать несколько инстансов одной команды, но они всегда будут висеть в памяти (тут тоже можно поспорить, конечно, что лучше — лишний процесс в памяти или форк при необходимости), к тому же нужно будет решать проблему корректного распределения задач между ними (а у нас далеко не всегда есть возможность использовать нормальный менеджер очередей).


      1. oxidmod
        10.08.2016 09:19
        +1

        вам конечно в вашем проекте видней, но с моей колокольни прикрутить банальную очередь на редисе вообще не проблема, если к роликом или любым другим менеджером не хочется напрягаться. Демоны на то и демоны, что висят в памяти и переодически опрашивают очередь на наличие задач.
        та даже очередь через бд не проблема прикрутить.


        1. vyants
          10.08.2016 09:37

          В случае очереди на БД, если 2 инстанса одной консольной команды одновременно потянутся за задачами есть шанс, что оба получать одну и ту же задачу и она выполниться дважды. На Redis-е этого можно избежать, ну а тот же RabbitMQ этой проблемы лишен, задача дойдет только одному консумеру. Я поэтому и написал, что если нет возможности использовать нормальный менеджер очередей — эту проблему придется решать.
          У нас часть задач в очередях на БД, часть в RabbitMQ, не буду вдаваться в подробности, так было необходимо, поэтому для нас проблема актуальна.
          Вы, конечно, правы, для этих целей можно использовать и супервизор. Даже кронами можно обойтись. Наше решение — всего лишь еще один способ решения задачи.


          1. oxidmod
            10.08.2016 09:44

            есть у нас и пару воркеров на очереди в бд, делается это так:
            воркер делает запрос
            update queueTable set handled_by=«some unique id», begin_at=NOW() where handled_by is null limit=10;

            после этого селектит таски из по своему айдишнику.

            зы. если возможны падения независимые от вас, можно поставить еще воркер надзиратель, который обнуляет handled_by по истечению таймаута от begin_at
            ззы. и это все еще проще чем ваше решение
            зззы. но в целом не вижу смысла делать очереди на бд, когда есть редис… разве что система крутится на шаред хостинге, где только бд и доступна


          1. coh
            10.08.2016 10:56

            В случае очереди на БД, если 2 инстанса одной...

            Зависит от уровня изоляции, если поставить serialized и в транзакции ставить метку в строку, то один запрос отвалится с retry transaction


            1. coh
              10.08.2016 11:17

              Очереди в бд — не лучшее решение, но может пригодиться упрощенный псевдокод (применим к MySQL):

              // лучше иметь отдельный коннект для очереди
              $db->query('SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE');
              $db->beginTransaction();
              
              // время, когда считаем lock устаревшим
              $locktimeout = '2001-01-01 00:00:00';
              
              try{
               $row = $db->getRow('SELECT * from `queue` WHERE `lock` IS NULL OR `lock` < "'.$locktimeout.'" LIMIT 1');
               $db->update('queue' , ['lock' => date('Y-m-d H:i:s')], 'where `id`='.$row['id']);
               $db->commit();
              }catch (Exception $e){
                  $db->rollBack();
                  // deadlock, try restart transaction, как ожидаемое поведение MySQL
                  if($e->getCode() = 1213){
                      // принимаем меры
                  }
              }
              


              1. Hayate
                10.08.2016 11:25

                Да, очереди в БД это решение пока нет большой нагрузки.
                Можно ещё использовать вариант который был на одном проекте — выбирать задачи с pk кратным чему-то. Досталось мне в наследство, впоследствии переделал на нормальный вариант с брокером сообщений.


                1. AotD
                  10.08.2016 11:50

                  Хм, а SELECT FOR UPDATE чем плох?


                  1. Hayate
                    10.08.2016 11:52
                    +1

                    Процесс 1 будет ждать, чтобы потом узнать что эту задачу уже выполнил процесс 2. И это вместо того чтобы процесс 1 делал какую-то полезную работу.


                    1. AotD
                      10.08.2016 11:57

                      Хм, если предположить что выполнение задачи >> выборки её из очереди, то делаем
                      1. SELECT FOR UPDATE,
                      2. Ставим лок на эту запись через UPDATE,
                      3. Долго выполняем
                      Если два процесса одновременно делают запрос на выборку задачи второй будет ожидать UPDATE первого (N ms)


                      1. Hayate
                        10.08.2016 12:02

                        Ну да, я про это и говорю. Это плохо увязывается с масштабированием нагрузки. Если выбирать случайную запись по статусу, то процессы постоянно будут хватать одни и те же записи и ничего не делать. Допустим в supervisord прописано 40 воркеров, из которых 10-20 ждёт освобождения лока. А если наш стартап идёт в гору, количество задач растёт, увеличиваем количество воркеров, они не влазят на одну машину, разносим на разные. И там уже будут сплошные блокировки.
                        Пока задачи не очень критичные и их мало, такой вариант годится. Если задач много — то уже не особо.


                      1. oxidmod
                        10.08.2016 12:02

                        потому сначала пометка что задача взята в работу со сбросом этого статуса по истечению таймаута
                        после выполнения — пометка, что задача сделана


                        1. Hayate
                          10.08.2016 12:21

                          Проверено — будут постоянно брать одну и ту же всё равно. Один процесс уже взял, закоммитить не успел, и тут же второй берёт.


                          1. oxidmod
                            10.08.2016 12:38

                            он не берет. он делает первым же запросом апдейт N записей.
                            притом помечает их какимто уникальным айдишником. допустим так:

                            $uniqueId = uniqueid('w_', true);
                            $this->db->query(«update {$this-tableName} set locked_by='{$uniqueId}', locked_at=NOW(), status='in progress' where locked_by is null limit 10»);


                            а теперь мы уже спокойно можем сделать селект по $uniqueId обработать свои ивенты, а по завершению либо поставить статус complete либо вообще удалить записи, если история не нужна. Если не нужна история, то поле статуса тоже не нужно, достаточно locked_by

                            зы. дополнительно нужен крон, который будет раз в N минут сбрасывать лок с записей, если locked_at был слишком давно и статус ин прогресс


                            1. Hayate
                              10.08.2016 12:40

                              Ну в общем — на первое время и пока нет нагрузки.


                              1. oxidmod
                                10.08.2016 13:12

                                чем помешает нагрузка? я не спорю, что очередь в бд это костыль? я о том, что если нет возможности/желания использовать редис или нормальный броке, то бд вполне вариант


                          1. coh
                            10.08.2016 13:13

                            В том варианте, что я предложил: второй воркер, если возьмет ту же задачу, что и первый — не сможет повесить на нее лок, соответственно возьмет следующую по списку


                1. coh
                  10.08.2016 13:23

                  Скорее решение не «пока нет большой нагрузки», а пока нет сотен тысяч заданий.


  1. Pilat
    10.08.2016 05:52

    What is the reason for performing a double fork when creating a daemon?
    http://stackoverflow.com/questions/881388/what-is-the-reason-for-performing-a-double-fork-when-creating-a-daemon


    1. vyants
      10.08.2016 08:39

      posix_setsid() делает дочерний процесс лидером сессии и необходимость во втором форке пропадает.


  1. necromant2005
    10.08.2016 06:54

    pcntl хорошо работает только для самых простых приложений. Но в реальности крайне неприятно следить за конекшенами, перезапускать демоны, следить вообще за ними и распределять нагрузку. Еще из возможных неприятных проблем это то что syntax error or fatal error никак не хендлится. Ну поесть если пришила такая задача на которой приложение упало (причем желательно сразу), то через какое время будет лежать все. Медленно текущая память тоже не сильно принято, когда утекает сначала по 10-20 мб а потом это выливается в 2Гб демон который еле работает из постоянной попытки собрать мусор и выделить новую память.
    На порядок проще и стабильно работает связка: демон очередей (rabbit/gearman/redis pub/sub etc.) + supervizor.


    1. vyants
      10.08.2016 09:15
      +1

      Спасибо за комментарий. Видно, что Вы лично прошлись по этим граблям :)
      У нас сейчас работает 51 демон. Все работают с разным задачами, и многие из них весьма нетривиальны. Я бы не сказал что это «простое приложение». Проблему с коннекшенами мы решили и больше с ней не сталкиваемся. Руками мы ничего не завершаем и не перезапускаем, все делает Watcher, а команды ему мы даем из веб-интерфейса.
      Поясните, пожалуйста, какая именно проблема с распределением нагрузки у Вас возникала?
      По поводу перехвата ошибок, да проблема была в 5.6 (хотя некоторую часть неперехватываемых ошибок удавалось обрабатывать при помощи register_shutdown_function), но в 7-ке проблема уже не так актуальна. Если при выполнении задачи возникла ошибка, мы помечаем задачу, а дальше логи и newrelic, отлавливаем и фиксим.
      Проблема с утечками тоже учтена.


      1. necromant2005
        10.08.2016 09:50
        +2

        Проблема с конекшенами, это не только открыть — но и закрыть. Что иногда даже важнее. Любой зависший, упавший но не закрывший осенние варке будет держать соединение вечно — пока его не перезапустишь. И в какой-то момент можно наблюдать по 10-20к соединений к memcached / mongodb cluster, после которого для начала сразишься тюнить систему разрешая больше соединений, больше открытых файлов. Но в конце концов это работает плохо.

        Касательно отлова ошибок: shutdown_function не вызовется когда варвар сожрал всю оперативку и упал к примеру или unrecoverable fatal. И это крайне неприятно, потому что и поместить задачу ты не можешь. Она просто не доходит до конца.

        После долгой борьбы мы пришли к более простому понимаю что такое воркер: воркер это независимая команда. А если она независимая то нет разницы запускать через fork or php -f. Но с другой стороны запуская через php -f, ты получаешь все бонусы 100% независимой команды, все соединия открываются и закрываются самим php, нет зависимости от предидущих глобальных переменных и значений, этот подход позволяет утилизировать память по максимуму.

        Benefits:
        — ловля всех ошибок!
        — потребление памяти воркерами сократилось в 10 раз
        — срочность процессинга выросла в 3.5 раза (из-за того что меньше оперативки можно отключить GC)
        — простота разработки
        — простота connection management
        — не течетет блин вообще никак (сам manager жрет 8Mb оперативки и живет месяцами)
        — и как приятный бонус не нужно ставить pcntl ;)
        Drawbacks:
        — дополнительные 20 мс на запуск внешнего интерпретатора


  1. Hayate
    10.08.2016 08:36

    В какой версии PHP наблюдаются утечки? PHP7 пробовали?


    1. vyants
      10.08.2016 08:45
      +1

      Во всех. Мы перешли на PHP 7 почти сразу после релиза, на нем получше, но от утечек никуда не денешься. Сильнее течет память в extensions.
      Даже fpm воркеры рекомендуется перезапускать (pm.max_requests)


      1. Electronick
        10.08.2016 09:25
        +1

        У меня есть простенький демон на silex, работает месяцами, стучит курлом к внешнему ресурсу, собирает данные и обновляет некий срез. Стабильно работает месяцами, иногда я его перезапускаю «на всякий случай», но утечек за ним я не наблюдал. и там точно не php 7. Полагаю, что ваши утечки – это проблемы из вашего же кода/фреймворка. Просто Yii не выглядит чем-то не «рожденным чтобы умереть».


      1. oxidmod
        10.08.2016 09:25

        думаю рекомендуется их перезапускать от греха и кривых рук програмистов подальше. вообще у нас используются воркеры на базе команд и супервизора, с утечками не сталкивались.
        зы. работает около 700-800 разных воркеров по 1-100 инстансов
        ззы. еще первая yii


        1. vyants
          10.08.2016 09:45

          Кажется утечки как раз и дело «кривых рук».
          Я бы не сказал, что они нас сильно беспокоят. За все время сильно утекала память при только использовании SoapClient, но после 7.0.5 проблему больше не замечали. Защита от утечек довольно простая, так что лучше подстраховаться и не ждать, пока нарвешься на какую-то сильно «подтекающую» функцию.


          1. oxidmod
            10.08.2016 09:57

            безусловно, если можно подстраховаться, то стоит это делать) я лишь имел ввиду что перезапуск воркеров это не необходимое условие для их нормальной работы


  1. PaulZi
    10.08.2016 09:41

    У меня утечка памяти была как то при работе с Imagine. Помог ручной вызов gc_collect_cycles().


  1. He11ion
    10.08.2016 09:49

    Вообще интересное решение, но как-то не в идеологии php в целом. У нас все подобные задачи решаются кроном/другими языками.


    1. OnYourLips
      10.08.2016 14:30
      +1

      PHP давно перестал быть шаблонизатором.


      1. oxidmod
        10.08.2016 14:58
        +1

        но пока всеже умирает. это не плохо, но не очень согласуется с долгоживущими концепциями)


        1. OnYourLips
          10.08.2016 15:51

          Сейчас уже программист решает, «умирает» или «не умирает».
          Время, когда такого выбора не было, давно прошло.


  1. sergebezborodov
    10.08.2016 10:15
    +1

    в проекте аналогичные задачи, но обошлись классикой beanstalk+worker все это дело крутится в supervisord
    все гораздо проще и прозрачнее, без каких либо форков https://github.com/sergebezborodov/beanstalk-yii2


  1. vyants
    10.08.2016 12:07

    Beanstalk не пробовал, спасибо, пощупаем. Пробовали persistent вариант?


    1. sergebezborodov
      10.08.2016 17:05

      persistent — нет, все откладывал настройку, но в итоге за три года работы beanstalk так ни разу и не упал