Асинхронность node.js часто приводят как одно из достоинств платформы. Действительно, организация программы в виде небольших задач и последовательное их выполнение в рамках одного потока дает некоторое преимущество в распределении ресурсов и большей отзывчивостью приложения. Но это также приводит к необходимости организовывать код в нетривиальной манере. Какой именно я покажу на примере библиотеки для чтения строк из файла большого объема (500Mb и более).



Поскольку статья получается довольно большой, публикую в двух частях. В первой будет рассматриваться асинхронность, во второй — файловые операции.

Часть 1: От последовательного кода к асинхронному



Задача чтения строки из текстового файла в кодировке UTF-8 не очень простая. Основная сложность — в правильной обработке очередного блока бинарных данных, граница которого может попасть в середину UTF-8 символа или разделителя строк.

Давайте пока предположим, что эта задача успешно решена. Тогда последовательный алгоритм будет простым:

function processLineSync(line) {
  // ...
}

var line, reader = createTextReader(file);
while (line = reader.readLineSync())
  processLineSync(line);
reader.close();


Для тестирования сделаем простую реализацию createTextReader():

function createTextReader() {
  var counter = 0;
  return {
    readLineSync : () => {
      counter += 1;
      return counter < 10000000 ? counter.toString() : null;
    },
    readLine : done => {
      counter += 1;
      done(null, counter < 10000000 ? counter.toString() : null);
    },
    close : () => { }
  };
}


Нужно заметить, что цикл, код чтения очередной строки и её обработка — последовательные. Соответственно, их выполнение блокирует процесс node.js.

Для того, что бы написать правильный асинхронный код надо понять, чем асинхронный отличается от последовательного. Для этого предположим что у метода processLineSync() есть асинхронный аналог — processLine(line, err => { }). Если мы попробуем выполнить асинхронную операцию в цикле, то получим то или иное переполнение. Давайте посмотрим на примере.

function processLine(line, done) {
  setImmediate(() => { /* ... */ done(); });
}

var line, reader = createTextReader(file);
while (line = reader.readLineSync())
  processLine(line, () => { });
reader.close();


Через пару минут этот код падает с ошибкой «out of memory». При этом не выведя ни одного «ok».

А происходит вот что: while (line = reader.readLineSync())…; вычитывает данные синхронно и создает (а не вызывает) обработчики строк. Естественно, создание обработчика приводит к выделению памяти для него и через конечное число итераций память заканчивается. Обработчики создаются так как асинхронная операция в node.js является некоторым объектом, содержащим функцию, который будет вызван после завершения текущего блока. А текущий блок все никак не завершается, что и приводит к переполнению.

Если цикл не годится, что же тогда делать? Получается, что тело цикла само по себе должно быть функцией. И вызывать саму себя для продолжения итераций. Этот код уже намного сложнее простого цикла и в самом простом случае представляет собой следующую конструкцию:

function asyncWhile(criteria, iteration, done) {
  if (criteria()) {
    iteration(() => asyncWhile(criteria, iteration, done));
  } else {
    done();
  }
}


Ну и пример её использования:

var line, reader = createTextReader(file);
asyncWhile(
  () => {
    line = reader.readLineSync();
    return line != null;
  },
  done => processLine(line, () => done()),
  () => reader.close());


Работает уже лучше, но пока выглядит немного неуклюже, так как используются глобальные переменные. Так же отсутствует обработка ошибок и будет переполнение стека (но об этом чуть позже). Также было бы неплохо читать данные из файла асинхронно.

Что бы уменьшить зависимость от глобальной переменной в теле цикла перебросим её в виде параметра. А контекст цикла поместим в контекст функций. Получим следующее:

function asyncWhile(context, criteria, iteration, done) {
  var value = criteria.call(context);
  if (value) {
    iteration.call(context, value, () => asyncWhile(context, criteria, iteration, done));
  } else {
    done.call(context);
  }
}

asyncWhile(createTextReader(file),
  () => this.readLineSync(),
  (line, done) => processLine(line, () => done()),
  () => this.close());


Можно заметить, что параметры третьего аргумента совпадают с параметрами processLine. Аналогично для второго параметра processLine. Они оставлены что бы продемонстрировать подход, но, конечно, их можно сократить до передачи функции непосредственно:

var reader = createTextReader(file);
asyncWhile(reader, reader.readLineSync, processLine, reader.close);


Теперь надо добавить обработку ошибок. Для этого предположим, что ошибка возвращается первым параметром callback функции processLine. Такой метод возвращения ошибки является довольно стандартным для node.js.

Добавляем обработку ошибок и получаем:

function asyncWhile(context, criteria, iteration, done) {
  var value = criteria.call(context);
  if (value) {
    iteration(value, err => {
      if (err) {
        done.call(context, err);
      } else {
        asyncWhile(context, criteria, iteration, done);
      }
    });
  } else {
    done.call(context);
  }
}


Использование почти не изменилось:

asyncWhile(createTextReader(file),
  () => this.readLineSync(),
  (line, done) => processLine(line, (err) => {
    if (err) console.log(err);
    done();
  }),
  () => this.close());


Теперь давайте подсчитаем количество строк в файле:

asyncWhile({ reader : createTextReader(file), counter : 0 },
  () => this.reader.readLineSync(),
  (line, done) => {
    this.counter += 1;
    done();
  },
  () => {
    console.log(this.counter);
    this.reader.close();
  });


И почти сразу получаем «Maximum call stack size exceeded». Анализ стека показывает многократно повторяющийся шаблон asyncWhile() -> Object. -> asyncWhile() -> … Это довольно стандартная проблема, которая обходится обновлением стека через вызов функции из основного цикла node.js в момент рекурсии:

function asyncWhile(context, criteria, iteration, done) {
  var value = criteria.call(context);
  if (value) {
    iteration.call(context, value, err => {
      if (err) {
        done.call(context, err);
      } else {
        setImmediate(() => asyncWhile(context, criteria, iteration, done));
      }
    });
  } else {
    done.call(context, null);
  }
}


setImmediate() помещает обработчик в очередь и продолжает выполнение. Этот обработчик будет вызван по завершению текущего блока.

Предпоследним штрихом заменим блокирующее чтение из файла на асинхронное (не забыв про обработку ошибок):

function asyncWhile(context, criteria, iteration, done) {
  criteria.call(context, (err, value) => {
    if (err) {
      done.call(context, err);
    } else {
      if (value) {
        iteration.call(context, value, err => {
          if (err) {
            done.call(context, err);
          } else {
            setImmediate(() => asyncWhile(context, criteria, iteration, done));
          }
        });
      } else {
        done.call(context, null);
      }
    }
  });
}


И последним сделаем createTextReader тоже асинхронным:

  function createTextReader(file, done) {
    var counter = 0;
    done(null, {
      readLine : (done) => {
        counter += 1;
        done(null, counter < 10000000 ? counter.toString() : null);
      },
      close : () => { }
    });
  }


Используем:

createTextReader(file, (err, reader) => {
  asyncWhile(reader,
    done => this.readLine((err, line) => done(err, line)),
    (line, done) => processLine(line, (err) => {
      if (err) console.log(err);
      done();
    }),
    () => this.close());
});


Как видно, асинхронный код намного сложнее синхронного. И намного медленнее — примерно в 40 раз. С другой стороны он не блокирует поток, что позволяет запускать несколько таких обработчиков параллельно (в смысле node.js). Ну и чем более сложный обработчик для каждой записи, тем больше время обработки превышает расходы на поддержку асинхронности.



Об авторе: Александр Неткачев — старший разработчик на С# и F#. Поддерживает сайт alexatnet.com, проводит вебинары (Code&Coffe), помогает с кодом начинающим разработчикам (CodeReview4U).

Комментарии ()