scala - 无法在 Akka Stream 中使用 GraphStage 类运行 SourceShape

标签 scala akka akka-stream

我正在尝试使用 GraphStage 构造创建 Redis Akka 流源。这个想法是,每当我从 subscribe 方法获得更新时,我都会将其推送到下一个组件。此外,如果没有拉信号,该组件应该背压。这是代码:

class SubscriberSourceShape(channel: String, subscriber: Subscriber) 
    extends GraphStage[SourceShape[String]] {

  private val outlet: Outlet[String] = Outlet("SubscriberSource.Out")

  override def shape: SourceShape[String] = SourceShape(outlet)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      val callback = getAsyncCallback((msg: String) => push(outlet, msg)
      val handler = (msg: String) => callback.invoke(msg)

      override def preStart(): Unit = subscriber.subscribe(channel)(handler)
    }
  }
}

但是,当我使用简单的接收器运行此程序时,我收到此错误:

阶段 [akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@58344854] 中出现错误:阶段 [...SubscriberSourceShape@6a9193dd] 中没有为输出端口 [RedisSubscriberSource.Out(1414886352) 定义处理程序。所有入口和导出必须在图形阶段逻辑的构造函数中使用 setHandler 分配一个处理程序。

出了什么问题?

最佳答案

您收到此错误是因为您尚未为 Source 设置任何输出处理程序,因此当下游组件(Flow 或 Sink)向此 Source 发送拉取信号时,没有处理程序来处理拉取信号。

您可以添加一个 OutputHandler 来消除错误。将 onPull 方法留空,因为您是在 asyncCallback 上生成元素。只需将其添加到 GraphStageLogic 主体中即可:

      setHandler(outlet, new OutHandler {
        override def onPull(): Unit = {}
      })

关于scala - 无法在 Akka Stream 中使用 GraphStage 类运行 SourceShape,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58847650/

相关文章:

scala - Akka OneForOneStrategy 不起作用

java - Akka Stream 中直播资源流程说明

scala - scala中的require和assert之间如何选择

java - Playframework 2.2 Java 未修改

java - 如何指定Actor处理来自特定端口的消息

scala - 为什么会出现Conflicting cross-version suffixes的错误?

scala - 如何使用 Akka HTTP 进行身份验证

scala - 在 finally block 中使用变量

akka - 使用 Akka-Http 流式传输视频或(未知长度的流)

java - 为什么使用 Sink.asPublisher 创建的 Publisher 在被 BroadcastHub 使用时不起作用?