scala - 如何使用 akka-http websocket 客户端监听 websocket 服务器关闭事件

标签 scala websocket akka-http

我有 websocket 客户端连接到 akka-http websocket 服务器,我如何监听服务器上发生的连接关闭事件(即服务器关闭/服务器关闭 websocket 连接)?

object Client extends App {


  implicit val actorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer = ActorMaterializer()

  val config = actorSystem.settings.config
  val interface = config.getString("app.interface")

  val port = config.getInt("app.port")


  // print each incoming strict text message
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)

      case _ => {
        sourceQueue.map(q => {
          println(s"offering message on client")
          q.offer(TextMessage("received unknown"))
        })
        println(s"received unknown message format")
      }
    }

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    })
      .keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat"))
    (s, p.future)
  }

  val flow =
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.right)


  val (upgradeResponse, sourceClose) =
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)

  val connected = upgradeResponse.map { upgrade =>
    // just like a regular http request we can get 404 NotFound,
    // with a response body, that will be available from upgrade.response
    if (upgrade.response.status == StatusCodes.SwitchingProtocols || upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Done
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }


  connected.onComplete(println)

}

最佳答案

Websocket 连接终止被建模为常规流完成,因此在您的情况下,您可以使用 Sink.foreach 产生的物化 Future[Done]:

val flow = Flow.fromSinkAndSourceMat(printSink, source)(Keep.both)

val (upgradeResponse, (sinkClose, sourceClose)) =
  Http().singleWebSocketRequest(..., flow)

sinkClose.onComplete {
  case Success(_) => println("Connection closed gracefully")
  case Failure(e) => println("Connection closed with an error: $e")
}

关于scala - 如何使用 akka-http websocket 客户端监听 websocket 服务器关闭事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37727410/

相关文章:

java - AWS - 使用 @connections websocket 回调 url 从后端发送响应(单向) - API Gateway websocket 协议(protocol)

ruby-on-rails - 如何让服务器端 faye 客户端在我的 Rails 应用程序中运行?

scala - 困惑重新 : solution to fibonnaci numbers with Scala infinite streams that used a def'd method within a def

scala - 元组推理失败

javascript - 通过 WebSocket 解码音频 block

json - 喷雾-json。如何从json中获取对象列表

scala - Akka http 丢失发件人引用

ssl - 无法将 Akka-http 10.10 中的 AhcWSClient 配置为 AcceptAnyCertificate

parsing - Haskell 相当于 scala 收集

scala - Spark 是否在读取时维护 Parquet 分区?