scala - 在 Scala 中拦截套接字关闭上的 Akka HTTP WebSocket 事件

标签 scala websocket akka akka-stream akka-http

我使用 Scala 和 Akka HTTP。

我有一个基于 Akka HTTP 的服务器后端应用程序。传入的 WebSocket 消息由 handleWebSocketMessages 处理:

/**
  * Route for WebSocket request
  */
private val webSocketRoute: Route = pathSuffix(Constants.WSPROXY_WEBSOCKET_PATH_SUFFIX) {
    LOGGER.debug("() Web socket route")
    handleWebSocketMessages(wsRouteImpl)
}

/**
  * This method calls all registered handlers.
  *
  * @return flow for handleWebSocketMessages method
  */
private def wsRouteImpl: Flow[Message, Message, Any] = {
    LOGGER.debug("() wsRouteHandling")
    numOfClients += 1

    LOGGER.info(s"Client has connected, current number of clients: $numOfClients")

    var flow = Flow[Message].mapConcat {
  // Call specific handlers depending on message type
  ...
}

我的 WebSocket 客户端建立双向通信连接并保持事件状态。

绑定(bind)完成:

val binding = Http().bindAndHandle(webSocketRoute, config.host, config.port)

问题是我需要为关闭的套接字注入(inject)回调(例如,如果客户端已断开连接)并减少当前的客户端数量,但我找不到任何入口点。

是否可以在套接字关闭时捕获某种事件?

最佳答案

使用watchTermination :

val numOfClients = new java.util.concurrent.atomic.AtomicInteger(0)

private val webSocketRoute: Route = pathSuffix(Constants.WSPROXY_WEBSOCKET_PATH_SUFFIX) {
  LOGGER.debug("() Web socket route")

  val wsFlow: Flow[Message, Message, Any] =
    wsRouteImpl.watchTermination() { (_, fut) =>
      numOfClients.incrementAndGet()
      LOGGER.info(s"Client has connected. Current number of clients: $numOfClients")

      fut onComplete {
        case Success(_) =>
          numOfClients.decrementAndGet()
          LOGGER.info(s"Client has disconnected. Current number of clients: $numOfClients")
        case Failure(ex) =>
          numOfClients.decrementAndGet()
          LOGGER.error(s"Disconnection failure (number of clients: $numOfClients): $ex")
      }
    }

  handleWebSocketMessages(wsFlow)
}

private def wsRouteImpl: Flow[Message, Message, Any] = ???

关于scala - 在 Scala 中拦截套接字关闭上的 Akka HTTP WebSocket 事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50762479/

相关文章:

Java : Opening client socket

Flutter 与 SignalR 后端进行身份验证的 WebSocket 通信

scala - 如果需要,Akka actor 可以从其邮箱中删除消息吗?

scala - 对于理解: how to run Futures sequentially

scala - 选项 [T] 的最小/最大可能为空序列?

ruby - ruby 中的安全 Websocket 客户端

Akka 的 Java 聚合器

playframework-2.0 - play 框架 2.1 - 调度异步任务

Scala 特化用于基本类型的数值运算

scala - 将具有静态属性和注释的Java类转换为等效的Scala