我正在通过在内部构建图表来制作自定义水槽。这是我的代码的广泛简化,用于演示我的问题:
def mySink: Sink[Int, Unit] = Sink() { implicit builder =>
val entrance = builder.add(Flow[Int].buffer(500, OverflowStrategy.backpressure))
val toString = builder.add(Flow[Int, String, Unit].map(_.toString))
val printSink = builder.add(Sink.foreach(elem => println(elem)))
builder.addEdge(entrance.out, toString.in)
builder.addEdge(toString.out, printSink.in)
entrance.in
}
我遇到的问题是,虽然创建具有相同输入/输出类型的流是有效的,只有一个类型参数并且没有值参数,例如:Flow[Int]
(其中整个文档中都有)仅提供两个类型参数和零值参数是无效的。
根据reference documentation for the Flow object我正在寻找的 apply
方法定义为
def apply[I, O]()(block: (Builder[Unit]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit]
并说
Creates a Flow by passing a FlowGraph.Builder to the given create function.
The create function is expected to return a pair of Inlet and Outlet which correspond to the created Flows input and output ports.
当我试图制作一个我认为非常简单的流程时,我似乎需要处理另一个级别的图形构建器。是否有一种更简单、更简洁的方法来创建一个流来更改其输入和输出的类型,而不需要弄乱其内部端口?如果这是解决这个问题的正确方法,那么解决方案会是什么样子?
奖励:为什么很容易创建一个不改变其输入和输出类型的流程?
最佳答案
如果您想指定流的输入和输出类型,您确实需要使用文档中找到的 apply 方法。不过,使用它的方法与您已经做的几乎完全相同。
Flow[String, Message]() { implicit b =>
import FlowGraph.Implicits._
val reverseString = b.add(Flow[String].map[String] { msg => msg.reverse })
val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))
// connect the graph
reverseString ~> mapStringToMsg
// expose ports
(reverseString.inlet, mapStringToMsg.outlet)
}
您不只是返回入口,而是返回一个包含入口和导出的元组。现在我们可以将这个流程与特定的 Source 或 Sink 一起使用(例如在另一个构建器中,或直接与 runWith 一起使用)。
关于akka-stream - 如何创建具有不同输入和输出类型的流以在图形内部使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30228090/