我是第一次使用流,从可读流中提取数据时遇到一些问题。
我正在使用pg和 pg-copy-streams从 PSQL DB 中以流形式提取大量数据,目的是使用数据库中的数据创建 CSV 文件。
这是我的代码:
const aws = require('aws-sdk');
const {Client} = require('pg'); // Needs the nodePostgres Lambda Layer
const copyTo = require('pg-copy-streams').to;
exports.handler = async (event) => {
let response = {};
console.log(JSON.stringify(event));
const client = new Client();
const deviceId = event.deviceId;
const fromDate = event.fromDate;
const toDate = event.toDate;
if (!deviceId) { // if we do not have a device id, just bail.
return response = {
statusCode: 400,
body: "No device Id",
};
}
const tempTableQuery = getQuery(deviceId, fromDate, toDate);
console.log("Search query: " + tempTableQuery);
try {
await client.connect();
await client.query(tempTableQuery);
const q = `COPY temp_csv_table to STDOUT with csv DELIMITER ';'`;
const dataStream = client.query(copyTo(q));
// dataStream.pipe(console.log(process.stdout));
dataStream.on('readable', function() {
// There is some data to read now.
let data;
while (data = this.read()) {
console.log(data); //<- this dosent print anything :(
}
});
dataStream.on('error', async function (err) {
// Here we can control stream errors
await client.end();
});
dataStream.on('end', async function () {
await client.end();
});
} catch (e) {
response = {
statusCode: 500,
result: "Error: " + e
};
} finally {
client.end();
}
};
function getQuery(deviceId, fromDate, toDate) {
return `CREATE TEMPORARY TABLE temp_csv_table AS
SELECT *
FROM sensor_data_v2
WHERE device_id = '${deviceId}' and
time_stamp between '${fromDate}' and '${toDate}'
LIMIT 10;`;
}
问题:
- 如何从数据流中提取行?
- 有更好的方法吗?
注释:
- 在 AWS Lambda NodeJS 10.x 运行时上运行。
- 我知道表中有我指定的过滤器的数据。
- 我为此测试设置了 LIMIT 10,这些条件将返回 2600 行数据。
- 我将使用 csv-write-stream打包以使用数据库中的数据制作 CSV 文件。没有真正附加到这个包,很高兴使用另一个 CSV 编写器,如果它更容易使用的话。
最佳答案
我尝试复制这个,我认为你的问题实际上是在 try-catch-finally block 上。
因此,当您的代码到达 const dataStream = client.query(copyTo(q))
部分时,它会启动与 Promise 无关的流处理,因此执行直接转到 finally
block 。但是,在这个 finally
block 中,您终止了客户端,因此数据流(仍在运行)将收到错误 Error:连接终止
,因为流传输需要一些时间。
要解决此问题,您只需从 finally block 中删除 client.end()
并将该客户端终止放入 catch block 和流 end
事件处理程序中,然后它应该可以工作。
您还可以在 read
事件期间执行 console.log(data.toString())
,否则打印出来的数据将是二进制的。
关于node.js - 从 Lambda 中的 NodeJS 流提取数据时遇到问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58800261/