我的这个测试基于 4.5.2. The subscribeOn Method (忽略缺少的 new Thread(...)
部分):
@Test
public void theSubscribeOnMethod() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.doOnSubscribe(sub -> System.out.println(
"[doOnSubscribe] " + Thread.currentThread().getName()))
.map(i -> Thread.currentThread().getName() + ", value " + i);
flux.subscribe(System.out::println);
Thread.sleep(1000);
}
打印此内容(注意
main
线程 [doOnSubscribe]
):[doOnSubscribe] main
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12
刚搬家
subscribeOn
后 doOnSubscribe
产生这个结果(从我的角度来看是正确的):[doOnSubscribe] parallel-scheduler-1
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12
根据Flux.subscribeOn文档:
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.
所以在这个例子中我期望
doOnSubscribe
发生在 parallel-scheduler
线程而不是 main
尽管我放的地方subscribeOn
.这是一个错误吗?为什么会发生这种情况,我该如何解决?
PS:使用
reactor-core:3.1.0-RELEASE
最佳答案
这不是一个错误。这是预期的行为。这是事件的顺序,乍一看可能会令人困惑。
我稍微修改了你的例子:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.doOnNext(value -> System.out.println(
"[doOnNext] " + Thread.currentThread().getName() + " value: " + value))
.doOnSubscribe(sub -> System.out.println(
"[doOnSubscribe] " + Thread.currentThread().getName()));
flux.subscribe();
Thread.sleep(1000);
输出:
[doOnSubscribe] main
[doOnNext] parallel-scheduler-1 value: 11
[doOnNext] parallel-scheduler-1 value: 12
当您进行命令式思考时,您会期望运算符按声明顺序被调用,因此在另一个之下的一个将在以后被调用。这通常也适用于函数式/响应式(Reactive)编程,但情况并非总是如此。
在上面的例子中
doOnSubscribe
实际上在管道中的其他任何东西之前运行。运营商首先收到有关订阅的通知,然后将订阅转发给其上游运营商,后者将其转发给其上游,依此类推……所以事件发生的顺序是:
flux.subscribe()
订阅管道并触发主线程上的执行 doOnSubscribe
获取有关订阅的通知并在主线程上触发提供的使用者,然后将订阅转发到其上游 doOnNext
收到有关订阅的通知,但它只对 onNext
感兴趣事件,因此除了将订阅转发到其上游(在主线程上)之外,它还没有做任何事情(!)subscribeOn(s)
- 上下文切换发生,此后的每个事件都在提供的调度程序的上下文中发生 map
然后 range
, 都在平行线程 range
是源,所以它发出第一项:onNext(1)
平行线程 map
在 onNext
上触发事件,进行转换并将转换后的元素作为 onNext
发送并行线程上的事件 subscribeOn
除了转发 onNext
之外什么都不做 Activity doOnNext
消费者被触发,因为它收到了 onNext
事件,仍在并行线程上 doOnSubscribe
忽略 onNext
事件,只是将事件转发给下一个运算符(operator) onNext(2)
重复了 6-10 个事件项目 TLDR :订阅事件从底部操作符传播到顶部操作符(
doOnSubscribe
和 susbcribeOn
在此传播过程中按此顺序触发),之后实际数据从顶部源操作符(range
)反向流动) 至底部。自
doOnSubscribe
之前运行 susbcribeOn
,它仍然会在主线程上运行。
关于java - 为什么 subscribeOn 方法不切换上下文?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59553733/