我有以下代码片段:
Observable.just(1)
.flatMap(-> doSomething1)
.timeout(10, SECONDS)
.flatMap(-> doSomething2)
.timeout(10, SECONDS)
.flatMap(-> doSomething3)
.timeout(10, SECONDS)
.flatMap(-> doSomething4)
.timeout(10, SECONDS)
.subscribe();
我不想在每次flatMap
添加超时
后重复自己。我的第一个想法是仅将超时应用于流的开头或结尾,但这不是我想要的行为,因为它只将超时应用于更接近的可观察值。
Observable.just(1)
.flatMap(-> doSomething1)
.flatMap(-> doSomething2)
.flatMap(-> doSomething3)
.flatMap(-> doSomething4)
.timeout(10, SECONDS)
.subscribe();
Observable.just(1)
.timeout(10, SECONDS)
.flatMap(-> doSomething1)
.flatMap(-> doSomething2)
.flatMap(-> doSomething3)
.flatMap(-> doSomething4)
.subscribe();
doSomethingX
函数在调用时执行一些代码,这可能需要一段时间才能返回下一个可观察值,而该可观察值本身不需要包含在超时中。
如何改进?
更新:
下面是更实际的例子。我的想法是编写一个流,我可以在失败或超时的情况下重试。我正在模拟其中一个运算符超时一次但可以重试的场景。
@Test
public void streamToBeSimplified() throws Exception {
final AtomicBoolean retry = new AtomicBoolean(true);
Action1<Object> print = new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(" >>>" + o);
}
};
Observable.just(1)
.doOnNext(print)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(2);
}
})
.timeout(1, TimeUnit.SECONDS)
.doOnNext(print)
.flatMap(new Func1<Object, Observable<Integer>>() {
@Override
public Observable<Integer> call(Object o) {
if(retry.getAndSet(false)) {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return Observable.just(3);
}
})
.timeout(1, TimeUnit.SECONDS)
.doOnNext(print)
.retry(2)
.subscribe();
}
最佳答案
您可以创建一个像这样的辅助方法:
private Observable doThings() {
return Observable.just(1)
.flatMap(__ -> withTimeout(doSomething1, 10, TimeUnit.SECONDS))
.flatMap(__ -> withTimeout(doSomething2, 10, TimeUnit.SECONDS));
// etc
}
private static <T> Observable<T> withTimeout(Observable<T> observable, long time, TimeUnit timeUnit) {
return observable
.timeout(time, timeUnit);
}
关于java - 如何通过超时简化 rxjava 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40237590/