android - 如何用 rxjava2 取消 Observable.timer?

标签 android rx-java reactive-programming retrofit2 rx-java2

我的代码中有一个重试机制,我使用以下行来执行我的重试逻辑。例如,我生成一个随机毫秒来延迟我的执行。当计时器滴答到 30 * 1000 毫秒时,我想取消这个计时器。我怎样才能取消这个计时器并立即执行我的逻辑。

//register retryWhen
Observable.retryWhen(new RetryWhenException());

//retry code.
public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>>{       
public Observable<?> apply(final Observable<? extends Throwable> observable) throws Exception {
            return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper apply(Throwable throwable, Integer integer) throws Exception {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Function<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> apply(Wrapper wrapper) throws Exception {
                    long delay = 60 * 1000;
                    //How can I add some code here to cancel this time and execute this api call immediately if I receive a event like network gets back?
                    return Observable.timer(delay, TimeUnit.MILLISECONDS);
                }
            });
        }
}

提前致谢。

最佳答案

如果我对你的问题的理解正确,你希望能够将多个条件组合到重试中,这意味着重试最多会在一定时间后发生(计时器),或者甚至在某些事件发生后更早(网络已连接) ).
在这种情况下,您需要结合这 2 个事件,首先,您需要一些通知网络事件的 Observable(如何创建它是一个不同的讨论,包装系统应该不是问题使用 Observable 广播事件),那么你可以这样做:

private Observable<NetworkState> networkStateObservable;

public class RetryWhenException implements Function<Observable<? extends Throwable>, Observable<?>> {
    public Observable<?> apply(
            final Observable<? extends Throwable> observable) throws Exception {
        return observable.zipWith(Observable.range(1, count + 1), Wrapper::new)
                .flatMap(wrapper -> {
                    long delay = 60 * 1000;
                    Observable<NetworkState> networkConnectedEvents =
                            networkStateObservable.filter(networkState -> networkState.isConnected())
                               take(1);
                    Observable<Long> timer = Observable.timer(delay, TimeUnit.MILLISECONDS);

                    return Observable.amb(Arrays.asList(networkConnectedEvents, timer));
                });
    }
}

网络状态 Observable 被过滤为仅在连接时获取通知,take(1) 也是为了确保在第一次收到通知后我们将取消订阅它(不需要再听了)。
amb() 运算符似乎非常适合这里,因为它将选择将首先发出的 Observable,并取消订阅另一个,这意味着万一网络 Observable 在计时器之前发出,计时器 Observable 将被取消订阅(= 计时器将被取消)。

编辑:

删除了错误的 takeUntil(networkConnectedEvents),不需要它,因为 amb 会在需要时取消订阅。

关于android - 如何用 rxjava2 取消 Observable.timer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44273129/

相关文章:

android - 如何获取设备位置

android - 如果所有值均为零,MPAndroid 条形图在中间显示零值

rx-java - 单流和多个订阅者

ios - RxSwift - 在 X 秒内未收到元素后发出

javascript - Observable 的下一个/上一个导航

system.reactive - 是否有一个 Rx 框架函数可以创建一个可观察对象,该可观察对象在一段时间过去后结束?

android - FirebaseUI 授权库 : Google sign in fails with: W/AuthMethodPicker: Firebase login unsuccessful

android - 如何启动相机并将拍摄的照片发送到另一个 Activity

java - 你如何使用 RxJava Observables 进行延续?

java - Android Volley RxJava - 多个请求