我正在尝试使用 Akka HTTP 构建 Web 套接字服务。我需要处理全部到达的严格消息,以及处理以 m 个多帧到达的流式消息。我正在使用带有 handleWebSocketMessages() 的路由将 Web 套接字的处理传递给流。我的代码如下所示:
val route: Route =
get {
handleWebSocketMessages(createFlow())
}
def createFlow(): Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(msg) ⇒ msg
case TextMessage.Streamed(stream) => ??? // <= What to do here??
}
.via(createActorFlow())
.map {
case msg: String ⇒ TextMessage.Strict(msg)
}
def createActorFlow(): Flow[String, String, Any] = {
// Set Up Actors
// ... (this is working)
Flow.fromSinkAndSource(in, out)
}
我不太确定两者如何同时处理 Strict 和 Streamed 消息。我意识到我可以做这样的事情:
.collect {
case TextMessage.Strict(msg) ⇒ Future.successful(msg)
case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
}
但现在我的流必须处理 Future[String] 而不仅仅是字符串,然后我不确定如何处理,尤其是因为显然我需要按顺序处理消息。
我确实看到了这个 akka 问题,这似乎有点相关,但不完全是我需要的(我不认为?)。
https://github.com/akka/akka/issues/20096
如有任何帮助,将不胜感激
最佳答案
折叠听起来是一个明智的选择。处理流中的 future 可以使用(例如)
flowOfFutures.mapAsync(parallelism = 3)(identity)
请注意,根据 docs,mapAsync 会保留传入消息的顺序。 .
另一方面,处理流式 WS 消息的其他明智预防措施可能是使用 completionTimeout 并限制消息折叠的绑定(bind)时间和空间(例如)
stream.limit(x).completionTimeout(5 seconds).runFold(...)
关于websocket - 在 Akka 中同时使用严格和流式 WebSocket 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36535143/