我创建了一个 akka 流,它有一个进程函数和一个传递给它的错误处理函数。 Source
和 Sink
完全封装在 ClosedShape
中RunnableFlow
.我的意图是将一个项目传递给父类并在整个流程中运行它。在我开始测试之前,这一切似乎都有效。我正在使用 scala-test 并将附加传递到进程函数和错误处理函数中的列表。我随机生成错误以查看事物也流向错误处理程序函数。问题是,如果我将 100 个项目传递给父类,那么我希望 error 函数中的项目列表和 process 函数中的项目列表加起来为 100。由于 Source 和 Sink 是完全封装的,我不没有明确的方法告诉测试等待,并在所有项目通过流处理之前到达断言/应该语句。我创建了 this gist来描述流。
以下是上述要点的示例测试:
import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._
class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TestSpec"))
override def afterAll = {
Thread.sleep(500)
mat.shutdown()
TestKit.shutdownActorSystem(system)
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))
"TestSpec" must {
"handle messages" in {
val testStream = new Testing() // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
(1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on
testStream.errors.size should not be (0) // passes
testStream.processed.size should not be (0) // passes
(testStream.processed.size + testStream.errors.size) should be (100) // fails due to checking before all items are processed
}
}
}
最佳答案
根据 Viktor Klang 对链接 Gist 的评论。事实证明这是一个很好的解决方案:
def consume(
errorHandler: BadData => Unit, fn: Data => Unit, a: String
): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
import GraphDSL.Implicits._
val source = b.add(Source.single(a))
val broadcast = b.add(Broadcast[String](2))
val merge = b.add(Zip[String, String])
val process = new ProcessorFlow(fn)
val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
(input: Xor[BadData, Data]) =>
input.swap.getOrElse((new Throwable, ("", "")))
))
source ~> broadcast.in
broadcast.out(0) ~> Flow[String].map(_.reverse) ~> merge.in0
broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
merge.out ~> process ~> failed ~> errors ~> sink
ClosedShape
}
)
这让我可以
Await.result
在 RunnableGraph 上用于测试目的。再次感谢 Viktor 提供的解决方案!
关于scala - 如何使用封装的源和接收器测试 akka 流闭合形状可运行图,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35516519/