node.js - 从 Lambda 中的 NodeJS 流提取数据时遇到问题

标签 node.js amazon-web-services aws-lambda node-streams

我是第一次使用流,从​​可读流中提取数据时遇到一些问题。

我正在使用pgpg-copy-streams从 PSQL DB 中以流形式提取大量数据,目的是使用数据库中的数据创建 CSV 文件。

这是我的代码:

const aws = require('aws-sdk');
const {Client} = require('pg'); //  Needs the nodePostgres Lambda Layer
const copyTo = require('pg-copy-streams').to;

exports.handler = async (event) => {
let response = {};

console.log(JSON.stringify(event));

const client = new Client();

const deviceId = event.deviceId;
const fromDate = event.fromDate;
const toDate = event.toDate;

if (!deviceId) { // if we do not have a device id, just bail.
    return response = {
        statusCode: 400,
        body: "No device Id",
    };
}

const tempTableQuery = getQuery(deviceId, fromDate, toDate);
console.log("Search query: " + tempTableQuery);
try {

    await client.connect();

    await client.query(tempTableQuery);

    const q = `COPY temp_csv_table to STDOUT with csv DELIMITER ';'`;
    const dataStream = client.query(copyTo(q));

    // dataStream.pipe(console.log(process.stdout));
    dataStream.on('readable', function() {
        // There is some data to read now.
        let data;

        while (data = this.read()) {
            console.log(data); //<- this dosent print anything :(
        }
    });

    dataStream.on('error', async function (err) {
        // Here we can control stream errors
        await client.end();
    });
    dataStream.on('end', async function () {
        await client.end();
    });


} catch (e) {
    response = {
        statusCode: 500,
        result: "Error: " + e
    };
} finally {
    client.end();
}
};

function getQuery(deviceId, fromDate, toDate) {
return `CREATE TEMPORARY TABLE temp_csv_table AS
            SELECT * 
            FROM sensor_data_v2 
            WHERE device_id = '${deviceId}' and 
                  time_stamp between '${fromDate}' and '${toDate}' 
            LIMIT 10;`;
}

问题:

  1. 如何从数据流中提取行?
  2. 有更好的方法吗?

注释:

  1. 在 AWS Lambda NodeJS 10.x 运行时上运行。
  2. 我知道表中有我指定的过滤器的数据。
  3. 我为此测试设置了 LIMIT 10,这些条件将返回 2600 行数据。
  4. 我将使用 csv-write-stream打包以使用数据库中的数据制作 CSV 文件。没有真正附加到这个包,很高兴使用另一个 CSV 编写器,如果它更容易使用的话。

最佳答案

我尝试复制这个,我认为你的问题实际上是在 try-catch-finally block 上。

因此,当您的代码到达 const dataStream = client.query(copyTo(q)) 部分时,它会启动与 Promise 无关的流处理,因此执行直接转到 finally block 。但是,在这个 finally block 中,您终止了客户端,因此数据流(仍在运行)将收到错误 Error:连接终止,因为流传输需要一些时间。

要解决此问题,您只需从 finally block 中删除 client.end() 并将该客户端终止放入 catch block 和流 end 事件处理程序中,然后它应该可以工作。

您还可以在 read 事件期间执行 console.log(data.toString()),否则打印出来的数据将是二进制的。

关于node.js - 从 Lambda 中的 NodeJS 流提取数据时遇到问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58800261/

相关文章:

amazon-web-services - 在 AWS Step Function 中将多个输入传递到 Map State

angularjs - E/configParser - 错误消息 : configuration file cli. js 未导出配置对象

node.js - 如何从 postman 扩展 "form-data"值中获取值

javascript - 没有返回有效的 JSON?

amazon-web-services - AWS CodeBuild + CodePipeline : "No matching artifact paths found"

python - 我如何使用 aws lambda 将文件写入 s3 (python)?

javascript - 在 python 中, "export"来自模块的定制对象

ruby-on-rails - 如何在 Amazon Linux 上将 selenium 与 chrome 驱动程序一起使用。错误:Selenium::WebDriver::Error::UnknownError:未知错误:找不到 Chrome 二进制文件

amazon-web-services - Elasticsearch 1.3.2升级后的问题

node.js - 跨 AWS Lambda 函数调用共享数据库连接