我正在努力实现这样的目标:
我正在尝试使用 Flow.fromGraph
创建此流
- 我可以使用
Zip[B, C]
加入 2 个流 - 我可以通过两种方式
拆分
:- 使用
广播[A](2)
- 使用
UnZip[(A,A)]
,前面是.map(a -> (a, a))
- 使用
map(f1)
和 map(f2)
都是我从包含的库中获取的自定义流程,所以我无法真正修改它们,所以请不要't say .map(a => (f1(a), f2(a)))
这两种情况有什么区别,或者是等同的?我发现唯一不同的是 Broadcast
只有在 all 下游取消时才取消的能力(eagerCancel = false
) 这是它的默认行为,与 UnZip
不同(它的作用与广播对 eagerCancel = true
的作用)
此外,如果两条路径中的任何一条发生故障,会发生什么情况?即,如果对于特定元素,f1 抛出错误会产生什么影响?
另外,假设我们没有 f2
函数(所以没有 map 操作)并且我们想在最后发出 (b,a)
,应该f2 被身份流取代,还是可以一起跳过? (如果是后者,您曾经会使用身份流吗?)
val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)
split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?
这可能有助于内部缓冲区/背压?
最佳答案
它们都是扇出运算符;然而
从文档中解压
:
Takes a stream of two element tuples and unzips the two elements ino two different downstreams.
虽然广播
Emit each incoming element each of n outputs.
因此我们可以得出结论,Unzip 只是一个 n = 2
的广播;但重要的是如果元素是一个元组,Broadcast 只会创建相同元组的n
输出。解压缩将为元素 A
和 B
关于scala - Akka 流 : What is the difference between Unzip and Broadcast?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55278303/