java - 订阅现有的可观察对象,除非它已完成

标签 java reactive-programming rx-java observable

我已经多次需要这样的构造,但我不太知道如何处理它。我的问题是:当 A 发生时,我想创建一个复杂的可观察值(通过组合几个运算符来创建)。它将异步完成一些操作,发布结果并完成。同时,我想允许对该可观察量进行新订阅,但一旦完成,就应该创建新的可观察量,它是第一个可观察量的副本(或者只是做同样的事情)。

(编辑)作为示例,我们有一个简单的可观察量:Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS)。我的目标是实现以下行为:

[毫秒:操作]

0: obs.subscribe(...) - 我希望这个 observable 在 ~1s 后完成

500:obs.subscribe(...) - 这应该在约 500 毫秒后完成

950:如上所述,应在 50 毫秒后完成

1500:原始可观察应该已经完成​​。我现在想重新开始一切,并在 1 秒后完成此处的订阅

2000:在这里,我想连接到最新的可观察对象,并期望它在 500 秒后完成(因为新的秒从 1500 开始计数)

我不太知道如何以正确且线程安全的方式做到这一点。我可以用一个可观察量来做到这一点吗?

最佳答案

您可以使用defershare来实现这一点。

Observable<Long> o = Observable.defer(() ->
    Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS))
.share();

o.subscribe(System.out::println);   // T = 0
Thread.sleep(500);
o.subscribe(System.out::println);  // T = 500
Thread.sleep(450);
o.subscribe(System.out::println);   // T = 950

Thread.sleep(550);
o.subscribe(System.out::println);   // T = 1500
Thread.sleep(500);
o.subscribe(System.out::println);   // T == 2000

Thread.sleep(1000);

前 3 个将在 1 秒后同时完成(具有相同的值),后两个将在第一个后 1.5 秒完成(与第一个值不同)。

关于java - 订阅现有的可观察对象,除非它已完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33995800/

相关文章:

angular - tap() 未在 RXJS 管道中触发

android-studio - Kotlin 和 RxJava2 zip 运算符 - 不能使用提供的参数调用以下函数

java - 如何处理 RxJava 链中抛出的错误

java - 随机生成器出了问题?

java - 限制一定数量的客户端连接到服务器

c# - CombineLatest 是否保留了可观察量的顺序?

android - 将无限次调用的回调转换为 Observable

Java 的 JDB(和 Eclipse)在连接到具有许多线程的远程 JVM 时挂起(即通过 TCPIP 传输线程调试信息)

java - Mahout:使用Java矢量化包含文档的文件夹

javascript - 响应式(Reactive)表达式中的热可观察序列是由什么开始的