我已经多次需要这样的构造,但我不太知道如何处理它。我的问题是:当 A 发生时,我想创建一个复杂的可观察值(通过组合几个运算符来创建)。它将异步完成一些操作,发布结果并完成。同时,我想允许对该可观察量进行新订阅,但一旦完成,就应该创建新的可观察量,它是第一个可观察量的副本(或者只是做同样的事情)。
(编辑)作为示例,我们有一个简单的可观察量:Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS)
。我的目标是实现以下行为:
[毫秒:操作]
0: obs.subscribe(...)
- 我希望这个 observable 在 ~1s 后完成
500:obs.subscribe(...)
- 这应该在约 500 毫秒后完成
950:如上所述,应在 50 毫秒后完成
1500:原始可观察应该已经完成。我现在想重新开始一切,并在 1 秒后完成此处的订阅
2000:在这里,我想连接到最新的可观察对象,并期望它在 500 秒后完成(因为新的秒从 1500 开始计数)
我不太知道如何以正确且线程安全的方式做到这一点。我可以用一个可观察量来做到这一点吗?
最佳答案
您可以使用defer
和share
来实现这一点。
Observable<Long> o = Observable.defer(() ->
Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS))
.share();
o.subscribe(System.out::println); // T = 0
Thread.sleep(500);
o.subscribe(System.out::println); // T = 500
Thread.sleep(450);
o.subscribe(System.out::println); // T = 950
Thread.sleep(550);
o.subscribe(System.out::println); // T = 1500
Thread.sleep(500);
o.subscribe(System.out::println); // T == 2000
Thread.sleep(1000);
前 3 个将在 1 秒后同时完成(具有相同的值),后两个将在第一个后 1.5 秒完成(与第一个值不同)。
关于java - 订阅现有的可观察对象,除非它已完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33995800/