android - 为什么不调用 doOnNext()?

标签 android rx-java rx-android reactivex

下面我的订阅者既没有调用 onNext() 也没有调用 onCompleted()。我已经尝试通过 doOnNext()/doOnTerminate() 实现订阅者。我也试过 doAfterTerminate()。我还尝试过显式定义一个订阅者,但没有调用 onNext() 和 onCompleted()。

根据 RxJS reduce doesn't continue ,它是没有终止的 reduce() 所以我尝试添加 take(1) 但那没有用。在同一个 stackoverflow 问题中,有人说问题可能是我的流永远不会关闭。除了 take(1) 之外,也许还有其他方法可以关闭流,但我对 ReactiveX 的理解还不够。

根据 Why is OnComplete not called in this code? (RxAndroid) ,可能是系列中的原始流没有终止。但我不明白为什么如果我调用 take(1) 会很重要,我认为它应该发出终止信号。

基本上,为什么不执行以下行?

System.out.println("doOnNext map.size()=" + map.size());

即使下面这行代码被执行了 98 次:

map.put(e.getKey(), e.getValue());

JsonObjectObservableRequest.java

import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {

    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
    jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(),     getResponseErrorListener());
    }

    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }

    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }

    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }

    public Observable<JSONObject> getObservable() {
    return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<    JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}

JsonObjectObservableRequest调用代码

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            .take(1)
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();

    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

最佳答案

您正确地识别了问题,您没有在源网络 Observable(在 JsonObjectObservableRequest 类)上调用 onCompleted(),而 take(1) 也没有帮助,因为您将它放在之前 reduce
正如您可能理解的那样,reduce 必须在 Observable 上以有限数量的发射进行操作,因为当所有项目都已发射时它会发射累积的项目,因此它依赖于 onCompleted() 事件以了解观察到的流何时结束。

  • 我认为在你的情况下,最好的办法是改变 JsonObjectObservableRequest 的操作方式,为此你不需要 Subject,你可以使用创建方法包装一个回调(你可以看到我的回答 here ,并阅读更多关于回调世界与 RxJava 之间的桥接 here
  • 此外,您不需要发射一个Observable 的东西,然后将它flatmap 到项目,您可以简单地用onNext() 发射项目 并使用 onError() 发出错误,实际上您是在隐藏错误并将它们转换为发射,这可能会使处理错误变得更加困难。
  • 在调用 onNext() 以发出完成信号后,您应该在 onResponse() 调用 onCompleted()。在出现错误的情况下,onError 信号将通知流终止。
  • 另一个问题是取消似乎没有以这种方式处理的请求,您可以阅读源代码以了解在包装 callbacl 调用时如何完成。

关于android - 为什么不调用 doOnNext()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43691447/

相关文章:

retrofit - 是否可以同步运行 Retrofit observable?

java - 在 5 秒计时器 RXJava 上设置可观察值

android - 使用改造和 rx java 的多个 api 请求

android - 如何使用 SharedPreferences 保存字符串数组?

android - 为什么我可以在Android中点击底部的 "behind"?

java - 在 RxJava/RxAndroid 中的可观察对象之间传递响应

android - RxJava,在链接两个可观察对象之前在观察者线程中执行代码

kotlin - 使用 Kotlin 在 RxJava 中处理可空类型

Android - Facebook 集成 : Impossible to Import com. facebook.Session

android - 嵌入式动画,画圈和旋转图像