我已经使用 node-oracledb 几个月了,并且已经成功实现了迄今为止所需的目标。
我目前正在开发一个搜索应用程序,该应用程序可能会从一次调用中返回大约 200 万行数据。为了确保我不会与浏览器和服务器断开连接,我想我应该尝试 queryStream,以便有持续的数据流返回客户端。
我实现了queryStream例如,这对于几十万行来说效果很好。然而,当返回的行数大于一百万时,Node 就会耗尽内存。通过记录和观察客户端和服务器日志事件,我可以看到客户端在发送和接收的行数方面远远落后于服务器。所以,看起来 Node 正在崩溃,因为它缓冲了太多的数据。
值得注意的是,此时,我的 selectstream 实现位于通过 Express 调用的 req/res 函数内。
为了返回数据,我做了类似的事情......
stream.on('data', function (data) {
rowcount++;
let obj = new myObjectConstructor(data);
res.write(JSON.stringify(obj.getJson());
});
我一直在阅读有关流和管道如何帮助实现流程的内容,因此我希望能够将查询结果通过管道传输到 a) 帮助实现流程,b) 能够在发送回客户端之前将结果通过管道传输到其他函数。
例如
function getData(req, res){
var stream = myQueryStream(connection, query);
stream
.pipe(toSomeOtherFunction)
.pipe(yetAnotherFunction)
.pipe(res);
}
我花了几个小时试图找到一个允许我管道结果的解决方案或示例,但我陷入困境并需要一些帮助。
如果我遗漏了一些明显的东西,我深表歉意,但我仍在掌握 Node,尤其是流。
提前致谢。
最佳答案
这里有点阻抗不匹配。 queryStream API 发出 JavaScript 对象行,但您想要流式传输到客户端的是 JSON 数组。基本上,您必须在开头添加一个左括号,在每行后面添加一个逗号,在末尾添加一个右括号。
我将向您展示如何在 Controller 中执行此操作,该 Controller 像您所做的那样直接使用驱动程序,而不是像我在 this series 中提倡的那样使用单独的数据库模块。 .
const oracledb = require('oracledb');
async function get(req, res, next) {
try {
const conn = await oracledb.getConnection();
const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});
res.writeHead(200, {'Content-Type': 'application/json'});
res.write('[');
stream.on('data', (row) => {
res.write(JSON.stringify(row));
res.write(',');
});
stream.on('end', () => {
res.end(']');
});
stream.on('close', async () => {
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
stream.on('error', async (err) => {
next(err);
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
} catch (err) {
next(err);
}
}
module.exports.get = get;
一旦掌握了概念,您就可以使用可重用的 Transform 类来简化一些事情,该类允许您在 Controller 逻辑中使用管道:
const oracledb = require('oracledb');
const { Transform } = require('stream');
class ToJSONArray extends Transform {
constructor() {
super({objectMode: true});
this.push('[');
}
_transform (row, encoding, callback) {
if (this._prevRow) {
this.push(JSON.stringify(this._prevRow));
this.push(',');
}
this._prevRow = row;
callback(null);
}
_flush (done) {
if (this._prevRow) {
this.push(JSON.stringify(this._prevRow));
}
this.push(']');
delete this._prevRow;
done();
}
}
async function get(req, res, next) {
try {
const toJSONArray = new ToJSONArray();
const conn = await oracledb.getConnection();
const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});
res.writeHead(200, {'Content-Type': 'application/json'});
stream.pipe(toJSONArray).pipe(res);
stream.on('close', async () => {
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
stream.on('error', async (err) => {
next(err);
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
} catch (err) {
next(err);
}
}
module.exports.get = get;
关于node.js - 如何从 'pipe' 事件中获取 'on data' oracle-db 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50327055/