websocket - 在 Akka 中同时使用严格和流式 WebSocket 消息

标签 websocket akka akka-stream akka-http

我正在尝试使用 Akka HTTP 构建 Web 套接字服务。我需要处理全部到达的严格消息,以及处理以 m 个多帧到达的流式消息。我正在使用带有 handleWebSocketMessages() 的路由将 Web 套接字的处理传递给流。我的代码如下所示:

val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) ⇒ msg
    case TextMessage.Streamed(stream) => ??? // <= What to do here??
  }
  .via(createActorFlow())
  .map {
    case msg: String ⇒ TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[String, String, Any] = {
  // Set Up Actors
  // ... (this is working)
  Flow.fromSinkAndSource(in, out)
}

我不太确定两者如何同时处理 Strict 和 Streamed 消息。我意识到我可以做这样的事情:

  .collect {
    case TextMessage.Strict(msg) ⇒ Future.successful(msg)
    case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
  }

但现在我的流必须处理 Future[String] 而不仅仅是字符串,然后我不确定如何处理,尤其是因为显然我需要按顺序处理消息。

我确实看到了这个 akka 问题,这似乎有点相关,但不完全是我需要的(我不认为?)。

https://github.com/akka/akka/issues/20096

如有任何帮助,将不胜感激

最佳答案

折叠听起来是一个明智的选择。处理流中的 future 可以使用(例如)

flowOfFutures.mapAsync(parallelism = 3)(identity)

请注意,根据 docs,mapAsync 会保留传入消息的顺序。 .

另一方面,处理流式 WS 消息的其他明智预防措施可能是使用 completionTimeout 并限制消息折叠的绑定(bind)时间和空间(例如)

stream.limit(x).completionTimeout(5 seconds).runFold(...)

关于websocket - 在 Akka 中同时使用严格和流式 WebSocket 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36535143/

相关文章:

scala - akka 流 throttle 如何工作?

python - 使用 Python 与使用 JSON 的 web socket 通信

scala - 使用 Akka Streams 读取大文件

java - 在 spring 中运行 javax.websocket 端点?

java - 如何模拟基于 ActorSelection 的 Actor ?

scala - 使用 Akka IO 通过 Tcp 客户端发送非 ByteString

multithreading - 如何保证akka中fork的顺序

scala - MergeLatest 的默认值

php - 从 docker 到 websocket 的彩色输出

http - Websocket - 客户端应该发送 ping 帧吗?