scala - 如何有条件地将消息传递到特定的下一个阶段(以及其他阶段)?

标签 scala akka typesafe akka-stream

考虑一个简单的场景,根据传递的消息的某些属性,我希望它由特定的下一阶段处理并继续。

[Source[ActionMessage]] ~> [Flow[ActionMessage, EnrichedActionMessage]] 
~> (eAM: EnrichedActionMessage => eAM.actionType match {
      case ActionA => eAM ~> Flow[EnrichedActionMessage, ReactionA] ~> Sink[ReactionA]
      case ActionB => eAM ~> Flow[EnrichedActionMessage, ReactionB] ~> Sink[ReactionB]
      case ActionC => eAM ~> Flow[EnrichedActionMessage, ReactionC] ~> Sink[ReactionC]
    })

如何实现到阶段图阶段的条件路由?

最佳答案

此答案基于akka-stream版本2.4.2-RC1。其他版本中的 API 可能略有不同。该依赖项可以由 sbt 消耗:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC1"

使用分区组件:

val shape = GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._

  val first = b.add(Sink.foreach[Int](elem ⇒ println("first:\t" + elem)))
  val second = b.add(Sink.foreach[Int](elem ⇒ println("second:\t" + elem)))
  val third = b.add(Sink.foreach[Int](elem ⇒ println("third:\t" + elem)))
  val p = b.add(Partition[Int](3, elem ⇒ elem match {
    case 0                ⇒ 0
    case elem if elem < 0 ⇒ 1
    case elem if elem > 0 ⇒ 2
  }))

  p ~> first
  p ~> second
  p ~> third

  SinkShape(p.in)
}
Source(List(0, 1, 2, -1, 1, -5, 0)).to(shape).run()

/*
Output:
first: 0
third: 1
third: 2
second: -1
third: 1
second: -5
first: 0
*/

除了SinkShape,您还可以返回一个new FanOutShape3(p.in, p.out(0), p.out(1), p.out(2)) 如果您希望稍后对元素进行任何处理。

关于scala - 如何有条件地将消息传递到特定的下一个阶段(以及其他阶段)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35225866/

相关文章:

mongodb - Scala 根据最大​​值过滤列表

akka - 如何告诉akka actor成功发送消息到其他actor的邮箱?

Java STM : Questions on Multiverse STM

playframework - SBT :ResolvedException - Unresolved dependency PlayFramework 2. 3.6

scala - 封装在 Future 中的同步 HTTP 请求是否被视为 CPU 或 IO 限制?

scala - 如何在 Scala 中使用滑动流获取斐波那契数列?

scala - 如何将配置文件传递给 Scala jar 文件

Java 和 "Type Safety..."

scala - 由于 "Invalid memory access of location"导致的 JVM 段错误

java - AKKA:Actor 路由器消息持久化