我正在尝试使用 Project Reactor 及其 Flux 类运行一个基本示例。源应该创建从 1 到 10 的整数,然后打印出发出的整数。
所有示例都在应用程序的主方法中执行,没有运行其他代码。
运行基础知识相当容易:
Flux.range(1, 10).subscribe(System.out::println);
下一步是在另一个线程中发出整数。这可以通过以下方式实现
Flux.range(1, 10)
.publishOn(Schedulers.newSingle("OtherThread"))
.subscribe(System.out::println);
正如项目引用所述,Schedulers.newSingle("OtherThread")
创建“每次调用专用线程”(请参阅 Project Reactor Reference )。该引用文献解释说,还有一个 Schedulers.single()
,它可以访问“单个可重用线程”的执行上下文,并“为所有调用者重用同一线程”。
由于我在这个示例中仅在一个点使用线程 publishOn(...)
我的理解是,这两种方法 (newSingle(...)
和single()
) 可以互换使用。
Flux.range(1, 10)
.publishOn(Schedulers.single())
.subscribe(System.out::println);
但是最后一个例子没有打印出任何东西。说实话,经过几个小时的搜索和玩耍后我不明白为什么。
我找到了这篇博客文章Flight of the Flux 3 - Hopping Threads and Schedulers其中将 single()
声明为“用于可以在唯一 ExecutorService 上运行的一次性任务”。但它并没有给黑暗带来光明。
正如我经常期望的那样,这个问题有一个简单的答案为什么在这个基本示例中 newSingle(...)
和 single()
的行为不同?这会让我觉得自己很愚蠢。但如果它最终能解决我的困惑,我会非常高兴。
一个有趣的网站注释是,通过引入 log()
,示例打印得像一个魅力
Flux.range(1, 10)
.log()
.publishOn(Schedulers.single())
.subscribe(System.out::println);
<小时/>
更新:根据Martin Tarjányi的回答我创建了一个gist它通过一个小代码片段和解释文本演示了不同的行为。
最佳答案
当您使用 newSingle(String)
创建新的单个调度程序时 by default it creates a new non-daemon thread ,这意味着它将阻止应用程序退出,直到其线程池未关闭。
但是,如果您使用内置的 single()
,它将使用守护线程,即使其工作尚未完成,也不会阻止应用程序退出。这正是您在示例中看到的:主线程通过组装响应式(Reactive)管道来完成工作,并且无论守护程序单线程的状态如何,虚拟机都会退出。
要在两种情况下具有相同的行为,您可以使用 doOnNext()
和 blockLast()
替换订阅:
Flux.range(1, 10)
.publishOn(Schedulers.single())
.doOnNext(System.out::println)
.blockLast();
通常在响应式(Reactive)编程中强烈不鼓励使用 block 。但是,如果您的主线程没有其他事情可做,那么可以在 react 链上调用 block()
。
关于java - 如何在Project Reactor中正确使用Schedulers.single()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61261437/