Не так давно мне выпала честь сделать реализацию демонов (воркеров) на Node.js для использования во всех проектах, разрабатываемых нашей компанией. Мы часто разрабатываем проекты, где необходимо процессить и распределенно хранить видео файлы на нескольких серверах, так что нам нужно иметь готовый инструмент для этого.


Постановка задачи:


  • Разработка должна быть на Node.js. Мы давно используем эту платформу для разработки всех наших проектов, так что здесь это оправданный выбор.
  • Все ноды в кластере должны быть равнозначными. Не должно быть специального управляющего звена или мастера. В противном случае останов мастера может привести к останову всего кластера.
  • Задачи должны находиться в таблице MySQL. Это намного гибче и информативнее, чем использовать какой-нибудь MQ. Всегда можно получить доступ ко всем задачам, оценить очередь, переназначить их на другую ноду и т.д.
  • Каждый воркер должен уметь обрабатывать несколько задач одновременно.

Сразу привожу ссылку на GitHub того, что получилось: (https://github.com/pipll/node-daemons).


Запуск воркеров


Каждый воркер это отдельный процесс Node.js. Для создания процессов воркеров используется встроенный модуль cluster. Он также контролирует падения воркеров и заново их запускает.


Код запуска воркеров
'use strict';

const config = require('./config/config');

const _ = require('lodash');
const path = require('path');
const cluster = require('cluster');
const logger = require('log4js').getLogger('app');
const models = require('./models');

// Таймер для проверки количества воркеров и остановки мастера
let shutdownInterval = null;

if (cluster.isMaster) {
    // Запускаем воркеры
    _.each(config.workers, (conf, name) => {
        if (conf.enabled) {
            startWorker(name);
        }
    });
} else {
    // Инициализируем воркер
    let name = process.env.WORKER_NAME;
    let WorkerClass = require(path.join(__dirname, 'workers', name + '.js'));
    let worker = null;
    if (WorkerClass) {
        worker = new WorkerClass(name, config.workers[name]);
        // Запускаем воркер
        worker.start();
        // Подписываемся на событие, когда воркер остановлен
        worker.on('stop', () => {
            process.exit();
        });
    }
    // Подписываемся на события от мастера
    process.on('message', message => {
        if ('shutdown' === message) {
            if (worker) {
                worker.stop();
            } else {
                process.exit();
            }
        }
    });
}

// Shutdown
process.on('SIGTERM', shutdownCluster);
process.on('SIGINT', shutdownCluster);

// Метод запуска воркера
function startWorker(name) {
    let worker = cluster.fork({WORKER_NAME: name}).on('online', () => {
        logger.info('Start %s worker #%d.', name, worker.id);
    }).on('exit', status => {
        // Когда воркер был остановлен
        if ((worker.exitedAfterDisconnect || worker.suicide) === true || status === 0) {
            // Если воркер был остановлен контролируемо, то ни чего не делаем
            logger.info('Worker %s #%d was killed.', name, worker.id);
        } else {
            // Если воркер был остановлен неконтролируемо, то запускаем его еще раз
            logger.warn('Worker %s #%d was died. Replace it with a new one.', name, worker.id);
            startWorker(name);
        }
    });
}

// Метод остановки кластера
function shutdownCluster() {
    if (cluster.isMaster) {
        clearInterval(shutdownInterval);
        if (_.size(cluster.workers) > 0) {
            // Посылаем сигнал останова каждому воркеру
            logger.info('Shutdown workers:', _.size(cluster.workers));
            _.each(cluster.workers, worker => {
                try {
                    worker.send('shutdown');
                } catch (err) {
                    logger.warn('Cannot send shutdown message to worker:', err);
                }
            });
            // Ожидаем останов всех воркеров
            shutdownInterval = setInterval(() => {
                if (_.size(cluster.workers) === 0) {
                    process.exit();
                }
            }, config.shutdownInterval);
        } else {
            process.exit();
        }
    }
}

Я хотел бы обратить внимание на несколько моментов:


  • Для запуска воркера используется fork процесса, при этом в переменной окружения WORKER_NAME передается название запускаемого воркера.
  • Когда воркер неконтролируемо завершает работу, мы перезапускаем его.
  • Чтобы контролируемо остановить воркеры, мы посылаем им сигнал shutdown. Воркер реагирует на это событие и после окончания выполнения задачи делает process.exit().
  • Мастер наблюдает за количеством воркеров с помощью setInterval и когда все воркеры будут остановлены делает process.exit().

Компонент базового воркера


Этот компонент предназначен для выполнения периодических процессов и не работает с очередью задач.


Код базового воркера
'use strict';

const _ = require('lodash');
const Promise = require('bluebird');
const log4js = require('log4js');
const EventEmitter = require('events');
const WorkerStates = require('./worker_states');

class Worker extends EventEmitter {

    constructor(name, conf) {
        super();
        this.name = name;
        // Значения настроек по умолчанию
        this.conf = _.defaults({}, conf, {
            sleep: 1000 // Задержка между запусками
        });
        this.logger = log4js.getLogger('worker-' + name);
        // Флаг останова воркера
        this.stopped = true;
        // Таймер для задержки между запусками loop мотода
        this.timer = null;
        // Состояние воркера
        this.state = null;
    }

    // Метод запуска воркера
    start() {
        this.logger.info('Start');
        this.stopped = false;
        this.state = WorkerStates.STATE_IDLE;
        return this._startLoop();
    }

    // Метод останова воркера
    stop() {
        this.logger.info('Stop');
        this.stopped = true;
        if (this.state === WorkerStates.STATE_IDLE) {
            // Останавливаем таймер задержки
            if (this.timer) {
                clearTimeout(this.timer);
                this.timer = null;
            }
            this.state = WorkerStates.STATE_STOP;
            // Вызываем событие останова воркера
            this.emit('stop');
        }
    }

    // Метод для выполнения бизнес задач
    loop() {
        return Promise.resolve();
    }

    // Метод запуска и обработки loop метода
    _startLoop() {
        this.state = WorkerStates.STATE_WORK;
        return this.loop().catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            this.state = WorkerStates.STATE_IDLE;
            if (!this.stopped) {
                // Делаем повторный запуск по окончании работы loop метода
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            } else {
                this.state = WorkerStates.STATE_STOP;
                // Вызываем событие останова воркера
                this.emit('stop');
            }
        });
    }

}

module.exports = Worker;

Код простейшего воркера может выглядеть так:


'use strict';

const Promise = require('bluebird');
const Worker = require('../components/worker');

class Sample extends Worker {

    loop() {
        this.logger.info("Loop method");
        return Promise.resolve().delay(30000);
    }

}

module.exports = Sample;

Некоторые особенности:


  • loop метод предназначен для наследования в потомках и реализации бизнес задач. Возвращаемым значением этого метода должен быть Promise.
  • По окончании работы loop метода он запускается заново через указанное в настройках воркера время.
  • Воркер имеет три состояния:
    • STATE_IDLE — во время паузы между запусками loop метода.
    • STATE_WORK — во время работы loop метода.
    • STATE_STOP — после останова воркера.

Компонент воркера обработки задач


Это основной компонент, предназначенный для параллельной обработки задач из таблицы MySQL.


Код воркера обработки задач
'use strict';

const config = require('../config/config');

const _ = require('lodash');
const Promise = require('bluebird');
const Worker = require('./worker');
const WorkerStates = require('./worker_states');
const models = require('../models');

class TaskWorker extends Worker {

    constructor(name, conf) {
        super(name, conf);
        // Значения настроек по умолчанию
        this.conf = _.defaults({}, this.conf, {
            maxAttempts: 3, // Максимальное количество попыток запуска задачи
            delayRatio: 300000, // Базовое значение отсрочки запуска задачи
            count: 1, // Максимальное количество одновременно обрабатываемых задач
            queue: '', // Название очереди для получения задач
            update: 3000 // Интервал обновления статуса задачи
        });
        // Счетчик одновременно обрабатываемых задач
        this.count = 0;
    }

    loop() {
        if (this.count < this.conf.count && !this.stopped) {
            // Получение очередной задачи
            return this._getTask().then(task => {
                if (task) {
                    // Увеличение счетчика одновременно обрабатываемых задач
                    this.count++;
                    // Запуск периодического обновления статуса задачи
                    let interval = setInterval(() => {
                        return models.sequelize.transaction(t => {
                            return task.touch({transaction: t});
                        });
                    }, this.conf.update);
                    // Запуск метода обработки задачи
                    this.handleTask(task.get({plain: true})).then(() => {
                        // Завершение задачи
                        return models.sequelize.transaction(t => {
                            return task.complete({transaction: t}).then(() => {
                                this.logger.info('Task completed:', task.id);
                            });
                        });
                    }).catch(err => {
                        // Задача не была выполнена - перезапуск задачи
                        this.logger.warn('Handle error:', err);
                        return this.delay(task).then(delay => {
                            return models.sequelize.transaction(t => {
                                return task.fail(delay, {transaction: t}).then(() => {
                                    this.logger.warn('Task failed:', task.id);
                                });
                            });
                        });
                    }).finally(() => {
                        clearInterval(interval);
                        this.count--;
                    }).done();
                    return null;
                }
            });
        } else {
            return Promise.resolve();
        }
    }

    // Метод обработки задачи
    handleTask() {
        return Promise.resolve();
    }

    // Метод вычисления отсрочки задачи после неуспешного запуска
    delay(task) {
        return Promise.resolve().then(() => {
            return task.attempts * this.conf.delayRatio;
        });
    }

    // Метод получения задачи для обработки
    _getTask() {
        return models.sequelize.transaction({autocommit: false}, t => {
            return models.Task.scope({
                method: ['forWork', this.conf.queue, config.node_id]
            }).find({transaction: t, lock: t.LOCK.UPDATE}).then(task => {
                if (task) {
                    return task.work(config.node_id, {transaction: t});
                }
            });
        });
    }

    _startLoop() {
        this.state = WorkerStates.STATE_WORK;
        return this.loop().catch(err => {
            this.logger.warn('Loop error:', err);
        }).finally(() => {
            if (this.count === 0) {
                this.state = WorkerStates.STATE_IDLE;
            }
            if (this.stopped && this.count === 0) {
                this.state = WorkerStates.STATE_STOP;
                this.emit('stop');
            } else {
                this.timer = setTimeout(() => {
                    this._startLoop();
                }, this.conf.sleep);
            }
        });
    }

}

module.exports = TaskWorker;

Код простейшего воркера может выглядеть так:


'use strict';

const Promise = require('bluebird');
const TaskWorker = require('../components/task_worker');

class Sample extends TaskWorker {

    handleTask(task) {
        this.logger.info('Sample Task:', task);
        return Promise.resolve().delay(30000);
    }

}

module.exports = Sample;

Особенности работы компонента:


  • Для получения задачи из базы данных используется конструкция SELECT ... FOR UPDATE и последующий UPDATE записи в базе данных в одной транзакции с отключенным автокоммитом. Это позволяет получить эксклюзивный доступ к задаче даже при одновременных запросах с нескольких серверов.
  • Во время обработки задачи запускается периодический процесс обновления статуса задачи в базе данных. Это необходимо чтобы отличить долго работающую задачу от неожиданно завершенной задачи без обновления статуса.
  • Статус обработанной задачи определяется статусом Promise возвращаемого методом handleTask. При успехе задача помечается как выполненная. В противном случае задача помечается как провальная и запускается с отсрочкой, задаваемой в методе delay.

Модель работы с задачами


Для работы с моделями базы данных используется модуль sequelize. Все задачи находятся в таблице tasks. Таблица имеет следующую структуру:


Поле Тип Описание
id integer, autoincrement ID задачи
node_id integer, nullable ID ноды, для которой предназначена задача
queue string Очередь задачи
status enum Статус задачи
attempts integer Количество попуток запуска задачи
priority integer Приоритет задачи
body string Тело задачи в JSON формате
start_at datetime, nullable Дата и время начала обработки задачи
finish_at datetime, nullable Дата и время окончания обработки задачи (аналог TTL)
worker_node_id integer, nullable ID ноды, которая начала обработку задачи
worker_started_at datetime, nullable Дата и время начала обработки задачи
checked_at datetime, nullable Дата и время обновления статуса работы задачи
created_at datetime, nullable Дата и время создания задачи
updated_at datetime, nullable Дата и время изменения задачи

Задача может быть назначена как конкретной ноде, так и любой ноде в кластере. Это регулируется полем node_id во время создания задачи. Задачу можно запустить с отсрочкой (поле start_at) и с ограничением временем обработки (поле finish_at).


Различные воркеры работают с различными очередями задач, задаваемыми в поле queue. Приоритет обработки задается в поле priority (чем больше, тем выше приоритет). Количество перезапусков задачи сохраняется в поле attempts. В теле задачи (поле body) в JSON формате передаются параметры, необходимые воркеру.


Поле checked_at выполняет роль признака работающей задачи. Значение его все время меняется во время работы задачи. Если значение поля checked_at долго не менялось, а задача находится в статусе working, то задача считается проваленной и статус ее меняется на failure.


Код модели работы с задачами
'use strict';

const moment = require('moment');

module.exports = function(sequelize, Sequelize) {

    return sequelize.define('Task', {
        id: {
            type: Sequelize.INTEGER,
            primaryKey: true,
            autoIncrement: true
        },
        node_id: {
            type: Sequelize.INTEGER
        },
        queue: {
            type: Sequelize.STRING,
            allowNull: false
        },
        status: {
            type: Sequelize.ENUM,
            values: ['pending', 'working', 'done', 'failure'],
            defaultValue: 'pending',
            allowNull: false
        },
        attempts: {
            type: Sequelize.INTEGER,
            defaultValue: 0,
            allowNull: false
        },
        priority: {
            type: Sequelize.INTEGER,
            defaultValue: 10,
            allowNull: false
        },
        body: {
            type: Sequelize.TEXT,
            set: function(body) {
                return this.setDataValue('body', JSON.stringify(body));
            },
            get: function() {
                try {
                    return JSON.parse(this.getDataValue('body'));
                } catch (e) {
                    return null;
                }
            }
        },
        start_at: {
            type: Sequelize.DATE
        },
        finish_at: {
            type: Sequelize.DATE
        },
        worker_node_id: {
            type: Sequelize.INTEGER
        },
        worker_started_at: {
            type: Sequelize.DATE
        },
        checked_at: {
            type: Sequelize.DATE
        }
    }, {
        tableName: 'tasks',
        freezeTableName: true,
        underscored: true,

        scopes: {
            forWork: function(queue, node_id) {
                return {
                    where: {
                        node_id: {
                            $or: [
                                null,
                                node_id
                            ]
                        },
                        queue: queue,
                        status: 'pending',
                        start_at: {
                            $or: [
                                null,
                                {
                                    $lt: moment().toDate()
                                }
                            ]
                        },
                        finish_at: {
                            $or: [
                                null,
                                {
                                    $gte: moment().toDate()
                                }
                            ]
                        }
                    },
                    order: [
                        ['priority', 'DESC'],
                        ['attempts', 'ASC'],
                        [sequelize.fn('IFNULL', sequelize.col('start_at'), sequelize.col('created_at')), 'ASC']
                    ]
                };
            }
        },

        instanceMethods: {
            fail: function(delay, options) {
                this.start_at = delay ? moment().add(delay, 'ms').toDate() : null;
                this.attempts = sequelize.literal('attempts + 1');
                this.status = 'failure';
                return this.save(options);
            },
            complete: function(options) {
                this.status = 'done';
                return this.save(options);
            },
            work: function(node_id, options) {
                this.status = 'working';
                this.worker_node_id = node_id;
                this.worker_started_at = moment().toDate();
                return this.save(options);
            },
            check: function(options) {
                this.checked_at = moment().toDate();
                return this.save(options);
            }
        }
    });

};

Жизненный цикл задач


Все задачи проходят следующий жизненный цикл:


  1. Новая задача создается со статусом pending.
  2. Когда наступает время обработки задачи, ее получает первый свободный воркер, переводит в статус working и заполняет поля worker_node_id и worker_started_at.
  3. Во время обработки задачи воркер с некоторой периодичностью (по умолчанию каждые 10 секунд) обновляет поле checked_at для нотификации о корректной работе.
  4. По окончании работы над задачей может быть несколько вариантов развития событий:
    4.1. Если задача успешно завершилась, то задача переводится в статус done.
    4.2. Если выполнение задачи провались, то задача переводится в статус failure, увеличивается количество attempts и происходит отсрочка запуска на заданное количество времени (вычисляется в методе delay основываясь на количестве попыток и настройке delayRatio).

В проекте так же есть встроенный модуль Manager, который запускается на каждой ноде и делает следующую обработку задач:


  1. Переводит зависшие задачи в статус failure.
  2. Переводит проваленные задачи в статус pending для новой обработки.
  3. Удаляет невыполненные задачи, срок запуска которых уже истек.
  4. Удаляет успешно завершенные задачи с отсрочкой в 1 час (может быть настроено).
  5. Удаляет проваленные завершенные задачи с израсходованным количеством попыток запуска с отсрочкой в 3 дня (может быть настроено).

Кроме того этот модуль работает с включением/отключением нод.


Код модуля менеджера
'use strict';

const _ = require('lodash');
const moment = require('moment');
const Promise = require('bluebird');
const Worker = require('../components/worker');
const models = require('../models');
const config = require('../config/config');

class Manager extends Worker {

    constructor(name, conf) {
        super(name, conf);
        this.conf = _.defaults({}, this.conf, {
            maxUpdate: 30000,       // 30 seconds
            maxCompleted: 3600000,  // 1 hour
            maxFailed: 259200000    // 3 days
        });
    }

    loop() {
        return models.sequelize.transaction(t => {
            return Promise.resolve()
                .then(() => {
                    return this._checkCurrentNode(t);
                })
                .then(() => {
                    return this._activateNodes(t);
                })
                .then(() => {
                    return this._pauseNodes(t);
                })
                .then(() => {
                    return this._restoreFrozenTasks(t);
                })
                .then(() => {
                    return this._restoreFailedTasks(t);
                })
                .then(() => {
                    return this._deleteDeadTasks(t);
                })
                .then(() => {
                    return this._deleteCompletedTasks(t);
                })
                .then(() => {
                    return this._deleteFailedTasks(t);
                });
        });
    }

    _checkCurrentNode(t) {
        return models.Node.findById(config.node_id, {transaction: t}).then(node => {
            if (node) {
                return node.check();
            }
        });
    }

    _activateNodes(t) {
        return models.Node.update({
            is_active: true
        }, {
            where: {
                is_active: false,
                checked_at: {
                    $gte: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Activate nodes:', count);
            }
        });
    }

    _pauseNodes(t) {
        return models.Node.update({
            is_active: false
        }, {
            where: {
                is_active: true,
                checked_at: {
                    $lt: moment().subtract(2 * this.conf.sleep).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Pause nodes:', count);
            }
        });
    }

    _restoreFrozenTasks(t) {
        return models.Task.update({
            status: 'failure',
            attempts: models.sequelize.literal('attempts + 1')
        }, {
            where: {
                status: 'working',
                checked_at: {
                    $lt: moment().subtract(this.conf.maxUpdate).toDate()
                }
            },
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore frozen tasks:', count);
            }
        });
    }

    _restoreFailedTasks(t) {
        let where = [{status: 'failure'}];
        let conditions = this._failedTasksConditions();
        if (conditions.length) {
            where.push({$or: conditions});
        }

        return models.Task.update({
            status: 'pending',
            worker_node_id: null,
            worker_started_at: null
        }, {
            where: where,
            transaction: t
        }).spread(count => {
            if (count > 0) {
                this.logger.info('Restore failure tasks:', count);
            }
        });
    }

    _deleteDeadTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'pending',
                finish_at: {
                    $lt: moment().toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete dead tasks:', count);
            }
        });
    }

    _deleteCompletedTasks(t) {
        return models.Task.destroy({
            where: {
                status: 'done',
                checked_at: {
                    $lt: moment().subtract(this.conf.maxCompleted).toDate()
                }
            },
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete completed tasks:', count);
            }
        });
    }

    _deleteFailedTasks(t) {
        let where = [
            {status: 'failure'},
            {checked_at: {
                $lt: moment().subtract(this.conf.maxFailed).toDate()
            }}
        ];
        let conditions = this._failedTasksConditions();
        if (conditions.length) {
            where.push({$or: conditions});
        }
        return models.Task.destroy({
            where: where,
            transaction: t
        }).then(count => {
            if (count > 0) {
                this.logger.info('Delete failed tasks:', count);
            }
        });
    }

    _failedTasksConditions() {
        let conditions = [];
        _.each(config.workers, (worker) => {
            if (worker.queue) {
                let item = {queue: worker.queue};
                if (worker.maxAttempts !== undefined) {
                    item.attempts = {
                        $lt: worker.maxAttempts
                    };
                }
                conditions.push(item);
            }
        });
        return conditions;
    }

}

module.exports = Manager;

Выводы и планы на будущее


В целом получился неплохой и достаточно надежный инструмент для работы с фоновыми задачами, которым мы можем поделиться с сообществом. Основные идеи и принципы, используемые в этом проекте, мы выработали за несколько лет работы над различными проектами.


В планах развития проекта:


  • Hotreload воркеров без перезапуска мастер процесса. Чтобы можно было обновлять код отдельных воркеров, не вмешиваясь в работу других.
  • Добавление информации о прогрессе задачи (поле progress) и добавление метода в модуль TaskWorker для обновления этой информации.
  • Создание различных интерфейсов для работы с задачами и нодами:
    • CLI
    • Web
    • API
  • Создание базовых воркеров:
    • Процессинг видео файлов.
    • Репликация файлов между серверами для надежного хранения.
  • Тесты для воркеров.

Буду рад конструктивной критике и отвечу на интересующие вопросы в комментариях.

Поделиться с друзьями
-->

Комментарии (26)


  1. napa3um
    21.06.2016 22:02
    +1

    Гляньте http://pm2.keymetrics.io/ и https://github.com/Unitech/pm2, там и поддержка systemd, и GUI, и CLI, и API, и hotreload, балансировка, защита от падений, управление кучей серверов одной командой (в том числе деплой на них, в конфиге на машине деплоящего разработчика достаточно указать только SSH-доступы подконтрольных серверов, все нужные пакеты для разворачивания pm2 он поставит сам). В общем, полное и универсальное решение (воркерами не обязаны быть исключительно скрипты для node.js, можно запускать любые программы), построенное на базе npm-инфраструктуры.


    1. noder
      21.06.2016 22:06

      Спасибо, обязательно посмотрю. Однако свой велосипед не просто так появился. Он эволюционировал из множества реализованных проектов и тесно связан с разрабатываемыми проектами.


      1. napa3um
        21.06.2016 22:20

        У вас получился некий «сервер приложений-демонов», под который нужно отдельно писать апплеты, отдельно реализовывать стратегии деплоя и обновления кода апплетов, а pm2 сразу предлагает универсальное решение, под управление которого можно будет отдать разрабатываемые сервисы вне зависимости от их происхождения и языка написания, и покрывающее сразу большой кусок задач в цикле разработки/эксплуатации ПО (DevOps). Такой единый инструмент для управления всем ландшафтом. Я ни в коем случае не говорю «зря делали», наоборот, говорю о том, что может получиться в итоге. Альтернативы всегда полезны.


    1. heilage
      22.06.2016 06:07

      Пробовали у себя. Поплевались, выбросили. pm2 при работе в режиме кластера напрочь забивает сетевой стек висящими соединениями. Как итог — серьезное увеличение числа 50х ответов. У nodejs и так-то кластерный модуль через пень-колоду написан, а pm2 еще и своих глюков добавляет. Рекомендовать его для хоть сколько-нибудь серьезных проектов не могу.
      И кстати, в исходный код pm2 лучше не смотреть людям с неустойчивой психикой.


      1. napa3um
        22.06.2016 09:35

        Проект развивается, возможно, вы плевались слишком давно. Сам с серьезными проблемами не сталкивался.


  1. napa3um
    21.06.2016 22:43

    Впрочем, ваш бойлерплейт скорее не про демонизацию, а про очередь задач и пул воркеров (а демонизировать можно и с помощью pm2, отдав ему работу по деплою, горячей перезагрузке и мониторингу нагрузки). Потому таки не очевидно, почему вы не выбрали архитектуру с шиной типа MQ, т.к. вместо активной мастер-ноды, раздающей задачи, вы сделали пассивную мастер-ноду с БД, у которой воркеры будут сами запрашивать задачи. Т.е., точка отказа таким решением не устраняется всё равно, а воркеры получаются неабстрагированными от хранилища задач. (Но это может быть, конечно, удобным в вашем проекте.)


    1. noder
      21.06.2016 22:56

      Впрочем, ваш бойлерплейт скорее не про демонизацию, а про очередь задач и воркеров (а демонизировать можно и с помощью pm2, отдав ему работу по деплою, горячей перезагрузке и мониторингу нагрузки).

      Вы правы, демонизация это только часть бойлерплейта. Упор сделан на обработке задач. Нужно подумать над интеграцией pm2 в проект.

      Потому таки неясно, почему вы не выбрали архитектуру с шиной типа MQ

      Одно из исходных условий — мониторинг и управление задачами из самого приложения (куда интегрируется этот проект). Соответственно хранение задач в базе данных — лучший, если не единственный, выбор. Конечно можно сделать гибридное решение, когда задачи находятся в базе данных, а нотификация о поступлении задач проходит через MQ. Но в этом случае появляется еще одна зависимость и все приходится дополнительно налаживать синхронизацию задач между базой и MQ.

      Т.е., точка отказа таким решением не устраняется всё равно, а воркеры получаются неабстрагированными от хранилища задач.

      На данном уровне отказоустойчивость базы данных принципиально не рассматривается. Если база упала, то падает и весь проект. Отказоустойчивость базы данных настраивается отдельно.


  1. galk_in
    21.06.2016 23:29
    +1

    Запрашиваемая конструктивная критика:

    1. Задачи должны находиться в таблице MySQL. Это намного гибче и информативнее, чем использовать какой-нибудь MQ.

      Создается впечатление, что в вашей команде просто не хватает экспертизы по работе с очередями.
    2. Не ясно почему написан boirplate, а не генератор
    3. Зачем использовать bluebird, когда в требованиях packege.json 4-ая нода с нативными промисами?
    4. Странный код стайл. Подключите eslint
    5. Если вы выкладываете в opensourse то следите за актуальность readme.md Это важнее чем статья на хабре.
    6. В некоторых местах кода вызывает недоверие

    7. Тестов нет. Travis и TDD


    И на последок главный вопрос, а что вы анализировали из готовых решений в opensource?


    1. noder
      22.06.2016 00:00

      Спасибо за критику.

      > Создается впечатление, что в вашей команде просто не хватает экспертизы по работе с очередями.
      Экспертиза по работе с очередями есть, нужна тесная интеграция работы с постановленными задачами в самом проекте. Про гибридное решение уже писал выше в комментарии.

      > Не ясно почему написан boirplate, а не генератор
      Генераторы не используем. Не думаю что это минус.

      > Зачем использовать bluebird, когда в требованиях packege.json 4-ая нода с нативными промисами?

      Еще начали использовать обертки над промисами с версии 0.10.x. Ипользовали Q, перешли к bluebird. У blurbid есть много «плюшек», таких как promisifyAll, delay и т.д. Здесь они пока не используются, но могут быть использованы в будущем.

      > Странный код стайл. Подключите eslint
      Что вас смутило в код стайле? eslint подключу обязательно.

      > Если вы выкладываете в opensourse то следите за актуальность readme.md Это важнее чем статья на хабре.
      До README.md еще не добрался. Сначала решил выложить статью на хабре.

      > изменяем переменную из окружения?
      Пример взят из документации log4js https://github.com/nomiddlename/log4js-node#configuration
      Можно конечно использовать метод configure, согласен. Нужно изменить.

      > куча then и не одного catch?
      Все верно, результат метода обрабатывается в базовом воркере. Это описано в статье.

      > Тестов нет. Travis и TDD
      Про тесты я так же в статье писал, что это в планах.

      > И на последок главный вопрос, а что вы анализировали из готовых решений в opensource?
      Как я уже писал выше, реализация эволюционировала из проектов, над которыми мы работали вот уже более 5-ти лет. Соответственно сформировались свои требования и были отработаны многие моменты. Так что изыскания я не проводил.


    1. ChALkeRx
      22.06.2016 01:08

      (Вне контекста самой статьи и остальных замечаний).

      Зачем использовать bluebird, когда в требованиях packege.json 4-ая нода с нативными промисами?


      У нативного Promise пока что есть некоторые проблемы с производительностью и потреблением памяти. На сервере в данный момент лучше всего использовать bluebird, даже без учёта его дополнительных плюшек.

      Плюс текущий API Node.js сам по себе промисы пока ещё не возвращает, а bluebird даёт promisifyAll.


      1. galk_in
        22.06.2016 08:31

        А где можно по-подробней об этом прочитать? Мы не сталкивались с такими проблемами.


        1. ChALkeRx
          22.06.2016 11:01

          https://github.com/petkaantonov/bluebird/tree/master/benchmark посмотрите, например.
          Ну и https://github.com/nodejs/node/issues/3414.


          Плюс в стандарте кое-чего не совсем гладко, а bluebird от него в этом месте отходит, и за счёт этого несколько выигрывает.
          См. https://github.com/promises-aplus/promises-spec/issues/179 и https://github.com/promises-aplus/promises-spec/issues/183.


        1. ChALkeRx
          22.06.2016 11:05

          С другой стороны, переопределять глобальный Promise я бы не стал.
          Я просто делаю const Promise = require('bluebird'); наверху каждого файла.


  1. dos
    21.06.2016 23:50

    Выше уже посоветовали pm2 для демонизации. Дополнительно хочу посоветовать посмотреть в сторону Seneca для построения распределенных воркеров.


    1. noder
      22.06.2016 00:06

      Спасибо, про Seneca впервые слышу. Обязательно посмотрю.


  1. skeletor
    22.06.2016 15:37

    А чем модуль cluster лучше pm2 (http://pm2.keymetrics.io/)?


    1. noder
      22.06.2016 16:27

      Как минимум cluster это встроенный модуль, а pm2 — сторонний.


      1. skeletor
        22.06.2016 16:45

        И что? Если сторонний механизм лучше встроенного, почему бы не использовать его?


        1. noder
          22.06.2016 17:07

          Если честно, я модуль pm2 даже не рассматривал. Для форка процессов я использовал cluster из-за простого API.


    1. heilage
      23.06.2016 06:25

      pm2 использует cluster-модуль внутри себя, поэтому «кто лучше» — вопрос неправильный. Помимо (коряво реализованных) управления процессами и балансировки запросов по процессам у pm2 есть масса других привлекательных плюшек, которые зачастую определяют выбор.


      1. skeletor
        23.06.2016 10:30

        А можно подробнее про корявость и плюшки? Мы сейчас используем pm2, поэтому и интересны все +\-.


        1. heilage
          23.06.2016 13:59

          Плюшки все указаны в readme, в их числе — авто-перезапуск при падении, софт-рестарт воркера при превышении лимитов по памяти или процессору, поддержка деплоя через репозиторий и прочее. При прочих равных — довольно вкусные возможности.

          Насчет корявостей — в cluster-модуле балансировка пакетов по воркерам может работать нестабильно. Не скажу за последние версии ноды, инфа актуальна на версии 0.12 — 4.2. До версии node 4.0 кстати кластер-модуль вообще не работал под нагрузкой — мастер-процесс, а также и god daemon (в случае пм2) просто падал напрочь через 5 минут вместе со всеми воркерами. Потом починили, вроде, но с pm2 (опять же под нагрузкой) началось жесткое забивание сетевого стека висящими коннектами. Предположительно из-за того, что при софтовом перезапуске воркера сокет для общения с god daemon-ом оставался висеть, но я в код настолько глубоко не залезал и не проверял, точно говорить не могу. Симптомы — частые неответы сервера и 50х ошибки. После замены pm2 и кластерного режима nodejs на балансировку nginx-ом по вручную созданным воркерам — исчезли почти все проблемы и общая стабильность значительно повысилась.


          1. ChALkeRx
            23.06.2016 14:15

            в их числе — авто-перезапуск при падении,


            Пусть в меня снова кинут тапком (как в прошлый раз), но нет.

            Это не основная фича pm2 и даже вообще не фича pm2. Если вам нужен только перезапуск при падении — не надо брать для этих целей pm2 или forever в продакшне, надо брать системные инструменты, которые обеспечивают цепочку ватчеров до ядра и до аппаратного таймера.

            Если вы разворачиваете что-то на pm2 — вам нужно убедиться, что сам pm2 перезапустится при его падении, точно так же, как в случае без pm2 вам надо было убедиться, что ваша программа перезапустится. pm2 этой задачи не решает.


            1. heilage
              23.06.2016 14:27

              Да, справедливо. Имел в виду перезапуск воркера, но это же можно сделать тремя строчками и используя голый кластерный модуль :) Промашка вышла.


          1. skeletor
            23.06.2016 14:20

            Насчёт плюшёк — не то, что написано в readme, а то, чем он лучше cluster'a. Мы используем ноду (6.1/6.2) и перезапуск через pm2. Обслуживаем в среднем 200-250 rps. Таких симптомов, как вы описали не замечал.


            1. heilage
              23.06.2016 14:28

              Отвечу то, же что и парой комментов выше — он не лучше кластера ничем, кроме наворотов сверху, потому что внутри себя pm2 непосредственно использует cluster.