我的代码中有一个重试机制,我使用以下行来执行我的重试逻辑。例如,我生成一个随机毫秒来延迟我的执行。当计时器滴答到 30 * 1000 毫秒时,我想取消这个计时器。我怎样才能取消这个计时器并立即执行我的逻辑。
//register retryWhen
Observable.retryWhen(new RetryWhenException());
//retry code.
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>>{
public Observable<?> apply(final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
@Override
public Wrapper apply(Throwable throwable, Integer integer) throws Exception {
return new Wrapper(throwable, integer);
}
}).flatMap(new Function<Wrapper, Observable<?>>() {
@Override
public Observable<?> apply(Wrapper wrapper) throws Exception {
long delay = 60 * 1000;
//How can I add some code here to cancel this time and execute this api call immediately if I receive a event like network gets back?
return Observable.timer(delay, TimeUnit.MILLISECONDS);
}
});
}
}
提前致谢。
最佳答案
如果我对你的问题的理解正确,你希望能够将多个条件组合到重试中,这意味着重试最多会在一定时间后发生(计时器),或者甚至在某些事件发生后更早(网络已连接) ).
在这种情况下,您需要结合这 2 个事件,首先,您需要一些通知网络事件的 Observable
(如何创建它是一个不同的讨论,包装系统应该不是问题使用 Observable 广播事件),那么你可以这样做:
private Observable<NetworkState> networkStateObservable;
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>> {
public Observable<?> apply(
final Observable<? extends Throwable> observable) throws Exception {
return observable.zipWith(Observable.range(1, count + 1), Wrapper::new)
.flatMap(wrapper -> {
long delay = 60 * 1000;
Observable<NetworkState> networkConnectedEvents =
networkStateObservable.filter(networkState -> networkState.isConnected())
take(1);
Observable<Long> timer = Observable.timer(delay, TimeUnit.MILLISECONDS);
return Observable.amb(Arrays.asList(networkConnectedEvents, timer));
});
}
}
网络状态 Observable
被过滤为仅在连接时获取通知,take(1)
也是为了确保在第一次收到通知后我们将取消订阅它(不需要再听了)。
amb()
运算符似乎非常适合这里,因为它将选择将首先发出的 Observable
,并取消订阅另一个,这意味着万一网络 Observable
在计时器之前发出,计时器 Observable
将被取消订阅(= 计时器将被取消)。
编辑:
删除了错误的 takeUntil(networkConnectedEvents),不需要它,因为 amb 会在需要时取消订阅。
关于android - 如何用 rxjava2 取消 Observable.timer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44273129/