node.js - 考虑背压,将数据从 Cassandra 流式传输到文件

标签 node.js cassandra stream node-streams

我有一个 Node 应用程序,可以收集投票提交并将其存储在 Cassandra 中。投票存储为 Base64 编码的加密字符串。该 API 有一个名为 /export 的端点,它应该获取所有这些投票字符串(可能 > 100 万个),将它们转换为二进制并将它们一个接一个地附加到 votes.egd 文件中。然后该文件应该被压缩并发送给客户端。我的想法是从 Cassandra 流式传输行,将每个投票字符串转换为二进制并写入 WriteStream。 我想将此功能包装在 Promise 中以方便使用。我有以下内容:

streamVotesToFile(query, validVotesFileBasename) {
  return new Promise((resolve, reject) => {
    const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);

    writeStream.on('error', (err) => {
      logger.error(`Writestream ${validVotesFileBasename}.egd error`);
      reject(err);
    });

    writeStream.on('drain', () => {
      logger.info(`Writestream ${validVotesFileBasename}.egd error`);
    })

    db.client.stream(query)
    .on('readable', function() {
      let row = this.read();
      while (row) {
        const envelope = new Buffer(row.vote, 'base64');
        if(!writeStream.write(envelope + '\n')) {
          logger.error(`Couldn't write vote`);
        }
        row = this.read()
      }
    })
    .on('end', () => { // No more rows from Cassandra
      writeStream.end();
      writeStream.on('finish', () => {
        logger.info(`Stream done writing`);
        resolve();
      });
    })
    .on('error', (err) => { // err is a response error from Cassandra
      reject(err);
    });
  });
}

当我运行这个程序时,它会将所有投票附加到一个文件中并正常下载。但我有很多问题/疑问:

  1. 如果我向 /export 端点发出请求并且此函数运行,则在它运行时,对应用程序的所有其他请求都非常慢,或者在导出请求完成之前无法完成。我猜测是因为事件循环被 Cassandra 流中的所有这些事件占用(每秒数千次)?

  2. 所有投票似乎都能很好地写入文件,但几乎每次 writeStream.write() 调用我都会得到 false 并看到相应的记录消息(参见代码)?

  3. 我知道我需要考虑反压和 WritableStream 的“耗尽”事件,因此理想情况下我会使用 pipe() 并将投票通过管道传输到文件,因为它内置了反压支持(对吗?),但由于我需要处理每一行(转换为二进制并可能在将来添加来自其他行字段的其他数据),我将如何使用管道做到这一点?

最佳答案

这是 TransformStream 的完美用例:

const myTransform = new Transform({
  readableObjectMode: true,
  transform(row, encoding, callback) {
    // Transform the row into something else
    const item = new Buffer(row['vote'], 'base64');
    callback(null, item);
  }
});

client.stream(query, params, { prepare: true })
  .pipe(myTransform)
  .pipe(fileStream);

查看有关如何在 Node.js API Docs 中实现 TransformStream 的更多信息.

关于node.js - 考虑背压,将数据从 Cassandra 流式传输到文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42569580/

相关文章:

mongodb - 允许自定义 CRDT 合并的分布式数据库

Cassandra 最小堆大小

c++ - ostream插入运算符与其非成员重载的关系

delphi - Delphi7中如何从流中剪切一部分

node.js - Node.js 上的条件断点

javascript - 使用 Discord.js 时出现 "Error: Cannot find module"

javascript - 在 NodeJS 中迭代循环时出现 TypeError

Cassandra 非柜台系列

C++流式传输到char数组?

javascript - 更新 : Unable to invalidate a subdocument that has not been added to an array