node.js - 用于 CSV 解析的 Highland.js

标签 node.js functional-programming highland.js

我正在尝试以一种非常实用的方式编写。我们正在使用 Highland.js 来管理流处理,但是因为我太新了,我觉得我对如何处理这种独特情况感到非常困惑。

这里的问题是文件流中的所有数据都不一致。文件中的第一行通常是标题,我们希望将其存储到内存中并随后将所有行压缩到流中。

这是我的第一次尝试:

var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

var headers = [];

var through = _.pipeline(
    _.split(),
    _.head(),
    _.doto(function(col) {
        headers = col.split(',');
        return headers;
    }),

    ......

    _.splitBy(','),
    _.zip(headers),
    _.wrapCallback(process)
);

_(stream)
    .pipe(through)
    .pipe(output);

管道中的第一个命令是按行拆分文件。下一个抓取 header ,doto 将其声明为全局变量。问题是流中接下来的几行不存在,因此进程被阻塞...可能是因为它上面的 head() 命令。

我已经尝试了一些其他变体,但我觉得这个示例让您了解我需要使用它的地方。

关于此的任何指导都会有所帮助——它还会提出一个问题,即如果我在每一行中都有不同的值,我如何才能将流程流拆分成多个不同的长度/复杂性不同的流操作。

谢谢。

编辑:我产生了更好的结果,但我质疑它的效率——有没有一种方法可以优化它,所以在每次运行时我都不会检查是否记录了标题?这仍然感觉草率。

var through = _.pipeline(
    _.split(),
    _.filter(function(row) {
        // Filter out bogus values
        if (! row || headers) {
            return true;
        }
        headers = row.split(',');
        return false;
    }),
    _.map(function(row) {
        return row.split(',')
    }),
    _.batch(500),
    _.compact(),
    _.map(function(row) {
        return JSON.stringify(row) + "\n";
    })
);

_(stream)
    .pipe(through)

最佳答案

您可以使用 Stream.observe()Stream.fork()拆分流。

var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var through = highland.pipeline(function(s) {
    var headerStream, headers;
    // setup a shared variable to store the headers
    headers = [];
    // setup the csv processing
    s = s
        // split input into lines
        .split()
        // remove empty lines
        .compact()
        // split lines into arrays
        .map(function(row) {
            return row.split(',');
        });
    // create a new stream to grab the header
    headerStream = s.observe();
    // pause the original stream
    s.pause();
    // setup processing of the non-header rows
    s = s
        // drop the header row
        .drop(1)
        // convert the rest of the rows to objects
        .map(function(row) {
            var obj = headers.reduce(function(obj, key, i) {
                obj[key] = row[i];
                return obj;
            }, {});
            return JSON.stringify(obj) + "\n";
        });
    // grab the first row from the header stream
    // save the headers and then resume the normal stream
    headerStream.head().toArray(function(rows) {
        headers = rows[0];
        s.resume();
    });
    return s;
});
_(stream)
    .pipe(through)
    .pipe(output);

也就是说,您的 csv 解析不考虑在您的值中转义换行符和逗号。通常,这是通过将值用双引号引起来在 csv 文件中完成的。然后通过将两个并排放置来转义双引号。要做到这一点有点棘手,所以我建议使用处理它的包,例如 fast-csv .

那么您的代码可能如下所示:

var _      = require('highland');
var fs     = require('fs');
var csv    = require('fast-csv');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

_(stream.pipe(csv({headers: true, ignoreEmpty: true})))
    .map(function(row) {
        return JSON.stringify(row) + "\n";
    })
    .pipe(output);

关于node.js - 用于 CSV 解析的 Highland.js,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29395358/

相关文章:

javascript - Nativescript Javascript 和 Watson Cloud SDK 集成

node.js - Express 应用程序不允许我检查 req 中的内容

haskell - 学习 Haskell 等纯函数式语言背后的理论的引用资料?

functional-programming - 实现组合器演算

javascript - highland.js: map 功能未按预期工作

node.js - 如何使用默认 catch 和处理程序创建 Promise

node.js - NodeJs 和 npm : The package mocha does not satisfy its siblings' peerDependencies requirements

scala - 使用 fs2.Stream 分组事件

javascript - 使用 highland.js 查看流的第一个对象

node.js - 使用流减少 nodejs 应用程序内存?