javascript - Node.js:从 MongoDB 流式传输到文件

标签 javascript node.js mongodb csv streaming

我对 javascript/node.js 相当陌生,正在尝试实现一个非常基本的场景:连接到 MongoDB,将 JSON 响应转换为 CSV,将其写入文件。我已经尝试如下

fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var Db = require('mongodb').Db;
var Server = require('mongodb').Server;
var Json2csvStream = require('json2csv-stream');
var Stream = require('stream');
var JSONStream = require('JSONStream');
var es = require('event-stream');
var csv = require('csv');

var fields = ['execAmendTime', 'execTime', 'execType', 'lastMkt', 'manualExecFlag', 'orderId', 'riskTrade', 'rootOrdId', 'salesCommissionRate', 'salesCommissionType', 'theoPov20Px',
'theoPov20BL', 'tradeFlags', 'tradeNotes', 'transactTime', 'version', 'book.bookName', 'businessUnit', 'commissionRate', 'commissionSource', 'commissionType', 'counterBook.bookName', 
'counterParty.name', 'createTime', 'currency', 'direction', 'execQuantity', 'fxRate','orderQuantity', 'positionTrader.name', 'price', 'primaryTrader.name','rootSystem', 'source',
'sectorGicsLevel1', 'salesTrader.name', 'tradedPrice', 'isCRB', 'clientCategory', 'tradeId', 'tradeDate', 'instrument.instrumentRic', 'notionalUSD','commissionUSD', 'region'];

// Connect to the db
MongoClient.connect("mongodb://*****", function (err, db) {
    if (err) { return console.dir(err); }

    if (!err) {
        console.log("We are connected");
    }

    db.open(function (err, db) {
        if (err) { return console.dir(err); }
        var newDb = db.db("test_db");

        var collection = newDb.collection('test', function (err, collection) {
            if (err) { return console.dir(err); }
            var parser = new Json2csvStream();
            var writer = fs.createWriteStream('out.csv');
            var stream = collection.find({ tradeDate: new Date('2015-12-29T00:00:00.000Z') }).stream();

            stream.pipe(parser).pipe(writer);

            stream.on("data", function (item) {
                console.log(item);
            });

            stream.on('end', function () {
                console.log("ended");
            });

            stream.on("end", function () {
                newDb.close();
                db.close();
            });
        });
    });
});

我收到如下错误。

我尝试使用 JSON.stringify 等添加转换,但我的尝试都没有成功。看来我需要等到 Mongo 的查询流完成才能开始将其输入 json2csv 转换器?

有什么想法吗?我在这里做的是根本错误的事情吗?

非常感谢!

输出:

We are connected
D:\WebTrial\MongoProject\node_modules\mongodb\lib\utils.js:98
    process.nextTick(function() { throw err; });
                                ^

TypeError: Invalid non-string/buffer chunk
    at validChunk (_stream_writable.js:178:14)
    at Writable.write (_stream_writable.js:205:12)
    at ondata (_stream_readable.js:525:20)
    at emitOne (events.js:82:20)
    at emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:146:16)
    at Readable.push (_stream_readable.js:110:10)
    at D:\WebTrial\MongoProject\node_modules\mongodb\lib\cursor.js:1102:10
    at handleCallback (D:\WebTrial\MongoProject\node_modules\mongodb\lib\utils.j
s:96:12)
    at D:\WebTrial\MongoProject\node_modules\mongodb\lib\cursor.js:673:5

最佳答案

这是因为 find().stream() 流式传输对象,而 Json2csvStream 需要字符串。 event-stream 可以帮助您将对象字符串化。我还简化了你的代码,有不必要的东西:

var fs = require('fs');
var MongoClient = require('mongodb').MongoClient;
var es = require('event-stream');
var Json2csvStream = require('json2csv-stream');

// var Db = require('mongodb').Db;
// var Server = require('mongodb').Server;
// var Stream = require('stream');
// var JSONStream = require('JSONStream');
// var csv = require('csv');

var fields = ['execAmendTime', 'execTime', 'commissionUSD', 'region'];

// Connect to the db
// you can put the db name in the url
MongoClient.connect("mongodb://localhost:27017/test_db", function (err, db) {
    if (err) {
        return console.dir(err);
    } else {
        console.log("We are connected");
    }

    // without strict: true, err is always null
    // in strict mode, there is an err if the collection doesn't exist
    db.collection('test', { strict: true }, function (err, collection) {
        if (err) {
            return console.dir(err);
        }

        var json2csv = new Json2csvStream();
        var writer = fs.createWriteStream('out.csv');

        var mongoStream = collection.find(
            { tradeDate: new Date('2015-12-29T00:00:00.000Z') }
        ).stream();

        var stream = mongoStream
            .pipe(es.map(function (doc, next) {
                doc = JSON.stringify(doc);
                // console.log(doc);
                next(null, doc);
            })).pipe(json2csv).pipe(writer).on('close', function () {
                console.log('done...');
                db.close();
            });
    });
});

关于javascript - Node.js:从 MongoDB 流式传输到文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34547642/

相关文章:

javascript - 尝试将消息发送到特定 channel ,但无法定义我的客户端

mongodb - 为什么索引的方向在 MongoDB 中很重要?

javascript - jQuery按时间间隔替换图像

javascript - 从要顺序执行的 Promise 数组创建 jquery Promise

javascript - 如何创建后退/前进按钮?

javascript - 如何在现有 GraphQL 服务器上添加根和查看器查询以支持中继

javascript - 使用 AJAX 从客户端( Angular )获取数据到 Node.js

javascript - 在 JavaScript 中实现调车场算法

node.js - Node.js ws 包上的正确错误处理

c++ - 使 system() 产生的子进程在父进程收到终止信号并退出后继续运行