Большинство Node-объектов — вроде HTTP-запросов, ответов и потоков (streams) — реализуют модуль EventEmitter, благодаря которому они могут генерировать и прослушивать события.


const EventEmitter = require('events')

Простейшая форма управления по событиям — это callback-стиль некоторых популярных Node.js-функций, к примеру fs.readFile. По этой аналогии событие генерируется однократно (когда Node готов к вызову коллбэка), а коллбэк действует как обработчик события. Давайте сначала разберём эту базовую форму событийно-управляемой архитектуры.


Вызови меня, когда будешь готов, Node!


Изначально Node обрабатывал асинхронные события с помощью коллбэков. Это было давно, ещё до того как в JavaScript появилась нативная поддержка промисов и фича async/await. Коллбэки — это просто функции, которые вы передаёте другим функциям. Такое возможно в JavaScript, потому что функции — это объекты первого класса.


Важно понимать, что коллбэки не индикаторы асинхронного вызова в коде. Функция может вызывать коллбэк как синхронно, так и асинхронно. Например, хост-функция fileSize принимает коллбэк-функцию cb, причём вызывает её синхронно или асинхронно в зависимости от условия:


function fileSize (fileName, cb) {
  if (typeof fileName !== 'string') {
    return cb(new TypeError('argument should be string')); // Sync
  }
  fs.stat(fileName, (err, stats) => {
    if (err) { return cb(err); } // Async
    cb(null, stats.size); // Async
  });
}

Это плохой подход, приводящий к неожиданным ошибкам. Создавайте такие хост-функции, которые принимают коллбэки либо всегда синхронно, либо всегда асинхронно.


Давайте разберём простой пример типичной асинхронной Node-функции, написанной в коллбэк-стиле:


const readFileAsArray = function(file, cb) {
  fs.readFile(file, function(err, data) {
    if (err) {
      return cb(err);
    }
    const lines = data.toString().trim().split('\n');
    cb(null, lines);
  });
};

readFileAsArray берёт путь файла и коллбэк-функцию. Считывает содержимое файла, разбивает на массив строк и вызывает применительно к этому массиву коллбэк-функцию. Вот как это можно использовать. Допустим, файл numbers.txt лежит в одной директории с таким контентом:


10
11
12
13
14
15

Если у нас есть задача посчитать числа в этом файле, то для упрощения кода можно воспользоваться readFileAsArray:


readFileAsArray('./numbers.txt', (err, lines) => {
  if (err) throw err;
  const numbers = lines.map(Number);
  const oddNumbers = numbers.filter(n => n%2 === 1);
  console.log('Odd numbers count:', oddNumbers.length);
});

Этот код читает в массиве строк числовой контент, парсит его как числа и выполняет подсчёт.
Здесь работает характерный для Node коллбэк-стиль. У коллбэка есть error-first-аргумент err, который может принимать значение null. Мы передаём этот коллбэк в качестве последнего аргумента хост-функции. Всегда делайте так в своих функциях, потому что пользователи наверняка будут на это рассчитывать. Пусть ваша хост-функция получает коллбэк в виде последнего аргумента, и пусть коллбэк ожидает в качестве своего первого аргумента error-объект.


Современные JS-альтернативы коллбэкам


В современном JavaScript есть такие объекты, как промисы. Они могут быть альтернативой коллбэкам в случае асинхронных API. Вместо передачи коллбэка в качестве аргумента и обработки ошибки в том же месте промис позволяет отдельно обрабатывать успешные и ошибочные ситуации, а также соединять несколько асинхронных вызовов в цепочки, а не делать их вложенными.


Если функция readFileAsArray поддерживает промисы, то мы можем использовать её следующим образом:


readFileAsArray('./numbers.txt')
  .then(lines => {
    const numbers = lines.map(Number);
    const oddNumbers = numbers.filter(n => n%2 === 1);
    console.log('Odd numbers count:', oddNumbers.length);
  })
  .catch(console.error);

Вместо передачи коллбэка мы вызываем функцию .then применительно к возвращаемому значению хост-функции. Обычно .then даёт нам доступ к тем же строкам массива, которые мы получаем в коллбэк-версии, поэтому можем работать как раньше. Для обработки ошибок добавим вызов .catch применительно к результату, что обеспечит нам доступ к ошибке, если она возникнет.


Благодаря новому объекту Promise в современном JavaScript стало легче реализовать поддержку промис-интерфейса хост-функцией. Вот функция readFileAsArray, модифицированная так, чтобы она поддерживала промис-интерфейс в дополнение к уже поддерживаемому коллбэк-интерфейсу:


const readFileAsArray = function(file, cb = () => {}) {
  return new Promise((resolve, reject) => {
    fs.readFile(file, function(err, data) {
      if (err) {
        reject(err);
        return cb(err);
      }
      const lines = data.toString().trim().split('\n');
      resolve(lines);
      cb(null, lines);
    });
  });
};

Функция возвращает объект Promise, в который обёртывается асинхронный вызов fs.readFile. У промиса два аргумента: функции resolve и reject. Если нам нужно вызвать коллбэк с ошибкой, то используем промис-функцию reject, а для коллбэка с данными — промис-функцию resolve.


Единственное отличие заключается в том, что нам нужно иметь значение по умолчанию для коллбэк-аргумента на тот случай, если код используется с промис-интерфейсом. Например, в качестве аргумента можно использовать простую, по умолчанию пустую функцию () => {}.


Применение промисов с помощью async/await


Добавление промис-интерфейса позволяет гораздо легче работать с вашим кодом, если нужно использовать асинхронную функцию в цикле. С коллбэками ситуация усложняется. Промисы немного улучшают положение дел, как и генератор функций. Иными словами, более свежая альтернатива для работы с асинхронным кодом — функция async. Она позволяет обращаться с асинхронным кодом как с синхронным, что сильно улучшает читабельность кода.


Вот как можно использовать функцию readFileAsArray с помощью async/await:


async function countOdd () {
  try {
    const lines = await readFileAsArray('./numbers');
    const numbers = lines.map(Number);
    const oddCount = numbers.filter(n => n%2 === 1).length;
    console.log('Odd numbers count:', oddCount);
  } catch(err) {
    console.error(err);
  }
}
countOdd();

Сначала создаём асинхронную функцию — обычную функцию со словом async в начале. Внутри неё мы вызываем функцию readFileAsArray, словно она возвращает переменную lines, и для этого мы используем ключевое слово await. Если вызов readFileAsArray был синхронным, то продолжаем код. Чтобы выполнить получившееся, мы исполняем функцию async. Так получается просто и читабельно. Для работы с ошибками нам нужно обернуть вызов async в выражение try/catch.


Благодаря фиче async/await нам не потребовался специальный API (вроде .then и .catch). Мы лишь иначе маркировали функции и взяли чистый JavaScript.


Мы можем использовать async/await с любой функцией, поддерживающей промис-интерфейс. Но не можем — с асинхронными функциями в коллбэк-стиле (например, setTimeout).


Модуль EventEmitter


EventEmitter — это модуль, содействующий коммуникации между объектами в Node. Он является ядром асинхронной событийно-управляемой архитектуры. Многие из встроенных в Node модулей наследуют от EventEmitter.


Его идея проста: emitter-объекты генерируют именованные события, которые приводят к вызову ранее зарегистрированных прослушивателей. Так что у эмиттера есть две основные функции:


  • Генерирование именованных событий.
  • Регистрация и дерегистрация функций-прослушивателей.

Для работы с EventEmitter нужно создать расширяющий его класс.


class MyEmitter extends EventEmitter {

}

Эмиттеры — это то, что мы инстанцируем из классов на основе EventEmitter:


const myEmitter = new MyEmitter();

В любой момент жизненного цикла эмиттеров мы можем воспользоваться функцией emit и сгенерировать любое именованное событие.


myEmitter.emit('something-happened');

Генерирование события — это сигнал того, что соблюдено какое-то условие. Обычно речь идёт об изменении состояния генерирующего объекта. С помощью метода on можно добавить функции-прослушиватели, которые будут исполняться каждый раз, когда эмиттеры генерируют свои ассоциированные именованные события.


События !== асинхронность


Взгляните на пример:


const EventEmitter = require('events');

class WithLog extends EventEmitter {
  execute(taskFunc) {
    console.log('Before executing');
    this.emit('begin');
    taskFunc();
    this.emit('end');
    console.log('After executing');
  }
}

const withLog = new WithLog();

withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));

withLog.execute(() => console.log('*** Executing task ***'));

Класс WithLog — это эмиттер. Он определяет один экземпляр функции execute. Она получает один аргумент — функцию задачи (task function) — и оборачивает её исполнение в log-выражения. События генерируются до и после исполнения.


Чтобы увидеть, в какой очерёдности всё работает, зарегистрируем прослушивателей для именованных событий и выполним пример задачи по запуску всей цепочки.


Результат:


Before executing
About to execute
*** Executing task ***
Done with execute
After executing

Что я хочу отметить касательно результата исполнения кода: здесь нет ничего асинхронного.


  • Сначала получаем строку «Before executing».
  • Затем именованное событие begin приводит к появлению строки «About to execute».
  • Далее реально исполняемая строка генерирует строку «*** Executing task ***».
  • Потом именованное событие end приводит к появлению строки «Done with execute».
  • В конце получаем строку «After executing».

Совсем как старые добрые коллбэки, не предполагающие, что события характерны для синхронного или асинхронного кода. Это важно, потому что если мы передаём в execute асинхронную taskFunc, то генерируемые события больше не будут точны.


Можно эмулировать эту ситуацию с помощью вызова setImmediate:


// ...

withLog.execute(() => {
  setImmediate(() => {
    console.log('*** Executing task ***')
  });
});

Теперь результат будет такой:


Before executing
About to execute
Done with execute
After executing
*** Executing task ***

Это неправильно. Строки после асинхронного вызова, приводящие к появлению вызовов «Done with execute» и «After executing», появляются в неправильной очерёдности.


Для генерирования события после завершения асинхронной функции нам нужно скомбинировать коллбэки (или промисы) с этой событийно-управляемой коммуникацией. Это демонстрируется на нижеприведённом примере.


Одно из преимуществ использования событий вместо обычных коллбэков — то, что мы можем много раз реагировать на один и тот же сигнал благодаря определению многочисленных прослушивателей. Чтобы сделать то же самое с помощью коллбэков, придётся написать больше логики внутри одного доступного коллбэка. События — прекрасный способ реализовать многочисленные внешние плагины, добавляющие функциональность к ядру приложения. Можно считать их «разъёмами» для кастомизации поведения при изменении состояния.


Асинхронные события


Давайте преобразуем наш синхронный пример в нечто асинхронное и немного более полезное.


const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    this.emit('begin');
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err);
      }

      this.emit('data', data);
      console.timeEnd('execute');
      this.emit('end');
    });
  }
}

const withTime = new WithTime();

withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));

withTime.execute(fs.readFile, __filename);

Класс WithTime исполняет asyncFunc и с помощью вызовов console.time и console.timeEnd сообщает о времени, затраченном этой asyncFunc. Он генерирует правильную последовательность событий до и после исполнения. Также он генерирует error/data-события для работы с обычными сигналами асинхронных вызовов.


Протестируем эмиттер withTime, передав ему вызов асинхронной функции fs.readFile. Вместо обработки данных из файла с помощью коллбэка мы теперь можем прослушивать data-событие.


Выполнив этот код, мы, как и ожидалось, получаем правильную последовательность событий, а также отчёт о времени выполнения:


About to execute
execute: 4.507ms
Done with execute

Обратите внимание, что для этого нам нужно было скомбинировать коллбэк с эмиттером. Если бы asynFunc также поддерживала и промисы, то всё то же самое можно было бы реализовать с помощью async/await:


class WithTime extends EventEmitter {
  async execute(asyncFunc, ...args) {
    this.emit('begin');
    try {
      console.time('execute');
      const data = await asyncFunc(...args);
      this.emit('data', data);
      console.timeEnd('execute');
      this.emit('end');
    } catch(err) {
      this.emit('error', err);
    }
  }
}

Не знаю, как для вас, но для меня это выглядит гораздо читабельнее, чем код на основе коллбэков или строк с .then/.catch. Фича async/await максимально приближает нас к JavaScript, что я считаю большим достижением.


Аргументы событий и ошибки


В предыдущем примере было два события, сгенерированных с дополнительными аргументами. Error-cобытие сгенерировано error-объектом.


this.emit('error', err);

Data-cобытие сгенерировано data-объектом.


this.emit('data', data);

После именованного события мы можем использовать столько аргументов, сколько нужно, и все они будут доступны внутри функций-прослушивателей, которые мы зарегистрировали для этих именованных событий.


Например, для работы с data-событием зарегистрированная функция-прослушиватель получит доступ к data-аргументу, который был передан сгенерированному событию. И этот data-объект — именно то, что предоставляет asyncFunc.


withTime.on('data', (data) => {
  // do something with data
});

Обычно событие error специальное. В примере с коллбэками — если мы не обрабатываем error-событие с помощью прослушивателя, то Node-процесс завершается.


Чтобы продемонстрировать это поведение, снова вызовем исполнение метода с плохим аргументом:


class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err); // Not Handled
      }

      console.timeEnd('execute');
    });
  }
}

const withTime = new WithTime();

withTime.execute(fs.readFile, ''); // BAD CALL
withTime.execute(fs.readFile, __filename);

Первый вызов исполнения (execute call) приведёт к ошибке. Node-процесс упадёт или завершится:


events.js:163
      throw er; // Unhandled 'error' event
      ^
Error: ENOENT: no such file or directory, open ''

Это падение повлияет на второй вызов исполнения, который может вообще не быть выполнен.


Если зарегистрировать прослушивателя для специального события error, то поведение Node-процесса изменится. Например:


withTime.on('error', (err) => {
  // do something with err, for example log it somewhere
  console.log(err)
});

В данном случае будет сообщено об ошибке первого вызова исполнения, но Node-процесс не упадёт и не завершится. Второй вызов исполнения нормально закончится:


{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms

Обратите внимание, что сейчас Node ведёт себя иначе с функциями на основе промисов, он лишь выдаёт предупреждение, но в конце концов это изменится:


UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Другой способ обработки исключений из-за сгенерированных ошибок — регистрация прослушивателя глобального события процесса uncaughtException. Однако глобальная ловля ошибок при таком событии — идея плохая.


Стандартный совет относительно uncaughtException: избегайте его использования. Но если вам это необходимо (например, для отчёта о случившемся или для очисток), то позвольте процессу в любом случае завершиться:


process.on('uncaughtException', (err) => {
  // something went unhandled.
  // Do any cleanup and exit anyway!

  console.error(err); // don't do just that.

  // FORCE exit the process too.
  process.exit(1);
});

Однако представим, что одновременно произошло несколько error-событий. Это означает, что прослушиватель uncaughtException запущен несколько раз, что может стать проблемой при очистке кода. Такое бывает, к примеру, когда многочисленные вызовы приводят к завершению работы базы данных.


Модуль EventEmitter предоставляет метод once. Он сигнализирует о том, что хватит и одного вызова прослушивателя. Метод практично использовать с uncaughtException, потому что при первом непойманном исключении мы начнём выполнять чистку, зная, что в любом случае процесс завершится.


Порядок прослушивателей


Если для одного события зарегистрировать несколько прослушивателей, то они станут вызываться в каком-то порядке. Первый зарегистрированный будет и первым вызванным.


// ?????
withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

// ?????
withTime.on('data', (data) => {
  console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

Если выполнить этот код, то сначала в лог будет занесена строка «Length», а потом «Characters», потому что именно в таком порядке мы определили их прослушивателей.


Если нужно определить нового прослушивателя, но чтобы он вызывался первым, можно воспользоваться методом prependListener:


// ?????
withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

// ?????
withTime.prependListener('data', (data) => {
  console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

В этом случае в логе сначала появится строка «Characters».


И наконец, если вам нужно убрать прослушивателя, то воспользуйтесь методом removeListener.


На этом всё.

Поделиться с друзьями
-->

Комментарии (9)


  1. anitspam
    07.06.2017 09:27
    -7

    Мейл.ру, у вас такие хорошие технические статьи, что начинает вериться в превращение в корпорацию бобра.

    Но потом всё снова встаёт на свои места :( Может быть здесь вы расскажете: что за ерунда творится с м-агентом?

    И наконец, если вам нужно убрать прослушивателя, то воспользуйтесь методом removeListener.

    каким методом воспользоваться, чтобы агент не сообщал постоянно, что хочет обновиться?

    ваши пользователи уже дошли до такого http://forum.oszone.net/thread-326927-3.html
    Когда разработчик хочет сделать больно
    Запрет автоматического обновления Mail.Ru агента до версии 10
    1) Сносим все установленные версии Mail.Ru агента унинстайлером, например Geek Uninstaller
    2) создаём пустую папку c:\Users\имя пользователя\AppData\Roaming\Mail.Ru\Agent\bin (нужно иметь доступ к скрытым системным папкам)
    3) запрещаем все права NTFS всем группам и пользователям созданной папке bin
    4) установливаем, запускаем Mail.RU Agent и настраеваем свой профиль, например Mail.RU Agent 6.5.9316 RePack (& Portable) by elchupacabra (проверено на этой версии) скачать можно тут https://nnmclub.to/forum/viewtopic.php?t=906161
    5) ждём некоторое время, в папке c:\Users\имя пользователя\AppData\Roaming\Mra\Update
    появляется установочный файл новой версии magentsetup.exe и файл version4.txt
    6) закрываем Mail.RU Agent
    7) в файле version4.txt меняем значение 1 на 0
    20056 magentsetup.exe 249003180 1 http://mra.mail.ru/update/update9.html silent_update:1 'Новый Агент 6.5!'
    меняем на
    20056 magentsetup.exe 249003180 0 http://mra.mail.ru/update/update9.html silent_update:0 'Новый Агент 6.5!'
    8) запрещаем только запись, правами NTFS всем группам и пользователям version4.txt
    9) установочный файл c:\Users\имя пользователя\AppData\Roaming\Mra\Update\magentsetup.exe
    файл заменяем пустым файлом magentsetup.exe (создаём пустой текстовый файл, потом преименовываем его в исполняемый exe)
    10) запрещаем права NTFS созданному файлу magentsetup.exe
    11) включаем Mail.RU Agent и пользуемся


  1. Odrin
    07.06.2017 11:26
    +3

    ????? ?????

    Это перевод статьи на хинди?


  1. search
    07.06.2017 12:57

    Для работы с EventEmitter нужно создать расширяющий его класс.

    И далее по тексту


    class WithLog extends EventEmitter {
      execute(taskFunc) {
        console.log('Before executing');
        this.emit('begin');
        taskFunc();
        this.emit('end');
        console.log('After executing');
      }
    }

    Этот пример нарушает "S" и "I" из SOLID и создаёт наивысшую связанность.


    Для достижения большей гибкости кода лучше избегать расширения стандартных классов.


    Во-первых всегда можно сделать импиративно:


    const emitter = new EventEmitter();
    emitter.emit('begin');
    taskFunc();
    emitter.emit('end');

    Такой код легко читать и легко менять. Минус такого кода в том что он выглядет "не круто".


    Если уж не терпится создать класс для повторного использования кода, то лучше помнить мантру делигирование — лучшая альтернатива наследованию:


    class WithLog {
      constructor(emitter) {
        this.emitter = emitter;
      }
    
      execute(taskFunc) {
        console.log('Before executing');
        this.emitter.emit('begin');
        taskFunc();
        this.emitter.emit('end');
        console.log('After executing');
      }
    }
    
    const withLog = new WithLog(new EventEmitter());
    withLog.execute();

    Оператор new несёт гораздо меньше потенциального вреда и рисков полного рефакторинга чем оператор extend.


    Такие дела.


    1. RidgeA
      07.06.2017 13:02

      Хотя я с вами согласен, но мне кажется что статья не об этом.
      Если в каждую статью вмещать всевозможные best practise, то это может отвлекать внимание от того, о чем статья.


    1. mayorovp
      07.06.2017 13:35
      +1

      Как бы EventEmitter был создан именно для того, чтобы от него можно было наследоваться.


      1. search
        07.06.2017 13:48

        Да, официальная документация предлагает наследование, но я не увидел в этом никаких преимуществ. Уж простите дурака. Может проясните зачем оно нужно?


        1. mayorovp
          07.06.2017 14:58

          Во-первых, это попросту проще.
          Во-вторых, это делает класс похожим на системные классы — меньше неожиданностей для коллег.
          В-третьих, это упрощает код — поскольку базовый класс не может внезапно оказаться разделяемым между несколькими объектами — можно полагаться на владение им же. Например, можно подписаться на свое же событие — и знать что это не окажется случайно чужое событие с тем же именем.


  1. IgorKlopov
    07.06.2017 15:28
    +1

    Во-первых вказано “Создавайте такие хост-функции, которые принимают коллбэки либо всегда синхронно, либо всегда асинхронно” а про dezalgo не сказано. http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony
    Ну а во-вторых, автор допустил ошибку в своей “универсальной” функции


    readFileAsArray = function(file, cb = () => {})

    Сразу бросилось в глаза значение cb по-умолчанию. И не зря. Потому что в случае когда
    1) readFileAsArray принимает колбэк и не использует возвращаемый промис и
    2) fs.readFile возвращает ошибку
    то срабатывает reject(err), не ловится и вызывает unhandledRejection, что в дальнейшем в статье и происходит


    UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open

    а автор похоже не понимает почему. Уверен, что значение по-умолчанию надо от cb убрать, а тело должно проверять ее наличие


        fs.readFile(file, function(err, data) {
          if (err) {
            if (!cb) return reject(err);
            return cb(err);
          }
          const lines = data.toString().trim().split('\n');
          if (!cb) return resolve(lines);
          cb(null, lines);
        });


    1. mayorovp
      07.06.2017 16:16

      Да проще можно. Если уж обещание все равно создается в любом случае — надо его и использовать.


      var result = new Promise((resolve, reject) => {
        // ...
      });
      if (cb) result.then(lines => cb(null, lines), cb);
      return result;