我正在将 AsyncTaskLoader
迁移到 RxJava,试图了解有关 RxJava 并发方法的所有细节。简单的事情运行正常,但是我在处理以下代码时遇到了困难:
这是执行的顶级方法:
mCompositeDisposable.add(mDataRepository
.getStuff()
.subscribeOn(mSchedulerProvider.io())
.subscribeWith(...)
mDataRepository.getStuff() 看起来像这样:
public Observable<StuffResult> getStuff() {
return mDataManager
.listStuff()
.flatMap(stuff -> Observable.just(new StuffResult(stuff)))
.onErrorReturn(throwable -> new StuffResult(null));
最后一层:
public Observable<Stuff> listStuff() {
Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName());
String sql = <...>;
return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql).mapToList(mStuffMapper);
}
因此,使用上面的代码,log
将打印出 .listStuff() - main
,这不完全是我正在寻找的内容。我不太确定为什么。我的印象是,通过设置 subscribeOn
,从链中拉出的每个事件都将在 subscribeOn 方法中指定的线程上处理。
我认为正在发生的事情是,在到达 mBriteDatabase
之前,源代码(又名最终层)不是来自 RxJava 世界,因此在 createQuery< 之前不是一个事件
被调用。所以我可能需要某种 wrapper ?我尝试过应用 .fromCallable
,但是这是非 Rx 代码的包装器,并且我的数据库层返回一个可观察的...
最佳答案
您的 Log.d
调用发生
- 当
listStuff
被调用时立即 - 在调用 getStuff 后立即执行
- 这是您向我们展示的顶级代码 fragment 中发生的第一件事。
如果您需要在订阅发生时执行此操作,则需要明确:
public Observable<Stuff> listStuff() {
String sql = <...>;
return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql)
.mapToList(mStuffMapper)
.doOnsubscribe(() -> Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName()));
}
关于java - 即使指定了 subscribeOn,代码也在主线程上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42279647/