android - RxJava - 等到重试完成其他 Activity/fragment 中的其他可观察对象

标签 android android-fragments rx-java reactive-programming

用例:我正在开发一个 Android 应用程序,它有一个带有 4 个选项卡的 viewpager,它们都是 fragment 。对于每个选项卡/fragment ,我必须每 5 分钟使用 Oauth 和 token 过期连接到 REST Api。

当前解决方案: 使用 RxJava 和 retryWhen 运算符,我可以在收到 401 HTTP 错误时重新进行身份验证。对于每个订阅和消费的 Observable 流,使用:

retryWhen(refreshTokenAuthenticator)

因此,当 token 过期时,流会使用它,然后执行真正的 api 调用。

问题: 这仅适用于一个订阅中消耗的一个可观察对象,但我需要允许用户在选项卡之间切换而不阻止他/她考虑到 401 错误可能随时出现在任何 Api 调用的任何 fragment 中。

问题:有没有办法让可观察对象等待其他不在同一流/订阅者中的可观察对象以 onNext() 结束?事实上在不同的 fragment 中?所以 api 调用场景将是这样的:

Api Call Fragment A --> request
Api Call Fragment A <-- response 200 Code

Api Call Fragment B --> request
Api Call Fragment B <-- response 401 Code (retryWhen in action)
Api Call Fragment B --> request (refreshToken)
Api Call Fragment B <-- response 200 (with new access token saved in the app)

几乎同时...

Api Call Fragment C --> request
Api Call Fragment C <-- response 401 Code (retryWhen in action)

Observable in Fragment C Waits till Observable in Fragment B finish (onNext())

Api Call Fragment C --> request
Api Call Fragment C <-- response 200

这是我已有的,每个 API 调用看起来几乎一样:

public void getDashboardDetail() {

    Subscription subscription = repository.getDashboard()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(tokenAuthenticator)
            .subscribe(new RestHttpObserver<UserDataDto>() {
                @Override
                public void onUnknownError(Throwable e) {
                    getMvpView().onError(e);
                }

                @Override
                public void onHostUnreachable() {
                    getMvpView().onHostUnreachable();
                }

                @Override
                public void onHttpErrorCode(int errorCode, ErrorDto errorDto) {
                    getMvpView().onHttpErrorCode(errorCode, errorDto);
                }

                @Override
                public void onCompleted() {
                    //Do nothing...
                }

                @Override
                public void onNext(UserDataDto response) {
                    getMvpView().onReceiveUserData(response);
                }
            });

    this.compositeSubscription.add(subscription);

}

还有我的 RefreshTokenAuthenticator:

public class RefreshTokenAuthenticator implements Func1<Observable<? extends Throwable>, Observable<?>> {

private static final int RETRY_COUNT = 1;

private static final int HTTP_ERROR_CODE = 401;

@Inject
private UserRepository repository;

@Inject
private SessionManager sessionManager;

@Inject
private MyApplication application;


@Inject
private RefreshTokenAuthenticator() {
}

@Override
public synchronized Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .flatMap(new Func1<Throwable, Observable<?>>() {
                int retryCount = 0;

                @Override
                public Observable<?> call(final Throwable throwable) {

                    retryCount++;
                    if (retryCount <= RETRY_COUNT && throwable instanceof HttpException) {
                        int errorCode = ((HttpException) throwable).code();
                        if (errorCode == HTTP_ERROR_CODE) {
                            return repository
                                    .refreshToken(sessionManager.getAuthToken().getRefreshToken())
                                    .observeOn(AndroidSchedulers.mainThread())
                                    .subscribeOn(Schedulers.io())

                                    .doOnNext(tokenDto -> sessionManager.saveAuthToken(tokenDto))
                                    .doOnError(throwable1 -> {
                                        Log.e("RefreshTokenAuth", "DoOnError", throwable1);
                                        application.logout();
                                    });

                        }
                    }
                    // No more retries. Pass the original Retrofit error through.
                    return Observable.error(throwable);
                }
            });
}

最佳答案

1) 使 auth token 的来源缓存上次成功的结果 + 提供使该缓存结果无效的方法:

class Auth {
    private Observable<AuthToken> validToken;

    synchronized void invalidateAuthToken() {
        validToken = null;
    }

    synchronized Observable<AuthToken> getAuthToken() {
        if (validToken == null) {
            validToken = repository
                .refreshToken(...) // start async request
                .doOnError(e -> invalidateAuthToken())
                .replay(1); // cache result
        }
        return validToken; // share among all subscribers
    }
}

2) 要访问网络服务,请使用以下模式:

Observable<Data1> dataSource1 = 
    Observable.defer(auth.getAuthToken()) // always start from token
        .flatMap(token ->
            repository.fetchData1(token, ...)) // use token to call web service
        .doOnError(e -> auth.invalidateAuthToken())
        .retry(N); // retry N times

关于android - RxJava - 等到重试完成其他 Activity/fragment 中的其他可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42056375/

相关文章:

android - 在 datePickerdialog 中设置特定日期

android - 从 android 调用 web 服务时出错

java - 使用多个 Fragment 的最佳方式是什么?

android - adMob 横幅在软键盘弹出时覆盖 TextView

android - 通过联系人选择器向 Android 中的 ContactsContract.CommonDataKinds.Event 添加事件

android - Onbackpressed 关闭应用程序登录后不关闭应用程序

android - fragment 标准过渡没有动画

rx-java - 如何在 rx java 中分组并返回列表

java - 将 CharSequence 转换为 Float

java - RxJava 缓冲 - 忽略零项