scala - 如何使用封装的源和接收器测试 akka 流闭合形状可运行图

标签 scala scalatest akka-stream

我创建了一个 akka 流,它有一个进程函数和一个传递给它的错误处理函数。 SourceSink完全封装在 ClosedShapeRunnableFlow .我的意图是将一个项目传递给父类并在整个流程中运行它。在我开始测试之前,这一切似乎都有效。我正在使用 scala-test 并将附加传递到进程函数和错误处理函数中的列表。我随机生成错误以查看事物也流向错误处理程序函数。问题是,如果我将 100 个项目传递给父类,那么我希望 error 函数中的项目列表和 process 函数中的项目列表加起来为 100。由于 Source 和 Sink 是完全封装的,我不没有明确的方法告诉测试等待,并在所有项目通过流处理之前到达断言/应该语句。我创建了 this gist来描述流。

以下是上述要点的示例测试:

import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._

class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {
  def this() = this(ActorSystem("TestSpec"))

  override def afterAll = {
    Thread.sleep(500)
    mat.shutdown()
    TestKit.shutdownActorSystem(system)
  }

  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))

  "TestSpec" must {
    "handle messages" in {
      val testStream = new Testing()                                                 // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
      (1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on

      testStream.errors.size should not be (0)                                       // passes
      testStream.processed.size should not be (0)                                    // passes
      (testStream.processed.size + testStream.errors.size) should be (100)           // fails due to checking before all items are processed
    }
  }
}

最佳答案

根据 Viktor Klang 对链接 Gist 的评论。事实证明这是一个很好的解决方案:

def consume(
    errorHandler: BadData => Unit, fn: Data => Unit, a: String
  ): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
    GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
      import GraphDSL.Implicits._

      val source = b.add(Source.single(a))
      val broadcast = b.add(Broadcast[String](2))
      val merge = b.add(Zip[String, String])
      val process = new ProcessorFlow(fn)
      val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
      val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
        (input: Xor[BadData, Data]) =>
          input.swap.getOrElse((new Throwable, ("", "")))
      ))

      source ~> broadcast.in
                broadcast.out(0) ~> Flow[String].map(_.reverse)       ~> merge.in0
                broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
                                                                         merge.out ~> process ~> failed ~> errors ~> sink

      ClosedShape
    }
  )

这让我可以 Await.result在 RunnableGraph 上用于测试目的。再次感谢 Viktor 提供的解决方案!

关于scala - 如何使用封装的源和接收器测试 akka 流闭合形状可运行图,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35516519/

相关文章:

java - 使用 PlayWS 通过 Java 创建 WSClient - Materializer null

scala - 使用 Intellij 自动导入的自定义 sbt 配置

scala - 如何在 Play Framework 中隐藏文本字段

scala - Scala 中是否可以同时具有隐式 Ordering[Option[T] 和 Ordered[Option[T]] ?

scala - 将元素从外部推送到 fs2 中的 react 流

scala - Akka 流到 actor 问题

scala - 快速获取数据框中的记录数

scala - 为什么在IntelliJ中运行Scala 2.13测试而不是在Scala 2.12中运行时出现此错误?

scala - SBT/ScalaTest : Configurations already specified for module

scala - SBT:我可以有一个测试套件/ list 并仍然按需运行单独的测试吗?