java - RxJava 调度程序不会通过 sleep 更改线程

标签 java multithreading rx-java

我面临着非常奇怪的 RxJava 行为,我无法理解。

假设我想并行处理元素。我正在使用 flatMap 来实现:

public static void log(String msg) {
    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("%s - %s", threadName, msg));
}

public static void sleep(int ms) {
    try {
        Thread.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) throws InterruptedException {

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

    Observable.create(s -> {
        while (true) {
            log("start");
            s.onNext(Math.random());
            sleep(10);
        }
    }).subscribeOn(sA)
            .flatMap(r -> Observable.just(r).subscribeOn(sB))
            .doOnNext(r -> log("process"))
            .subscribe((r) -> log("finish"));
}

输出是相当可预测的:

pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-2 - process
pool-2-thread-2 - finish
pool-1-thread-1 - start
pool-2-thread-3 - process
pool-2-thread-3 - finish

好吧,但是如果我在 flatMap 并行化调度程序停止更改线程后将 n > 10 的 sleep 添加到映射中。

public static void main(String[] args) throws InterruptedException {

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

    Observable.create(s -> {
        while (true) {
            log("start");
            s.onNext(Math.random());
            sleep(10);
        }
    }).subscribeOn(sA)
            .flatMap(r -> Observable.just(r).subscribeOn(sB))
            .doOnNext(r -> sleep(15))
            .doOnNext(r -> log("process"))
            .subscribe((r) -> log("finish"));
}

给出以下内容:

pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-1-thread-1 - start
pool-2-thread-1 - process
pool-2-thread-1 - finish
pool-1-thread-1 - start
pool-2-thread-1 - process

为什么???为什么在 flatMap 之后所有元素都在同一线程(pool-2-thread-1)中处理?

最佳答案

FlatMap 将所有并行任务序列化回单个线程,并且您正在查看该线程。试试这个吧

public static void main(String[] args) throws InterruptedException {

Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));

Observable.create(s -> {
    while (!s.isUnsubscribed()) {
        log("start");
        s.onNext(Math.random());
        sleep(10);
    }
}).subscribeOn(sA)
        .flatMap(r -> 
            Observable.just(r)
            .subscribeOn(sB)
            .doOnNext(r -> sleep(15))
            .doOnNext(r -> log("process"))
        )
        .subscribe((r) -> log("finish"));
}

关于java - RxJava 调度程序不会通过 sleep 更改线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40559085/

相关文章:

java - Jackson:避免尾随逗号

java - BlockingQueues 和线程访问顺序

java - 简单来说,什么是工厂?

java - 在没有 Spring 的情况下在 aop.xml 中组合 AOP 切入点

java - 我的程序想要继续从命令行获取输入

python - 在单线程 Python 脚本中有一个控制台

java - RxJava SerializedObserver 实现

java - RxJava : OnNext Unsubscribe is not working

java - RxJava中如何同步异步方法? RxJava 中的异步瀑布

java - Log4j 在写入文件时跳过内容