node.js - NodeJS 中使用流和异步读取和处理大小文件

标签 node.js asynchronous stream queue large-files

我在逐行处理文件列表时遇到问题。这是我正在使用的代码:

var LineReader = require("line-by-line");
var async = require("async");
var files = [ "small.txt", "medium.txt", "large.txt" ];

var queue = async.queue(function(task, next){ console.log(task); next(); }, 10);

async.eachSeries(
    files,
    function (file, callback) {
        var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true });

        lineReader.on("error", function (err) {
            callback(err);
        });

        lineReader.on("line", function (line) {
            lineReader.pause();
            queue.push(line);
        });

        queue.drain = function () {
            lineReader.resume(); // I need to resume the stream !
            callback(); // When all lines have been processed, I need to read the next file
        };
    },
    function (err) {
        if (err) return console.log(err);
        console.log("Job done.");
    }
);

我正在使用async “同步”处理每个文件并处理队列中的每一行,以及 line-by-line逐行读取每个文件。

我的问题是:

  • 如果我暂停流,请将线路推送到队列,并在收到此错误后恢复流

RangeError: Maximum call stack size exceeded

  • 如果我暂停流,将线路插入队列并等待队列为空,我将无法恢复流并执行回调

q.drain = function () { lineReader.resume(); callback(); };

如何等到所有行都处理完毕并执行回调来处理下一个文件?

谢谢。

更新:

我发现“逐行”模块有一个奇怪的东西。 “结束”事件被发出两次。所以我决定重构代码,找到问题所在。另一个问题:该模块已经一年没有更新,并且 1 个月前发送了 2 个 Pull 请求。

这是我的解决方案(如果逐行有效):

var LineReader = require("line-by-line");
var async = require("async");
var files = [ "small.txt", "medium.txt", "large.txt" ];

var queue = async.queue(function(task, next){ console.log(task); next(); }, 10);

async.eachSeries(
    files,
    function (file, callback) {
        var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true });

        lineReader.on("error", function (err) {
            callback(err);
        });

        lineReader.on("end", function () {
            callback();
        });

        lineReader.on("line", function (line) {
            lineReader.pause();
            queue.push(line);
        });

        queue.drain = function () {
            lineReader.resume();
        };
    },
    function (err) {
        if (err) return console.log(err);
        console.log("Job done.");
    }
);

使用此解决方案,队列中只有 1 行。如果有人有想法推送超过 1 行然后暂停流。

我会尝试找到另一个没有这个问题的模块,因为我不想为此重写一个新模块。

最佳答案

我会用完全不同的方式解决这个问题。

使用新的 stream API 无需监听事件或暂停。
我会像这样使用 gulpthrough2 :

var gulp = require('gulp')
, thr = require('through2').obj
;

function fixLine (line) {
  // do stuff with a single line of a file.
  // just return it back for no reason :)
  return line
}

files = [ "small.txt", "medium.txt", "large.txt" ]
gulp.src(files).pipe(thr(function(vfs, enc, next){
  // vfs - vinyl filesystem.
  var str = vfs.contents.toString().split('\n').map(fixLine).join('\n')
  vfs.contents = new Buffer(str)
  next(null, vfs)
}))

但是这是异步的。无法保证文件的顺序就是数组中的顺序。但显然,这条线是按顺序进行的。

我希望这有帮助。

关于node.js - NodeJS 中使用流和异步读取和处理大小文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22935010/

相关文章:

JavaScript,Node.js : is Array. forEach 异步?

node.js - 用不同的值更新 mongoDB 中的许多文档

node.js - Docker/GitLab : npm dependencies installed in docker image, 但通过运行器未满足

javascript - Nodejs 中堆叠异步回调事件的最佳方式

c++ - cout 输入模板

php - stream_set_write_buffer 或 PHP 中的文件锁定?

audio - 将音频添加到 ffmpeg 视频流

javascript - 如果没有被拒绝, promise 链式结果

node.js - Passport 本地策略没有被调用

c# - 在 C# 中将 Task<T> 转换为 Task<object> 而无需 T