akka-stream - 如何创建具有不同输入和输出类型的流以在图形内部使用?

标签 akka-stream

我正在通过在内部构建图表来制作自定义水槽。这是我的代码的广泛简化,用于演示我的问题:

def mySink: Sink[Int, Unit] = Sink() { implicit builder =>

    val entrance = builder.add(Flow[Int].buffer(500, OverflowStrategy.backpressure))
    val toString = builder.add(Flow[Int, String, Unit].map(_.toString))
    val printSink = builder.add(Sink.foreach(elem => println(elem)))

    builder.addEdge(entrance.out, toString.in)
    builder.addEdge(toString.out, printSink.in)

    entrance.in
}

我遇到的问题是,虽然创建具有相同输入/输出类型的流是有效的,只有一个类型参数并且没有值参数,例如:Flow[Int](其中整个文档中都有)仅提供两个类型参数和零值参数是无效的。

根据reference documentation for the Flow object我正在寻找的 apply 方法定义为

def apply[I, O]()(block: (Builder[Unit]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit]

并说

Creates a Flow by passing a FlowGraph.Builder to the given create function.

The create function is expected to return a pair of Inlet and Outlet which correspond to the created Flows input and output ports.

当我试图制作一个我认为非常简单的流程时,我似乎需要处理另一个级别的图形构建器。是否有一种更简单、更简洁的方法来创建一个流来更改其输入和输出的类型,而不需要弄乱其内部端口?如果这是解决这个问题的正确方法,那么解决方案会是什么样子?

奖励:为什么很容易创建一个不改变其输入和输出类型的流程?

最佳答案

如果您想指定流的输入和输出类型,您确实需要使用文档中找到的 apply 方法。不过,使用它的方法与您已经做的几乎完全相同。

Flow[String, Message]() { implicit b =>
  import FlowGraph.Implicits._

  val reverseString = b.add(Flow[String].map[String] { msg => msg.reverse })
  val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))

  // connect the graph
  reverseString ~> mapStringToMsg

  // expose ports
  (reverseString.inlet, mapStringToMsg.outlet)
}

您不只是返回入口,而是返回一个包含入口和导出的元组。现在我们可以将这个流程与特定的 Source 或 Sink 一起使用(例如在另一个构建器中,或直接与 runWith 一起使用)。

关于akka-stream - 如何创建具有不同输入和输出类型的流以在图形内部使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30228090/

相关文章:

scala - FTP 在 mesos docker 容器中不起作用

java - java中的Akka流广播

scala - 在集群节点上运行的 Akka Streams

scala - 发送 FakeRequest 时如何为 akka.stream.Materializer 提供隐式值?

scala - 流式传输由其他流组成的文件

Azure 事件中心 : de-mux and aggregate events from all partitions of eventhub using IEventProcessor

postgresql - 使用 slick 的 3.0.0 流式结果和 Postgresql 的正确方法是什么?

scala - 重用 akka-stream 流的优雅方式

scala - Akka流-如何访问流的物化值