我对官方托管的示例代码有几个问题 here :
val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
(topHS, bottomHS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.in
broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
ClosedShape
})
- 什么时候通过
create
传入图表?
为什么create时传入了topHeadSink、bottomHeadSink
,而sharedDoubler
却没有传入?它们有什么区别?
- 什么时候需要
builder.add
?
我可以在没有 builder.add
的情况下在图表外部创建广播吗?如果我在图中添加几个流,是否也应该通过 builder.add
添加流?令人困惑的是,有时我们需要 builder.add
,有时则不需要。
更新
我觉得这仍然令人困惑:
这些方法之间的区别在于,使用 builder.add(...)
导入会忽略导入图形的具体化值,而通过工厂方法导入则允许包含它。
topHS、bottomHS
是从 create
导入的,因此它们将保留其物化值。如果我执行 builder.add(topHS)
会怎样?
你如何解释sharedDoubler
:它是否具有物化值?如果我使用 builder.add
会怎样?
GraphDSL.create(topHeadSink, BottomHeadSink)((_, _))
的((_,_))
是什么意思?
它看起来像我们只需要的样板,但我不确定它是什么。
最佳答案
- When do you pass in a graph through create?
当您想要获取传递给 create
工厂方法的图形的具体化值时。您问题中的 RunnableGraph
的类型是 RunnableGraph[(Future[Int], Future[Int])]
,这意味着该图的物化值为 ( future [Int], future [Int])
:
val g = RunnableGraph.fromGraph(...).run() // (Future[Int], Future[Int])
val topHeadSinkResult = g._1 // Future[Int]
val bottomHeadSinkResult = g._2 // Future[Int]
现在考虑以下变体,它定义图形“内部”的接收器并丢弃物化值:
val g2 = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHeadSink
broadcast.out(1) ~> sharedDoubler ~> bottomHeadSink
ClosedShape
}).run() // NotUsed
g2
的值为 NotUsed
。
- When do you need builder.add?
图表的所有组件都必须添加到构建器中,但 ~>
有变体运算符将最常用的组件(例如 Source
和 Flow
)添加到底层的构建器中。但是,执行扇入(例如合并
)或扇出(例如广播
)的连接操作必须显式传递如果您使用的是 Graph DSL,则添加到 builder.add
。
请注意,对于简单图形,您可以使用联结而无需使用图形 DSL。这是 documentation 中的示例:
val sendRmotely = Sink.actorRef(actorRef, "Done")
val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ())
val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast[Int](_))
Source(List(0, 1, 2)).runWith(sink)
- What does this mean? the
((_,_))
ofGraphDSL.create(topHeadSink, bottomHeadSink)((_, _))
?
它是一个柯里化(Currying)参数,指定要保留的具体化值。此处使用 ((_, _))
等同于:
val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((t, b) => (t, b)) {
implicit builder => (topHS, bottomHS) =>
...
}).run() // (Future[Int], Future[Int])
换句话说,在此上下文中 ((_, _))
是 ((t, b) => (t, b))
的简写,其中保留传入的两个接收器各自的物化值。例如,如果您只想保留 topHeadSink
的物化值,则可以将调用更改为以下内容:
val g = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((t, _) => t) {
implicit builder => (topHS, bottomHS) =>
...
}).run() // Future[Int]
关于scala - 如何解释官方文档中的 Akka Streams 图?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48633778/