我有一个使用流的复杂数据处理管道,其中有一个可读流输入
、一个可写流输出
和一系列转换流(让我们称之为它们是step1
、step2
、step3
和step4
)。虽然 step1
、step3
和 output
是无状态的,仅依赖于传入的数据 block 来生成输出,逐个 block ,step2
和 step4
是聚合步骤,从多个 block 收集数据以生成其输出,并且通常具有在时间上重叠的输出(例如 chunk1、chunk3 和 chunk5 可能会生成输出 1、chunk2 chunk4 可能会产生output2,依此类推)。
目前,管道的结构如下:
input.pipe(step1).pipe(step2).pipe(step3).pipe(step4).pipe(output);
该管道的计算成本非常高,因此我想将其拆分到多个实例,最好在多个内核上运行。 Node.js 流保证顺序保留,因此 Node.js 似乎平衡了消息传递,使得来自一个步骤的数据 block 首先传递到下一步,这是我在任何方法上都需要拥有的属性我想出了使这个计算并发的方法。
我绝对不是要求指导,更多的是如果有人以前解决过这个问题,以及用于此类事情的一般方法。我不太确定从哪里开始。
最佳答案
虽然我还没能完成订单保存,但是我支持的流式框架,scramjet ,将使您真正接近实现目标。
我将在这里插入您找到最佳解决方案:
let seq = 0;
source.pipe(new DataStream())
.map(data => {data, itr: seq++}) // mark your order
.separate(x => x % 8) // separate into 8 streams
.cluster((stream) => { // spawn subprocesses
// do your multi threaded transforms here
}, {threads: 8})
.mux((a, b) => a.itr - b.itr) // merge in the order above
在某些时候我会引入重新排序,但为了保持抽象,我不能采取太多的捷径,但你可以像上面示例中的 count 的 2^52 限制一样采用你的捷径(seq 将耗尽位)然后增加空间)。
这应该会引导您找到一些解决方案。
关于javascript - 如何在保持顺序的同时进行并发 Node.js 流处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45249456/