scala - 如何在 Akka-Stream 2.0 流程开始时向 ActorRef 发送消息?

标签 scala akka akka-stream

目标是发送WSConnectEvent一旦客户端连接并且流开始。使用 akka-streams 1.0,我能够通过以下方式完成此操作:

Flow(Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)) {
  implicit builder =>
    sdpSource =>

      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].collect {
        case TextMessage.Strict(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
        }
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
      val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))

      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorAsSource = builder.materializedValue.map(actor => WSConnectEvent(callUUID, userUUID, actor))

      fromWebsocket ~> merge.in(0)
      actorAsSource ~> merge.in(1)

      merge ~> callActorSink
      sdpSource ~> toWebsocket
      (fromWebsocket.inlet, toWebsocket.outlet)
}

在尝试升级它以与 Akka-Streams 2.0.1 一起使用时,我更改了以下代码,但我并没有收到 WSConnectEvent信息。我不确定这是因为我的源设置不正确,还是我没有实现 ActorRef适本地。
val sdpSource = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)

Flow.fromGraph(
  GraphDSL.create() { implicit builder =>

    // Incoming SDP offer flow
    val fromWebsocket = builder.add(Flow[Message].collect {
      case TextMessage.Strict(txt) => {
        val event = txt.parseJson.convertTo[WSResponseEvent]
        WSMessageEvent(callUUID, userUUID, event.id, event.data)
      }
    })

    // Outgoing SDP answer flow
    val toWebsocket = builder.add(Flow[WSResponseEvent].map {
      case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
    })

    val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
    val callActorRef = Await.result(callActorSelection.resolveOne(), Duration.Inf);
    val callActorSink = Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID))

    // Join events, also sends actor for sending stuff
    val merge = builder.add(Merge[CallControlEvent](2))
    val actorAsSource = sdpSource.mapMaterializedValue(WSConnectEvent(callUUID, userUUID, _))

    fromWebsocket ~> merge.in(0)
    actorAsSource ~> merge.in(1)

    merge ~> callActorSink
    sdpSource ~> toWebsocket
    FlowShape(fromWebsocket.in, toWebsocket.out)
  }
)

最佳答案

感谢johanandren的帮助,mapMaterializedValue不是正确的方法,相反,我需要构建一个流来发送 WSConnectEvent并连接 builder.materializeValue 的输出通过它直到端口中的“合并”,如下所示:

// Join events, also sends actor for sending stuff
val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))

builder.materializedValue ~> actorConnected ~> merge.in(1)

完整的工作示例:
val sdpSource: Source[WSResponseEvent, ActorRef] = Source.actorRef[WSResponseEvent](65535, OverflowStrategy.fail)

Flow.fromGraph(GraphDSL.create(sdpSource) {
  implicit builder =>
    { (responseSource) =>
      import GraphDSL.Implicits._

      // Incoming SDP offer flow
      val fromWebsocket = builder.add(Flow[Message].mapAsync(1)(_ match {
        case tm: TextMessage => tm.textStream.runFold("")(_ + _).map(Some(_))
        case bm: BinaryMessage =>
          bm.dataStream.runWith(Sink.ignore)
          Future.successful(None)
      }).collect {
        case Some(txt) => {
          val event = txt.parseJson.convertTo[WSResponseEvent]
          WSMessageEvent(callUUID, userUUID, event.id, event.data)
        }
      })

      // Outgoing SDP answer flow
      val toWebsocket = builder.add(Flow[WSResponseEvent].map {
        case msg: WSResponseEvent => TextMessage(msg.toJson.compactPrint)
      })

      val callActorSelection = actorSystem.actorSelection(s"/user/application/call-service/call-${callUUID.toString}")
      val callActorRef = Await.result(callActorSelection.resolveOne(), 2.minutes);
      val toCallActor = builder.add(Sink.actorRef[CallControlEvent](callActorRef, WSDisconnectEvent(callUUID, userUUID)))

      // Join events, also sends actor for sending stuff
      val merge = builder.add(Merge[CallControlEvent](2))
      val actorConnected = Flow[ActorRef].map(WSConnectEvent(callUUID, userUUID, _))

      fromWebsocket ~> merge.in(0)
      builder.materializedValue ~> actorConnected ~> merge.in(1)

      merge ~> toCallActor
      responseSource ~> toWebsocket

      FlowShape.of(fromWebsocket.in, toWebsocket.out)
    }
})

关于scala - 如何在 Akka-Stream 2.0 流程开始时向 ActorRef 发送消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34558522/

相关文章:

java - Scala 通用型与存在型类型混淆

scala - 适合使用 Futures 和 Promises 进行延迟初始化吗?

akka-stream - 需要一个如何在 akka 流中使用 flatMapPrefix 的示例

scala - 如何从 Future[Iterator] 创建源?

scala - 如何在 slick 中使用 sql """插值编写动态 SQL 查询

scala - scala 中的逆变

java - 可以为每个 Actor 指定自定义路由或自定义构造函数参数

kubernetes - 是否可以自动缩放 Akka

java - 物化值在 Akka Stream 中如何工作

scala - 从 Scala 使用的 Redis 客户端库建议