据我了解,
SubscribeOn
运算符指定线程,其中 可观察源应该开始发射。- 如果链上有多个
SubscribeOn
,则第一个会占用 对整个流程的影响。 ObserveOn
可用于随时将线程翻转到下游 一点,每当链中存在ObserveOn
时,它就会改变 下游线程
但是我尝试了 Subject
的示例,我发现 SubscribeOn
在整个链中没有任何效果。
这是我的示例
Subject<String> mSubject = PublishSubject.create();
我喜欢它
mSubject
.map(s -> "String :" + s)
.doOnNext(s -> Log.d(TAG, "Started at Thread :" + Thread.currentThread().getName()))
.flatMap(s -> Observable.just(1))
.map(Object::toString)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
Log.d(TAG, "subscribed: " + s + " at " + Thread.currentThread().getName());
});
并从其他地方触发
mSubject.onNext("hello");
在这里你可以看到,我已经给出了 .subscribeOn(Schedulers.io())
,所以我期望
mSubject
.map(s -> "String :" + s)
.doOnNext(s -> Log.d(TAG, "Started at Thread :" + Thread.currentThread().getName()))
.flatMap(s -> Observable.just(1))
.map(Object::toString)
直到这些在调度程序线程中执行。然后我将使用 observeOn
的线程翻转到主线程。但这里是这段代码的日志
D/MainActivity: Started at Thread :main
D/MainActivity: subscribed: 1 at main
为什么它没有在 Scheduler 线程上启动?
我尝试使用普通的Observable
而不使用主题。
Observable.just("Hello")
.map(s -> "String :" + s)
.doOnNext(s -> Log.d(TAG, "Started at Thread :" + Thread.currentThread().getName()))
.flatMap(s -> Observable.just(1))
.map(Object::toString)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
Log.d(TAG, "subscribed: " + s + " at " + Thread.currentThread().getName());
});
这是日志
D/MainActivity: Started at Thread :RxCachedThreadScheduler-2
D/MainActivity: subscribed: 1 at main
我可以看到它正在按预期工作!
那么受试者身上发生了什么
最佳答案
来自帖子Using subjects (由官方 RxJava Subject 文档链接),它指出:
By default, subjects do not perform any synchronization across threads. They do not take a scheduler but rather assume that all serialization and grammatical correctness are handled by the caller of the subject.
因此,根据我的最佳理解,这意味着所使用的线程是调用 onNext()
的代码之一,稍后被发送到观察到的线程。
关于java - SubscibeOn 对主题没有影响,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48002677/