scala - Akka 流 : What is the difference between Unzip and Broadcast?

标签 scala stream akka akka-stream

我正在努力实现这样的目标:

graph

我正在尝试使用 Flow.fromGraph 创建此流

  • 我可以使用 Zip[B, C] 加入 2 个流
  • 我可以通过两种方式拆分:
    • 使用广播[A](2)
    • 使用 UnZip[(A,A)],前面是 .map(a -> (a, a))

map(f1)map(f2) 都是我从包含的库中获取的自定义流程,所以我无法真正修改它们,所以请不要't say .map(a => (f1(a), f2(a)))

这两种情况有什么区别,或者是等同的?我发现唯一不同的是 Broadcast 只有在 all 下游取消时才取消的能力(eagerCancel = false) 这是它的默认行为,与 UnZip 不同(它的作用与广播对 eagerCancel = true 的作用)

此外,如果两条路径中的任何一条发生故障,会发生什么情况?即,如果对于特定元素,f1 抛出错误会产生什么影响?

另外,假设我们没有 f2 函数(所以没有 map 操作)并且我们想在最后发出 (b,a),应该f2 被身份流取代,还是可以一起跳过? (如果是后者,您曾经会使用身份流吗?)

val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)

split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?

这可能有助于内部缓冲区/背压?

最佳答案

它们都是扇出运算符;然而

从文档中解压:

Takes a stream of two element tuples and unzips the two elements ino two different downstreams.

虽然广播

Emit each incoming element each of n outputs.

因此我们可以得出结论,Unzip 只是一个 n = 2 的广播;但重要的是如果元素是一个元组,Broadcast 只会创建相同元组的n 输出。解压缩将为元素 AB

创建 2 个输出 each

关于scala - Akka 流 : What is the difference between Unzip and Broadcast?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55278303/

相关文章:

scala - scala中的交换位置列表

stream - 根据某些条件添加过滤器 java 8

java - 无法打开输出流

scala - 这与 akka Actor 类中的 self

scala - MergeLatest 的默认值

swing - scala.swing.ListView : single/multiple selection

scala - sbt 提交给出 "not a valid command"错误

scala - 将模拟 Actor 注入(inject) Spray 路线进行测试

c# - 如何在不缓冲的情况下流式传输来自 WCF 的响应?

scala - akka 调度器的延迟如何?