在 Akka Stream 2.4.2 中,PushStage 已被弃用。对于 Streams 2.0.3,我使用的是这个答案中的解决方案:
How does one close an Akka stream?
这是:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}
我如何从 GraphStage/onPush() 内部立即关闭 2.4.2 中的流?
最佳答案
使用这样的东西:
val closeStage = new GraphStage[FlowShape[Tpe, Tpe]] {
val in = Inlet[Tpe]("closeStage.in")
val out = Outlet[Tpe]("closeStage.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush() = grab(in) match {
case elem if shouldCloseStream ⇒
// println("stream closed")
completeStage()
case msg ⇒
push(out, msg)
}
})
setHandler(out, new OutHandler {
override def onPull() = pull(in)
})
}
}
它更冗长,但一方面可以以可重用的方式定义此逻辑,另一方面不再需要担心流元素之间的差异,因为 GraphStage
可以是以与处理流相同的方式处理:
val flow: Flow[Tpe] = ???
val newFlow = flow.via(closeStage)
关于scala - 从 GraphStage (Akka 2.4.2) 内部关闭 Akka 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35544856/