我正在使用 Java9 响应式(Reactive)流和 RxJava2 进行测试。我对两者都没有真正的偏好,但我正在寻找一些关于这是否可能的指导。
我正在创建预先配置的订阅者数量,如下所示:
for(int i = 0; i<MAX_SUBSCRIBERS; i++) { System.out.println("Creating subscriber: " + i); publisher.subscribe(new MySubscriber<>(i + "-subscriber")); }
我正在从目录中读取文件列表,以便并发上传到某些第三方系统。
Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files")); paths .filter((Files::isRegularFile)) .forEach(pathName -> publisher.submit(pathName.toString()));
我收到以下输出:
0-subscriber: /my/dir/with/files/test0.txt received in onNext
0-subscriber: /my/dir/with/files/test1.txt received in onNext
1-subscriber: /my/dir/with/files/test0.txt received in onNext
1-subscriber: /my/dir/with/files/test1.txt received in onNext
理想情况下,我们应该看到以下行为。每个订阅者都应该在唯一的文件上执行工作。
0-subscriber: /my/dir/with/files/test0.txt received in onNext
1-subscriber: /my/dir/with/files/test1.txt received in onNext
这可能吗?任何提示都会很棒!
最佳答案
Java 9 Flow API 由 4 个接口(interface)和 SubmissionPublisher
类组成,该类将每个提交的值分派(dispatch)给其所有订阅者
。目前没有 JDK 工具支持您的数据流。
相比之下,RxJava 是一个丰富的流畅库,拥有数百个运算符,您可以在其中执行并行处理而无需重复:
ParallelFlowable<Path> pf =
Flowable.<Path, Stream<Path>>using(
() -> Files.list(Paths.get("/my/dir/with/files")),
files -> Flowable.fromIterable((Iterable<Path>)() -> files.iterator()),
AutoCloseable::close
)
.parallel(2)
.runOn(Schedulers.computation())
.filter(Files::isRegularFile);
pf.subscribe(new Subscriber[] {
new MySubscriber<>("0-subscriber"),
new MySubscriber<>("1-subscriber"),
});
关于rx-java - 单流和多个订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48160005/