Node.Js 流管道上的异步迭代器

标签 node.js nodejs-stream

我有以下管道:

readFile > parseCSV > otherProcess

readFile 是标准 Node.Js createReadStream,而 parseCSV 是 Node.js 转换流(模块 link)。

我想逐行迭代 csv 文件并一次处理一行。因此,流和异步迭代器是完美的搭配。

我有以下代码可以正常工作:

async function* readByLine(path, opt) {
  const readFileStream = fs.createReadStream(path);
  const csvParser = parse(opt);
  const parser = readFileStream.pipe(csvParser);
  for await (const record of parser) {
    yield record;
  }
}

我对 Node.Js 流还很陌生,但我从许多来源了解到,模块 stream.pipeline 优于 .pipe 方法读取流。

如何更改上面的代码以使用 stream.pipeline (实际上是从 util.promisify(pipeline) 获得的 promise 版本)并产生一行当时?

最佳答案

添加到@eol的答案中,我建议存储 promise 并在异步迭代后等待它。

const fs = require('fs');
const parse = require('csv-parse');
const stream = require('stream');

async function* readByLine(path, opt) {
    const readFileStream = fs.createReadStream(path);
    const csvParser = parse(opt);
    const promise = stream.promises.pipeline(readFileStream, csvParser);
    for await (const record of csvParser) {
        yield record;
    }
    await promise;
}

通过在循环之前调用 await pipeline(...),它将消耗整个流,然后您才能从缓冲区中剩余的内容进行迭代,这在小流上偶然起作用,但可能会在较大(或无限/惰性)流上中断。

根据我们等待的位置,回调等效项可能会更清楚地表明发生了什么。

// await before iterating
stream.pipeline(a, b, err => {
  if (err) return callback(err)

  for await (const record of b) {
    // process record
  }

  callback()
}

// await after iterating
for await (const record of stream.pipeline(a, b, callback)) {
  // process record
}

关于Node.Js 流管道上的异步迭代器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65521166/

相关文章:

javascript - 如何在 Node js 中写入 xls 文件和流以响应

node.js - Koa 服务器发送的事件仅对浏览器中的 EventSource 产生错误

javascript - 尝试在discord.js 中记录消息

node.js - Angular 4 具有通用 "Unexpected token import"

javascript - 无法扩展EventEmitter?

node.js - 什么时候使用集群或 worker_threads 更好?

javascript - 如何在nodejs中使用事件和管道创建函数

node.js - 使用 NodeJS 流式传输大型静态文件

node.js - 我想学习如何处理异步代码的 Node js 代码

javascript - 使用作为单引号字符串传递的值解析 JSON 对象