java - 如何在Project Reactor中正确使用Schedulers.single()?

标签 java multithreading scheduler project-reactor

我正在尝试使用 Project Reactor 及其 Flux 类运行一个基本示例。源应该创建从 1 到 10 的整数,然后打印出发出的整数。

所有示例都在应用程序的主方法中执行,没有运行其他代码。

运行基础知识相当容易:

Flux.range(1, 10).subscribe(System.out::println);

下一步是在另一个线程中发出整数。这可以通过以下方式实现

        Flux.range(1, 10)
                .publishOn(Schedulers.newSingle("OtherThread"))
                .subscribe(System.out::println);

正如项目引用所述,Schedulers.newSingle("OtherThread") 创建“每次调用专用线程”(请参阅​​ Project Reactor Reference )。该引用文献解释说,还有一个 Schedulers.single() ,它可以访问“单个可重用线程”的执行上下文,并“为所有调用者重用同一线程”。

由于我在这个示例中仅在一个点使用线程 publishOn(...) 我的理解是,这两种方法 (newSingle(...)single()) 可以互换使用。

        Flux.range(1, 10)
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);

但是最后一个例子没有打印出任何东西。说实话,经过几个小时的搜索和玩耍后我不明白为什么。

我找到了这篇博客文章Flight of the Flux 3 - Hopping Threads and Schedulers其中将 single() 声明为“用于可以在唯一 ExecutorService 上运行的一次性任务”。但它并没有给黑暗带来光明。

正如我经常期望的那样,这个问题有一个简单的答案为什么在这个基本示例中 newSingle(...)single() 的行为不同?这会让我觉得自己很愚蠢。但如果它最终能解决我的困惑,我会非常高兴。

一个有趣的网站注释是,通过引入 log(),示例打印得像一个魅力

        Flux.range(1, 10)
                .log()
                .publishOn(Schedulers.single())
                .subscribe(System.out::println);
<小时/>

更新:根据Martin Tarjányi的回答我创建了一个gist它通过一个小代码片段和解释文本演示了不同的行为。

最佳答案

当您使用 newSingle(String) 创建新的单个调度程序时 by default it creates a new non-daemon thread ,这意味着它将阻止应用程序退出,直到其线程池未关闭。

但是,如果您使用内置的 single() ,它将使用守护线程,即使其工作尚未完成,也不会阻止应用程序退出。这正是您在示例中看到的:主线程通过组装响应式(Reactive)管道来完成工作,并且无论守护程序单线程的状态如何,虚拟机都会退出。

要在两种情况下具有相同的行为,您可以使用 doOnNext()blockLast() 替换订阅:

Flux.range(1, 10)
    .publishOn(Schedulers.single())
    .doOnNext(System.out::println)
    .blockLast();

通常在响应式(Reactive)编程中强烈不鼓励使用 block 。但是,如果您的主线程没有其他事情可做,那么可以在 react 链上调用 block()

关于java - 如何在Project Reactor中正确使用Schedulers.single()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61261437/

相关文章:

java - Google App Engine在Hello World应用程序中不起作用

java - 是否有任何 Java HTML 解析器生成的节点保留原始文本的索引?

java - 有效的 Java 项目 66 : Why to synchronize both read and write methods?

java - 以定时任务方式运行JAVA程序

Java 作业调度 : Is this possible with Quartz, 如果不是,我的替代方案是什么?

java - 将 JSON 保存到 Sqlite 并显示

java - OpenAM13.0.0--Tomcat自动关机

php - PHP : Delay instructions with CLI or Crontab and NOT Sleep()

c++ - 多个 OpenMP 线程读取(不写入)共享变量的性能成本?

python - 如何在特定日期的特定时间运行 Airflow?