java - RxJava 在网络关闭时取消订阅

标签 java networking stream rx-java vert.x

如果网络请求关闭,我当前正在尝试取消订阅可观察对象,以便不再进一步传输数据。

我的 Observable 是通过以下方式创建的:

Observable.interval(1, TimeUnit.SECONDS)
.map { LocalDateTime.now() }.share()

所以有多个订阅者。但我不知道如果网络关闭如何取消订阅。

我目前正在将带有 vert.x 的服务器发送事件的数据传输到客户端,如下所示:

flow.subscribe({
  response.write("the current time is $it")
}, ::println, {
  response.end()
})

如果我取消客户端的请求,可观察对象将继续“流式传输”数据。

感谢您的帮助

最佳答案

您可以通过调用dispose()来取消订阅订阅者

Disposable disposable = flow.subscribe({
  response.write("the current time is $it")
}, ::println, {
  response.end()
})

disposable.dispose();

更新:自定义可观察

val observable: Observable<LocalDateTime> = Observable.create { emitter ->
    val timer = Timer()
    timer.schedule(timerTask {
        if (emitter.isDisposed) {//<-- cancel emmitting items if disposed
            timer.cancel()
        } else {
            emitter.onNext(LocalDateTime.now())
        }
    }, 0, 1000)

}
disposable = observable.share().subscribe { t ->
    System.out.println(" Hello World! $t");
    disposable?.dispose()//<-- here calling dispose() causes observable to stop emitting items
}

关于java - RxJava 在网络关闭时取消订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57006536/

相关文章:

c# WebResponse内容长度限制

java - 维基百科 API 返回的摘录没有文章中的所有字符?

java - keystore 和信任库对于 ssl 是强制性的吗?

java - 如何将 Java 网络服务器添加到 OSGI 包中

networking - 虚幻引擎 4 多人 LAN - 第二个玩家未生成

swift - 如何在 Swift 中共享流位置 betwixt 函数?

c# - File.Open 跨线程编写

java - STS 3.6 RELEASE 错误运行服务器

Java - 如果我更改列表中的项目,列表中的对象也会更改吗?

java - 如何在android中获取两个时间差