android - 在RxJava中使用 "skipWhile"结合 "repeatWhen"实现服务器轮询

标签 android networking retrofit rx-java long-polling

我真的很喜欢 RxJava,它是一个很棒的工具,但有时很难理解它是如何工作的。 我们在我们的 Android 项目中使用带有 RxJava 的 Retrofit,并且有以下用例:

我需要轮询服务器,重试之间有一些延迟,而服务器正在做一些工作。服务器完成后,我必须交付结果。所以我已经用 RxJava 成功地完成了,这里是代码 fragment : 我将“skipWhile”与“repeatWhen”一起使用

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
        .skipWhile(new Func1<CheckJobResponse, Boolean>() {

            @Override
            public Boolean call(CheckJobResponse checkJobResponse) {
                boolean shouldSkip = false;

                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());

                switch (checkJobResponse.getJobStatus()){
                    case CheckJobResponse.PROCESSING:
                        shouldSkip = true;
                        break;
                    case CheckJobResponse.DONE:
                    case CheckJobResponse.ERROR:
                        shouldSkip = false;
                        break;
                }
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);

                return shouldSkip;
            }
        })
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                return observable.delay(1, TimeUnit.SECONDS);
            }
        }).subscribe(new Subscriber<CheckJobResponse>(){
            @Override
            public void onNext(CheckJobResponse response) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);

            }

            @Override
            public void onError(BaseError error) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();

            }

            @Override
            public void onCompleted() {
                if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
            }
        });

代码运行良好:

当服务器响应作业正在处理时,我从“skipWhile”链返回“true”,原始 O​​bservable 等待 1 秒并再次执行 http 请求。 重复此过程,直到我从“skipWhile”链返回“false”。

这里有一些我不明白的地方:

  1. 我在“skipWhile”的文档中看到它不会从原始 Observable 发出任何东西(onError、onNext、onComplete),直到我从它的“call”方法返回“false”。那么,如果它不发出任何东西,为什么“repeatWhen”Observable 会执行它的工作呢?它等待一秒钟并再次运行请求。谁发起的?

  2. 第二个问题是:为什么“repeatWhen”的 Observable 不会永远运行,我的意思是为什么当我从“skipWhile”返回“false”时它停止重复?如果我返回“false”,我会在我的订阅者中成功进入 onNext。

  3. 在“repeatWhile”的文档中说,最终我在我的订阅者中收到了对“onComplete”的调用,但从未调用过“onComplete”。

  4. 即使我更改链接“skipWhile”和“repeatWhen”的顺序也没有区别。这是为什么?

我知道 RxJava 是开源的,我只能阅读代码,但正如我所说 - 它真的很难理解。

谢谢。

最佳答案

我以前没有使用过repeatWhen,但这个问题让我很好奇,所以我做了一些研究。

skipWhile 确实发出onErroronCompleted,即使它从不返回true 在那之前。因此,每次 checkJob() 发出 onCompleted 时都会调用 repeatWhen。这回答了问题 #1。

其余问题基于错误的假设。您的订阅将永远运行,因为您的 repeatWhen 永远不会终止。那是因为 repeatWhen 比您意识到的要复杂得多。每当从源中获取 onCompleted 时,其中的 Observable 就会发出 null 。如果你接受它并返回 onCompleted 然后它结束,否则如果你发出任何东西它会重试。由于 delay 只是接受发射并延迟它,所以它总是再次发射 null。因此,它会不断重新订阅。

#2 的答案是它永远运行;您可能正在执行此代码之外的其他操作来取消订阅。对于 #3,您永远不会得到 onCompleted,因为它永远不会完成。对于 #4,顺序无关紧要,因为您要无限期地重复。

现在的问题是,如何获得正确的行为?就像使用 takeUntil 而不是 skipWhile 一样简单。这样,您将一直重复直到得到您想要的结果,从而在您希望它结束​​时终止流。

这是一个代码示例:

Observable<Boolean> source = ...; // Something that eventually emits true

source
    .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
    .takeUntil(result -> result)
    .filter(result -> result)
    .subscribe(
        res -> System.out.println("onNext(" + res + ")"),
        err -> System.out.println("onError()"),
        () -> System.out.println("onCompleted()")
    );

在此示例中,source 发出 bool 值。我每 1 秒重复一次,直到源发出 true。我一直服用直到 resulttrue。我过滤掉所有 false 的通知,因此订阅者在它为 true 之前不会收到它们。

关于android - 在RxJava中使用 "skipWhile"结合 "repeatWhen"实现服务器轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34943734/

相关文章:

java - 安卓。如何使用改造发布原始数据或帮助使用 asynctask

android - 当 Kotlin 中的数据发生变化时,如何在 RecyclerView 中重新绑定(bind)项目?

android - ParseUser 固定保存最终本地数据存储错误

javascript - 在 Android 应用程序中本地存储 Javascript,这可能吗?

networking - 无法使用内置 ssh 客户端从 macOS 通过 22 以外的任何端口 ssh 到远程主机

C Socket 程序未打印所需的输出(无错误)

android - 为什么我无法在我的 Android 应用程序中使用 Retrofit 在 POST 请求中成功发送 JSON

android - 如何在 Android 应用程序中以编程方式复制文本?

java - 通过 MAC 地址检测 wi-fi 或有线网络接口(interface)

android - 为 WebTokens 身份验证改造自定义客户端