scala - 从 GraphStage (Akka 2.4.2) 内部关闭 Akka 流

标签 scala akka-stream

在 Akka Stream 2.4.2 中,PushStage 已被弃用。对于 Streams 2.0.3,我使用的是这个答案中的解决方案:

How does one close an Akka stream?

这是:

import akka.stream.stage._

    val closeStage = new PushStage[Tpe, Tpe] {
      override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
        case elem if shouldCloseStream ⇒
          // println("stream closed")
          ctx.finish()
        case elem ⇒
          ctx.push(elem)
      }
    }

我如何从 GraphStage/onPush() 内部立即关闭 2.4.2 中的流?

最佳答案

使用这样的东西:

val closeStage = new GraphStage[FlowShape[Tpe, Tpe]] {
  val in = Inlet[Tpe]("closeStage.in")
  val out = Outlet[Tpe]("closeStage.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case elem if shouldCloseStream ⇒
          // println("stream closed")
          completeStage()
        case msg ⇒
          push(out, msg)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

它更冗长,但一方面可以以可重用的方式定义此逻辑,另一方面不再需要担心流元素之间的差异,因为 GraphStage 可以是以与处理流相同的方式处理:

val flow: Flow[Tpe] = ???
val newFlow = flow.via(closeStage)

关于scala - 从 GraphStage (Akka 2.4.2) 内部关闭 Akka 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35544856/

相关文章:

scala - 如何在不依赖 Product 的情况下为元组绑定(bind)类型参数?

ScalaJs With Play Framework(测试错误)

java - Akka 流 : Simulating a failed stream with OverflowStrategy. 失败()

scala - akka-http 发送连续分块的 http 响应(流)

scala - 如何在 Akka HTTP 中将数字流完成为 CSV 值?

scala - 为什么可以在其定义范围之外看到对象的 protected 内部类?

scala - NULL 指针异常,同时在 foreach() 中创建 DF

scala - 模拟specs2中的slick.dbio.DBIO组成

scala - 你如何处理 Akka Flow 中的 futures 和 mapAsync?

scala - 如何解释官方文档中的 Akka Streams 图?