javascript - 如何在保持顺序的同时进行并发 Node.js 流处理?

标签 javascript node.js concurrency stream

我有一个使用流的复杂数据处理管道,其中有一个可读流输入、一个可写流输出和一系列转换流(让我们称之为它们是step1step2step3step4)。虽然 step1step3output 是无状态的,仅依赖于传入的数据 block 来生成输出,逐个 block ,step2step4 是聚合步骤,从多个 block 收集数据以生成其输出,并且通常具有在时间上重叠的输出(例如 chunk1、chunk3 和 chunk5 可能会生成输出 1、chunk2 chunk4 可能会产生output2,依此类推)。

目前,管道的结构如下:

input.pipe(step1).pipe(step2).pipe(step3).pipe(step4).pipe(output);

该管道的计算成本非常高,因此我想将其拆分到多个实例,最好在多个内核上运行。 Node.js 流保证顺序保留,因此 Node.js 似乎平衡了消息传递,使得来自一个步骤的数据 block 首先传递到下一步,这是我在任何方法上都需要拥有的属性我想出了使这个计算并发的方法。

我绝对不是要求指导,更多的是如果有人以前解决过这个问题,以及用于此类事情的一般方法。我不太确定从哪里开始。

最佳答案

虽然我还没能完成订单保存,但是我支持的流式框架,scramjet ,将使您真正接近实现目标。

我将在这里插入您找到最佳解决方案:

let seq = 0;
source.pipe(new DataStream())
    .map(data => {data, itr: seq++})        // mark your order
    .separate(x => x % 8)                   // separate into 8 streams
    .cluster((stream) => {                  // spawn subprocesses
         // do your multi threaded transforms here
    }, {threads: 8})
    .mux((a, b) => a.itr - b.itr)           // merge in the order above

在某些时候我会引入重新排序,但为了保持抽象,我不能采取太多的捷径,但你可以像上面示例中的 count 的 2^52 限制一样采用你的捷径(seq 将耗尽位)然后增加空间)。

这应该会引导您找到一些解决方案。

关于javascript - 如何在保持顺序的同时进行并发 Node.js 流处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45249456/

相关文章:

java - 如何控制一个Spring应用中有多少个线程?

algorithm - 我如何确定所有 Actor 都收到了广播消息

javascript - 通过 id 加载特定页面时数据表不起作用

javascript - 当我使用 ip :port 时 getUserMedia 未定义

javascript - 使用 Three.js 将一个对象的旋转应用于另一个对象

javascript - 当我在 mongodb 集合中添加项目时如何管理 foreach 中的 promise

javascript - 相对 URL 不适用于 Node 中的 axios

javascript - express-fileupload 图片上传不起作用

java - 如何轮流监听两组线程获取synchronized section?

c# - 客户端用户确认后调用ASPX服务器端函数?