В первой части был рассмотрен переход от синхронного кода к асинхронному. Во второй части я рассмотрю переход от побайтового чтения из файла к построчному. Поскольку операция кодирования из одной кодировки в другую довольно тривиальна, я чуть больше расскажу про теоретическую сторону вопроса.



Часть 2: Как прочитать строку из файла



Самый тривиальный способ прочитать строки из файла — это прочитать весь файл в строку и разбить её по разделителям строк:

fs.readFile('large.txt', { encoding : 'utf8' },
  (err, data) => {
    if (err) throw err;
    data.split('\n').forEach(line => {
      doSomethingWithLine(line);
    });
  });


Он же, пожалуй, самый быстрый. Но он же требует больше всего памяти — от 100% до 200% от размера файла. 200% — это одновременно и самый распространенный случай, так как в памяти кодировка у строки UTF-16 и поэтому размер требуемой памяти умножается на два если файл содержит в основном символы из однобайтного диапазона UTF-8.

Кроме того, разработчики Node.js не рекомендуют загружать много данных в Node.js процесс (см. What is the memory limit on a node process?). Сделано это не очень элегантно — даже если физической памяти хватает, то при попытке загрузить файл больше 1Gb бросается исключение:

this.parent = new SlowBuffer(this.length);
              ^
RangeError: length > kMaxLength


Если же файл поменьше, то можно получить и такое:

FATAL ERROR: CALL_AND_RETRY_0 Allocation failed - process out of memory


Остается только обрабатывать файл по частям. Для этого нужно его по частям прочитать и Node.js предоставляет для этого минимум 5 способов:

  1. Использовать «старые» потоки — открыть поток и подписаться на событие «data».
  2. Использовать «новые» потоки — подписаться на событие «readable» и использовать метод read().
  3. Создать свой WritableStream и направить в него файловый поток методом «pipe()».
  4. Использовать файловые дескрипторы и набор методов open(), read(), close().
  5. Использовать синхронные варианты — openSync(), readSync(), closeSync().


Варианты 1-3 являются более элегантными, так как оперируют удобной абстракцией — потоком. Это позволяет рассматривать программу как диаграмму потоков данных (data flow diagram) и при дизайне архитектуры оперировать такими терминами как слияние, разделение и трансформация.

Также варианты 1 и 2 отличаются возможностью чтения символов из файла. В вариантах 3 и 4 данные из файла записываются в буфер и затем их надо конвертировать в текст.

// Вариант #1 - "старые" потоки
var stream = fs.createReadStream(file, { encoding : 'utf8' });
stream.on("data", (_, data) => processData(data));
stream.on("end", done);

// Вариант #2 - "новые" потоки
var stream = fs.createReadStream(file, { encoding : 'utf8' });
stream.on("readable", () => processData(stream.read()));
stream.on("end", done);

// Вариант #3 - pipe
var stream = fs.createReadStream(file, { encoding : 'utf8' });
var writeStream = new Writable();
writeStream._write = (chunk, encoding, callback) => {
    processData(chunk);
    callback();
};
writeStream.on('end', done);
stream.pipe(writeStream);

// Вариант #4 - асинхронные методы fs
fs.open(file, 'r', (err, fd) => {
    var buffer = new Buffer(1000*1000);
    (function next() {
        fs.read(fd, buffer, 0, buffer.length, null,
          (err, bytesRead, buffer) => {
              if (bytesRead === 0) {
                  fs.close(fd, done);
              } else {
                  processData(buffer);
                  next();
              }
          });
    }());
});


Более концептуальным является отличие с точки зрения получения данных из файла. Варианты 1-2 получают следующий фрагмент как только завершается обработчик события текущего фрагмента. В случае асинхронного кода в обработчике последовательность его выполнения непредсказуема:

function processData(chunk) {
  console.log('first')
  setImmediate(() => {
    console.log('second');
    setImmediate(() => console.log('third'));
  });
}

var stream = fs.createReadStream(file, { encoding : 'utf8' });
stream.on("readable", () => processData(stream.read()));

...
first
third
second
third
first
second
...


Ситуацию можно поправить используя методы pause()/resume().

function processData(chunk, done) {
  console.log('first')
  setImmediate(() => {
    console.log('second');
    setImmediate(() => {
      console.log('third');
      done();
    });
  });
}

var stream = fs.createReadStream(file, { encoding : 'utf8' });
stream.on("readable", () => {
  stream.pause();
  processData(stream.read(), () => stream.resume());
});

...
first
second
third
first
second
third
...


В вариантах 3 и 4 следующий фрагмент будет получен только после передачи управления (вариант 3) или запроса (вариант 4).

Думаю, что информации достаточно для реализации функции createTextReader() из первой части статьи. Из всех вариантов наиболее соответствующим является четвертый, поскольку поток управления у него аналогичен интерфейсу (request-callback).

function createTextReader(file, options, done) {
  var length, encoding, separator;

  if ('function' === typeof options) {
    done = options;
    options = { };
  }

  length = 4 * 1024;
  encoding = options.encoding || 'utf8';
  separator = (options.separator || '\n');

  fs.open(file, 'r', (err, fd) => {
    var eof, tail, buffer, decoder, lines;

    if (err) {
      done(err);
      return;
    }

    eof = false;
    buffer = new Buffer(length);
    tail = '';
    lines = [];

    decoder = new StringDecoder(encoding);

    done(null, {
      readLine : done => {
        var line;
        if (lines.length > 0) {
          line = lines.shift();
          done(null, line);
        } else if (eof) {
          done(null, null);
        } else {
          (function read() {
            fs.read(fd, buffer, 0, length, null,
              function (err, bytesRead, buffer) {
                var index, position;

                if (bytesRead === 0) {
                  eof = true;
                  done(null, tail);
                } else {
                  tail = tail + decoder.write(buffer.slice(0, bytesRead));
                  index = -1;
                  while (-1 !== (position = tail.indexOf(separator, index))) {
                    lines.push(tail.substring(index, position));
                    index = position + separator.length;
                  }
                  tail = tail.substring(index);
                  if (lines.length === 0) {
                    read();
                  } else {
                    line = lines.shift();
                    done(null, line);
                  }
                }
              });
          }());
        }
      },
      close : done => {
        fs.close(fd, () => {
          if (done) {
            done(err || null);
          }
        });
      }
    });
  });
}


Послесловие



В двух частях этой статьи я постарался изложить все, что мне пригодилось при создании модуля https://github.com/AlexAtNet/async-read-lines. К сожалению, многое осталось за рамками, не на все хватило времени. Так что если нашли ошибку или опечатку — пишите в личные сообщения. Если у вас есть вопросы по теме статьи — буду рад ответить в комментариях. Если увидите баги в модуле — создавайте запрос в github issues. Связаться со мной лично можно через сайт alexatnet.com.

Успехов в программировании!



Об авторе: Александр Неткачев — старший разработчик на С# и F#. Поддерживает сайт alexatnet.com, проводит вебинары (Code&Coffe), помогает с кодом начинающим разработчикам (CodeReview4U).

Комментарии ()