android - RxAndroid ViewObservable NetworkOnMainThreadException

标签 android multithreading rx-java rx-android

我有一个 Button我从中创建了一个 Observable<OnClickEvent> .

单击按钮时,我希望从网络中获取文件,但我遇到了有关网络和线程的问题。

这个例子抛出 android.os.NetworkOnMainThreadException :

Observable<OnClickEvent> networkButtonObservable = ViewObservable.clicks(testNetworkButton);
networkButtonObservable
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

所以我从另一个线程尝试。

以下抛出 rx.exceptions.OnErrorNotImplementedException: Observers must subscribe from the main UI thread, but was Thread[RxNewThreadScheduler-1,5,main] :

networkButtonObservable
    .subscribeOn(Schedulers.newThread())
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

好的..现在我尝试使用 .debounce()一开始:

networkButtonObservable
    .debounce(10, TimeUnit.MILLISECONDS)
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

这就成功了。

显然我不喜欢在我的代码中添加延迟,所以我试图弄清楚线程方面发生了什么。为什么第一个示例不执行 .map() 中的代码在后台线程中?

或者我在这里缺少什么?

---更新

我更改我的 TestAPI 以返回一个 Observable,并将对 networkButtonObservable 的第一次调用更改为 .flatMap() .这也能正常工作。但我仍然不知道为什么原来的方式使用.map()应该会失败。

networkButtonObservable
    .flatMap(new Func1<OnClickEvent, Observable<?>>() {
        @Override
        public Observable<?> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponseObservable();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

最佳答案

我不是 Android 方面的专家,但根据错误消息,我认为您需要在主线程和后台线程之间反弹值。通常,Android 示例会向您展示如何将 subscribeOn/observeOn 对添加到您的流处理中:

Observable.just(1)
.map(v -> doBackgroundWork())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> {});

但在这些情况下,“源”通常是您可以控制的冷可观察量。

在你的问题中,源是一个热的Observable,你需要在主线程上订阅特定的要求,但你需要在后台线程上进行网络调用,然后显示结果在主线程上。

在这种情况下,您可以多次使用observeOn:

networkButtonObservable
.subscribeOn(AndroidSchedulers.mainThread()) // just in case
.observeOn(Schedulers.io())
.map(v -> TestAPI.getTestService().fetchTestResponse())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> updateGUI(v));

我认为 fetchTestResponseObservable 有自己的 subscribeOnobserveOn 应用于它,因此它不会抛出网络异常。

我还想提一下,使用多个 subscribeOn 在功能上等同于仅使用一个最接近发射源的订阅,但从技术上讲,它会占用未使用的线程资源。然而,在流中使用多个 observeOn 具有相关性,因为您可以有意义地“管道化”线程之间的流处理。

关于android - RxAndroid ViewObservable NetworkOnMainThreadException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30482465/

相关文章:

java - 使用 RxAndroid 生成树排序结构

javascript - 自动表单手动提交

java - 将 Content-ID 添加到多部分实体

android - 应用程序在抽屉导航内的 fragment 内按下后退按钮时退出

java - 是否没有办法迭代或复制 Java ThreadLocal 的所有值?

android - Rxjava 返回 LiveData 列表

java - 谷歌地图应用程序中未显示地理 Intent 标签

c# - 如何锁定方法内容

python - 在 Linux 上创建线程与进程的开销

java - 如何使用 RxJava 1.x 向调用者抛出异常,而不是处理它?