我是 Reactive 编程的新手,有很多问题。 我认为这不是缺少示例或文档,而是我的理解有误。
我正在尝试模拟慢速订阅者;
代码示例如下
Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(MILLIS);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next(it);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(MILLIS + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
系统输出是
Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]
我想如果订阅者很慢,我应该看到更多的线程,因为 Schedulers.elastic()
我还尝试制作 publishOn()
,看起来我让它异步了,但仍然无法在多个线程中处理结果。
感谢评论和回答。
最佳答案
如果你想让它在不同的线程中运行,你需要像这样使用 .parallel() 并且发射将在不同的线程中进行
Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(100);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})
.parallel()
.runOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(100 + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
;
}
关于java - Reactor 3 发射极/用户并联,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54802472/