android - 如何管理 RxJava 中需要的 Looper 线程

标签 android rx-java rx-android

我想在 Observable 中封装一个查询 ContentProvider 并订阅 ContentProvider 游标以提供持续更新的逻辑。

由于可观察对象执行 IO 工作,我需要在 Schedulers.io() 中订阅它。问题是我无法注册 ContentObserver,因为它需要一个准备好循环程序的线程。

管理它并将其封装到单个Observable中的推荐方法是什么。

代码说明:

public Observable<Integer> unreadCountObservable() {
    return Observable.create(subscriber -> {
        new UnreadCountObservable(subscriber);
    });
}

private class UnreadCountObservable {
    private Subscriber subscriber;

    public UnreadCountObservable(Subscriber subscriber) {
        this.subscriber = subscriber;
        Cursor cursor = queryUnread(subscriber);
        cursor.registerContentObserver(observer);
        subscriber.add(Subscriptions.create(() -> {
            cursor.unregisterContentObserver(observer);
            cursor.close();
        }));
    }

    @NonNull
    private Cursor queryUnread(Subscriber subscriber) {
        Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null);
        if(cursor.moveToNext()) {
            Integer count = cursor.getInt(0);
            subscriber.onNext(count);
        } else {
            subscriber.onNext(0);
        }
        return cursor;
    }

    private ContentObserver observer = new ContentObserver(new Handler()) {
        @Override
        public boolean deliverSelfNotifications() {
            return false;
        }

        @Override
        public void onChange(boolean selfChange) {
            Timber.d("New sms data changed");
            queryUnread(subscriber);
        }
    };
}

注意 1 上面代码的问题在于,由于 registerObserver,它不能用 .subscribeOn(Schedulers.io() 调用,如果它是将其称为 mainThread,然后查询也在其上运行)

注意:将所有内容封装在一个Observable中是一个关键要求和这个问题的动机

我现在最好的想法是为我使用 Observable 的 Activity 创建一个 HandlerThread,并使用该线程中的循环程序。但是想知道是否有更好的选择,以及制作通用调度程序(例如 looperIoScheduler())是否有意义可能会导致问题。

最佳答案

Observable 链中,您可以根据需要随时更改线程。看看here .

函数 rx.Observable#observeOn(rx.Scheduler) 可以在链内的任何地方。尝试做这样的事情(伪代码):

Observable.just(cursor)
        .observeOn(AndroidSchedulers.mainThread())
        .map((Cursor) -> {
                cursor.registerContentObserver(observer);
                return cursor;
            }
        }).observeOn(Schedulers.io());
subscriber.add(Subscriptions.create(() -> {
        cursor.unregisterContentObserver(observer);
        cursor.close();
    }));

关于android - 如何管理 RxJava 中需要的 Looper 线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39449263/

相关文章:

javascript - WebView :webpage design changed in Android

android - 为什么我的 RxJava 设置阻塞了我的 UI 线程?使用BluetoothAdapter.startLeScan回调

reactive-programming - RxJava : Can you give me a real live scenario to use flatMap instead of map

rx-java - 如何在RxJava中将Observable子类化?

android - 为什么在此代码中不调用 OnComplete? (RxAndroid)

android - RxJava 中 AndroidSchedulers.mainThread() 的替代方案是什么?

java - Android - 如何上传视频/图像到 PHP 服务器

java - Android 上的 TextView 在任一方向上仅显示两行文本

java - Android:将 CheckBox 添加到 ListAdapter

java - Observable 仍在 UI 线程上运行