java - Reactor 3 发射极/用户并联

标签 java reactive-programming project-reactor

我是 Reactive 编程的新手,有很多问题。 我认为这不是缺少示例或文档,而是我的理解有误。

我正在尝试模拟慢速订阅者;

代码示例如下

Flux.create(sink -> {
    int i = 0;
    while (true) {
        try {
            System.out.println("Sleep for " + MILLIS);
            Thread.sleep(MILLIS);
            int it = i++;
            System.out.println("Back to work, iterator " + it);
            sink.next(it);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
    try {
        System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
        Thread.sleep(MILLIS + 4000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

系统输出是

Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]

我想如果订阅者很慢,我应该看到更多的线程,因为 Schedulers.elastic()

我还尝试制作 publishOn(),看起来我让它异步了,但仍然无法在多个线程中处理结果。

感谢评论和回答。

最佳答案

如果你想让它在不同的线程中运行,你需要像这样使用 .parallel() 并且发射将在不同的线程中进行

Flux.create(sink -> {
        int i = 0;
        while (true) {
            try {
                System.out.println("Sleep for " + MILLIS);
                Thread.sleep(100);
                int it = i++;
                System.out.println("Back to work, iterator " + it);
                sink.next("a");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    })

            .parallel()
            .runOn(Schedulers.elastic())

            .subscribe(x -> {
                try {
                    System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
                    Thread.sleep(100 + 4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
    ;
}

关于java - Reactor 3 发射极/用户并联,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54802472/

相关文章:

java - 如何在运行时知道占用的堆大小

java - Spring Boot 应用程序正在从 JUnit 测试启动,但无法通过 URL 访问

reactjs - 后端有响应式编程的框架吗?

java - 添加新组件时(每当按下按钮时)是否有更简单的方法将 'older' 组件向下移动?

java - 修复 sdkmanager java.lang.NoClassDefFoundError 批处理文件

javascript - 为什么当 react 变量改变值时这个函数不运行?

ios - ReactiveCocoa : Chain a signal with a repeating signal

spring-boot - 如何使用 netty-reactor 从阻塞调度程序切换回以前的调度程序?

java - 如何保留过滤掉的 Flux 元素

java - 在返回通量数据库实体之前运行异步任务