我想使用 akka 流在 websocket 上收听。也就是说,我只想将它视为 Source
。
然而,所有official examples将 websocket 连接视为一个 Flow
。
我目前的方法是结合使用 websocketClientFlow
和 Source.maybe
。当没有新的 Message
被发送到流中时,这最终会导致上游由于 TcpIdleTimeoutException
而失败。
因此,我的问题是双重的:
- 有没有一种方法——我显然错过了——将 websocket 视为一个
Source
? - 如果使用
Flow
是唯一的选择,如何正确处理TcpIdleTimeoutException
?无法通过提供流监督策略来处理异常。使用RestartSource
重新启动源也无济于事,因为源不是问题所在。
更新
所以我尝试了两种不同的方法,为方便起见将空闲超时设置为1秒
application.conf
akka.http.client.idle-timeout = 1s
使用 keepAlive(按照 Stefano 的建议)
Source.<Message>maybe()
.keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
.viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
{ ... }
执行此操作时,上游仍然失败并出现 TcpIdleTimeoutException
。
使用 RestartFlow
但是,我发现了 this approach ,使用 RestartFlow
:
final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
Duration.apply(3, TimeUnit.SECONDS),
Duration.apply(30, TimeUnit.SECONDS),
0.2,
() -> createWebsocketFlow(system, websocketUri)
);
Source.<Message>maybe()
.viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
{ ... }
(...)
private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}
这是因为我可以将 websocket 视为源(尽管是人为的,正如 Stefano 所解释的那样)并通过在 Exception
时重新启动 websocketClientFlow
来保持 tcp 连接有效> 发生。
不过,这并不是最佳解决方案。
最佳答案
没有。 WebSocket 是双向 channel ,因此 Akka-HTTP 将其建模为
Flow
。如果在您的特定情况下您只关心 channel 的一侧,则您可以使用Flow.fromSinkAndSource(Sink.ignore , mySource)
或Flow.fromSinkAndSource(mySink, Source.maybe)
,视情况而定。根据 documentation :
Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.
有一个 ad-hoc 组合器来注入(inject)保持 Activity 消息,请参见下面的示例和这个 Akka cookbook recipe .注意:这应该发生在客户端。
src.keepAlive(1.second, () => TextMessage.Strict("ping"))
关于java - Http Websocket 作为 Akka 流源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48153502/