android - 在 Rxjava 上管理线程

标签 android multithreading redux rx-java state-management

在 Jake Wharton 的演讲之后,我正在尝试实现 Redux 模式来管理状态:http://jakewharton.com/the-state-of-managing-state-with-rxjava/ 我希望流中的所有内容都在后台线程上运行并在 AndroidMainThread 上接收输出。但是在当前设置下,我的订阅者抛出一个异常,即我正在 AndroidMainThread 以外的另一个线程上操作 UI。提前致谢。

Observable events = Observable.merge(Observable.just(new GetUsersEvent()/*, other event streams*/));
        events.compose(mergeEvents(GetUsersEvent.class, /* other events */))
                .compose(events -> events.map(event -> {
                    BaseAction action = null;
                    if (event instanceof GetUsersEvent)
                        action = new GetUsersAction();
                    return action;
                })
                        .compose(actions -> actions.flatMap(action -> Observable.just(action)
                                .flatMap(action -> {
                                    Observable result = Observable.empty();
                                    if (action instanceof GetUsersAction)
                                        result = userListVM.getUsers()
                                                .subscribeOn(AndroidSchedulers.from(handlerThread.getLooper())); // I am using a handler thread to receive live updates from realm
                                    return result;
                                })
                                .map(Result::successResult)
                                .onErrorReturn(Result::errorResult)
                                .startWith(Result.IN_FLIGHT)))
                        .scan(initialState, (currentUIModel, result) -> {
                            if (result.isLoading())
                                currentUIModel = UIModel.loadingState(bundle);
                            else if (result.isSuccessful())
                                currentUIModel = UIModel.successState(result);
                            else currentUIModel = UIModel.errorState(result.getError());
                            return currentUIModel;
                        })
                        .observeOn(AndroidSchedulers.mainThread()))
                .subscribe(o -> {/* update UI */}, OnErrorNotImplementedException::new);

mergeEvents 转换器:

public Observable.Transformer<BaseEvent, BaseEvent> mergeEvents(Class... classes) {
        return events -> events.subscribeOn(Schedulers.io())
                .publish(shared -> {
                    List<Class> classList = Arrays.asList(classes);
                    for (int i = 0, size = classList.size(); i < size; i++)
                        shared = shared.mergeWith(shared.ofType(classList.get(i)));
                    return shared;
                });
    }

堆栈跟踪:

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
       at android.os.Handler.handleCallback(Handler.java:746)
       at android.os.Handler.dispatchMessage(Handler.java:95)
       at android.os.Looper.loop(Looper.java:148)
       at android.app.ActivityThread.main(ActivityThread.java:5443)
       at java.lang.reflect.Method.invoke(Native Method)
       at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728)
       at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618)
    Caused by: rx.exceptions.OnErrorNotImplementedException: Expected to be called on the main thread but was RxIoScheduler-2
       at com.zeyad.usecases.app.components.mvvm.BaseSubscriber.onError(BaseSubscriber.java:36)
       at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:153)
       at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
       at rx.observers.SerializedObserver.onError(SerializedObserver.java:152)
       at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78)
       at rx.internal.operators.OperatorTakeUntil$1.onError(OperatorTakeUntil.java:50)
       at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:273)
       at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:216)
       at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
       at android.os.Handler.handleCallback(Handler.java:746) 
       at android.os.Handler.dispatchMessage(Handler.java:95) 
       at android.os.Looper.loop(Looper.java:148) 
       at android.app.ActivityThread.main(ActivityThread.java:5443) 
       at java.lang.reflect.Method.invoke(Native Method) 
       at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728) 
       at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618) 
    Caused by: java.lang.IllegalStateException: Expected to be called on the main thread but was RxIoScheduler-2
       at rx.android.MainThreadSubscription.verifyMainThread(MainThreadSubscription.java:58)
       at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:19)
       at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:10)
       at rx.Observable.unsafeSubscribe(Observable.java:10346)
       at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
       at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
       at rx.Observable.unsafeSubscribe(Observable.java:10346)
       at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
       at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
       at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
       at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
       at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
       at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
       at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
       at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
       at rx.Observable.unsafeSubscribe(Observable.java:10346)
       at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
       at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
       at rx.Observable.unsafeSubscribe(Observable.java:10346)
       at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
       at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
       at rx.Observable.unsafeSubscribe(Observable.java:10346)
       at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
       at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
       at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.fastPath(OnSubscribeFromArray.java:76)
       at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:58)
       at rx.Subscriber.setProducer(Subscriber.java:211)
       at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
       at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
       at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscr

最佳答案

I want everything in the stream to run on a background thread and receive the output on the AndroidMainThread

您不能在后台线程中拥有流中的所有内容,因为您的事件源是必须在主线程中注册的 UI 事件。

您在 mergeEvents 转换器中应用 IO Scheduler,这意味着您的所有 UI 事件都将在 IO 线程中订阅。

可以在日志中看到源码:

 Caused by: java.lang.IllegalStateException: Expected to be called on the main thread but was RxIoScheduler-2
   at rx.android.MainThreadSubscription.verifyMainThread(MainThreadSubscription.java:58)
   at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:19)
   at com.jakewharton.rxbinding.support.v7.widget.RecyclerViewScrollEventOnSubscribe.call(RecyclerViewScrollEventOnSubscribe.java:10)

您正在尝试订阅不在主线程的 RecyclerView 滚动事件。
您可以简单地通过订阅 mainThread 的 mergeEvents 来修复它,然后将 observeOn 放在您选择的某个 bg 线程,这样流的​​其余部分将在 UI 之外处理线程。

关于android - 在 Rxjava 上管理线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43621851/

相关文章:

reactjs - 模拟在另一个函数中使用的函数

android - Flutter redux StoreConnector vs StoreBuilder

Android Emulator CONNECT 隧道缺少预期的 header

c# - 在 Thread.Sleep() 期间保持 UI 响应

objective-c - 如何允许用户在不暂停应用程序更新循环的情况下调整 NSSlider?

java - java中单CPU的线程调度程序?

unit-testing - Angular2 + karma test + redux - 进程未定义

Android textview 不支持换行

android - MPAndroidChart 设置当前可见的 X 轴

Android ndk std::to_string 支持