java - RxJava 调度程序始终与 sleep 在同一线程中工作

标签 java multithreading rx-java reactive-programming rx-java2

我尝试在不同的线程上运行每个计算,但无论我使用什么调度程序,它总是在单线程上运行。

PublishProcessor processor = PublishProcessor.create();

    processor
        .doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
        .observeOn(Schedulers.newThread()).subscribe(i -> {
            System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
            Thread.currentThread().sleep(5000);
        });
    processor.onNext(2);
    processor.onNext(3);
    processor.onNext(4);
    processor.onNext(5);
    processor.onNext(6);


    while (true) {}

输出将是:

2 emitted on 1
3 emitted on 1
4 emitted on 1
5 emitted on 1
6 emitted on 1
2 received on 13
3 received on 13
4 received on 13
5 received on 13
6 received on 13

线程 13 仅在 hibernate 后处理下一个值,但在这种情况下我希望有几个单独的 hibernate 线程。 有人可以解释一下我做错了什么吗?

最佳答案

.observeOn(...) 通过将项目流更改为另一个线程来生效,但它始终是同一个线程。

如果您想为每个项目创建一个新线程,您可以这样做

processor
    .doOnNext(i ->System.out.println(i.toString()+" emitted on "+Thread.currentThread().getId()))
    .flatMap(item -> Observable.just(item)
                         .subscribeOn(Schedulers.newThread()))    // make every item change to a new thread
    .subscribe(i -> {
        System.out.println(i.toString()+" received on "+Thread.currentThread().getId());
        Thread.currentThread().sleep(5000);
    });

关于java - RxJava 调度程序始终与 sleep 在同一线程中工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44308409/

相关文章:

python - 使用 pyudev 的监视器终止 USBdetector 线程

java - 如何使用 switchIfEmpty RxJava

android - RxAndroid 可观察循环

java - Hadoop - 使用 Java 将 reducer 输出合并到单个文件

java - 如何在android中创建android库项目?

Java多线程应用程序不运行方法

android - SmsManager:从 BroadcastReceiver 获取消息接收者电话号码

java - 在 Redis 上放置 Java 对象 : is JavaNode a correct approach?

java - 我可以修改 Java Swing JTextArea 使其看起来像 JPasswordField 吗?

c - 带 sleep 的 C 线程