java - Http Websocket 作为 Akka 流源

标签 java scala akka akka-stream akka-http

我想使用 akka 流在 websocket 上收听。也就是说,我只想将它视为 Source

然而,所有official examples将 websocket 连接视为一个 Flow

我目前的方法是结合使用 websocketClientFlowSource.maybe。当没有新的 Message 被发送到流中时,这最终会导致上游由于 TcpIdleTimeoutException 而失败。

因此,我的问题是双重的:

  1. 有没有一种方法——我显然错过了——将 websocket 视为一个 Source
  2. 如果使用Flow 是唯一的选择,如何正确处理TcpIdleTimeoutException?无法通过提供流监督策略来处理异常。使用 RestartSource 重新启动源也无济于事,因为源不是问题所在。

更新

所以我尝试了两种不同的方法,为方便起见将空闲超时设置为1秒

application.conf

akka.http.client.idle-timeout = 1s

使用 keepAlive(按照 Stefano 的建议)

Source.<Message>maybe()
    .keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
    .viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
    { ... }

执行此操作时,上游仍然失败并出现 TcpIdleTimeoutException

使用 RestartFlow

但是,我发现了 this approach ,使用 RestartFlow:

final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
    Duration.apply(3, TimeUnit.SECONDS),
    Duration.apply(30, TimeUnit.SECONDS),
    0.2,
    () -> createWebsocketFlow(system, websocketUri)
);

Source.<Message>maybe()
    .viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
    { ... }

(...)

private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
    return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}

这是因为我可以将 websocket 视为源(尽管是人为的,正如 Stefano 所解释的那样)并通过在 Exception 时重新启动 websocketClientFlow 来保持 tcp 连接有效> 发生。

不过,这并不是最佳解决方案。

最佳答案

  1. 没有。 WebSocket 是双向 channel ,因此 Akka-HTTP 将其建模为 Flow。如果在您的特定情况下您只关心 channel 的一侧,则您可以使用 Flow.fromSinkAndSource(Sink.ignore , mySource)Flow.fromSinkAndSource(mySink, Source.maybe),视情况而定。

  2. 根据 documentation :

    Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.

    有一个 ad-hoc 组合器来注入(inject)保持 Activity 消息,请参见下面的示例和这个 Akka cookbook recipe .注意:这应该发生在客户端。

    src.keepAlive(1.second, () => TextMessage.Strict("ping"))

关于java - Http Websocket 作为 Akka 流源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48153502/

相关文章:

scala - com.paypal.core.rest.PayPalRESTException :Read timed out

actor - 如何确定在 akka 中产生的 Actor 数量?

java - Akka调度程序: Run next only when current run is complete

java - android.os.NetworkOnMainThreadException android 调用 webservice

java - 使用 PersistenceUnitInfo 注册 ClassTransformer 时发生错误

postgresql - 升级数据库后应用程序无法在 Heroku 上运行

scala - 打印集合 : variable specific

java - 对于 Jackrabbit First Hops 示例,应选择哪个 Maven 原型(prototype)代码?

java - 将位图数组设置为 imageview 时发生主线程网络异常

java - Akka java 永远不会关闭 ActorRef