java - Reactor Schedulers 在主线程完成后继续运行很长时间?如何处理?

标签 java project-reactor

我有一个关于如何在使用 Reactor 3 时清理调度程序工作线程的问题

Flux.range(1, 10000)
.publishOn(Schedulers.newElastic("Y"))
.doOnComplete(() -> { 
    // WHAT should one do to ensure the worker threads are cleaned up
    logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));

当我执行上面的代码时,我看到的是,一旦主线程完成运行,工作线程仍处于 WAITING 状态一段时间。

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)

有没有办法控制它?即它们可以被处理 onComplete() 吗?我试过 Schedulers.shutdownNow() 但它没有帮助。

另一方面,当我这样做时,我能够控制调度程序的处置。 哪种是首选/提倡的方式?

reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
        Flux.range(1, 10000)
        .concatWith(Flux.empty())
        .publishOn(s)
        .doOnComplete(() -> {           
            s.dispose();
            logger.info("Shut down all Scheduler worker threads");
        })
        .subscribe(x -> logger.debug(x+ "**"));

最佳答案

如果您使用 Schedulers.new[Elastic|...] ,那么您有责任跟踪结果 Scheduler如果你想关闭它。 Schedulers.shutdownNow()只会在您不明确时关闭库使用的默认调度程序,例如 Schedulers.elastic() (注意没有 new 前缀)。

在所有操作运行后清理的最佳方法是使用 doFinally .这将在 onError 之后异步执行清理回调| onComplete | cancel事件。最好确保它是链中的最后一个运算符,尽管它会尝试在所有情况下真正最后执行。

唯一需要注意的是,它与之前的运算符在同一线程中运行,换句话说,您正试图关闭的线程... s.dispose()doFinally回调将在执行器的任务队列处理完毕后关闭执行器,因此在这种情况下,线程消失之前会有轻微的延迟。

这是一个转储线程信息、切换到自定义弹性线程并在 doFinally 中将其关闭的示例(添加过滤器和物化以提供更短的日志,以便更好地了解事件是如何进行的):

@Test
public void schedulerFinallyShutdown() throws InterruptedException {
    ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
    Logger logger = Loggers.getLogger("foo");
    CountDownLatch latch = new CountDownLatch(1);
    reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
    Flux.range(1, 10000)
        .publishOn(s)
        .concatWith(Flux.<Integer>empty().doOnComplete(() -> {
            for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
                System.out.println("last element\t" + ti.getThreadName() + " " + ti.getThreadState());
            }
        }))
        .doFinally(sig -> {
            s.dispose();
            logger.info("Shut down all Scheduler worker threads");
            latch.countDown();
        })
        .filter(x -> x % 1000 == 0)
        .materialize()
        .subscribe(x -> logger.info(x+ "**"));

    latch.await();
    for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
        System.out.println("after cleanup\t" + ti.getThreadName() + " " + ti.getThreadState());
    }
} 

打印出来:

11:24:36.608 [X-2] INFO  foo - onNext(1000)**
11:24:36.611 [X-2] INFO  foo - onNext(2000)**
11:24:36.611 [X-2] INFO  foo - onNext(3000)**
11:24:36.612 [X-2] INFO  foo - onNext(4000)**
11:24:36.612 [X-2] INFO  foo - onNext(5000)**
11:24:36.612 [X-2] INFO  foo - onNext(6000)**
11:24:36.612 [X-2] INFO  foo - onNext(7000)**
11:24:36.613 [X-2] INFO  foo - onNext(8000)**
11:24:36.613 [X-2] INFO  foo - onNext(9000)**
11:24:36.613 [X-2] INFO  foo - onNext(10000)**
last element    X-2 RUNNABLE
last element    elastic-evictor-1 TIMED_WAITING
last element    Monitor Ctrl-Break RUNNABLE
last element    Signal Dispatcher RUNNABLE
last element    Finalizer WAITING
last element    Reference Handler WAITING
last element    main WAITING
11:24:36.626 [X-2] INFO  foo - onComplete()**
11:24:36.627 [X-2] INFO  foo - Shut down all Scheduler worker threads
after cleanup   Monitor Ctrl-Break RUNNABLE
after cleanup   Signal Dispatcher RUNNABLE
after cleanup   Finalizer WAITING
after cleanup   Reference Handler WAITING
after cleanup   main RUNNABLE

关于java - Reactor Schedulers 在主线程完成后继续运行很长时间?如何处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47914755/

相关文章:

java - 如何在 Spring Webflux Java 中记录请求正文

javax.net.ssl.SSLHandshakeException : Handshake failed on Android 5. 0.0 禁用 SSLv2 和 SSlv3(仅限 TLS)(及更高版本)

java - 如何从集合中获取元素?

java - Jython 打印所有终端输出/将输出分配为字符串

java - 无法定位空指针异常的起源

java - WebTestClient 返回线程不支持的 IllegalStateException : block()/blockFirst()/blockLast() are blocking,

java - 如何处理 Project Reactor 中处理器订阅中引发的异常

java - 如何获取 Flux 的最后一项而不用 reduce() 或 last() 折叠它

java - 在 mule munit 中加载属性

java - 缺少 Spring Reactive Web 应用程序 POST 请求正文