rx-java - 单流和多个订阅者

标签 rx-java reactive-programming rx-java2 java-9 reactive-streams

我正在使用 Java9 响应式(Reactive)流和 RxJava2 进行测试。我对两者都没有真正的偏好,但我正在寻找一些关于这是否可能的指导。

  1. 我正在创建预先配置的订阅者数量,如下所示:

    for(int i = 0; i<MAX_SUBSCRIBERS; i++) {  
         System.out.println("Creating subscriber: " + i);  
         publisher.subscribe(new MySubscriber<>(i + "-subscriber"));   
    }
    
  2. 我正在从目录中读取文件列表,以便并发上传到某些第三方系统。

    Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files"));
    paths
    .filter((Files::isRegularFile))
    .forEach(pathName -> publisher.submit(pathName.toString()));
    

我收到以下输出:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    0-subscriber: /my/dir/with/files/test1.txt received in onNext
    1-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

理想情况下,我们应该看到以下行为。每个订阅者都应该在唯一的文件上执行工作。

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

这可能吗?任何提示都会很棒!

最佳答案

Java 9 Flow API 由 4 个接口(interface)和 SubmissionPublisher 类组成,该类将每个提交的值分派(dispatch)给其所有订阅者。目前没有 JDK 工具支持您的数据流。

相比之下,RxJava 是一个丰富的流畅库,拥有数百个运算符,您可以在其中执行并行处理而无需重复:

    ParallelFlowable<Path> pf = 
            Flowable.<Path, Stream<Path>>using(
                () -> Files.list(Paths.get("/my/dir/with/files")),
                files -> Flowable.fromIterable((Iterable<Path>)() -> files.iterator()),
                AutoCloseable::close
            )
            .parallel(2)
            .runOn(Schedulers.computation())
            .filter(Files::isRegularFile);

pf.subscribe(new Subscriber[] {
    new MySubscriber<>("0-subscriber"),
    new MySubscriber<>("1-subscriber"),
});

关于rx-java - 单流和多个订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48160005/

相关文章:

java - 使用 RxJava 与 WebSockets 进行请求响应层

java - 超时如何在 Hystrix 中与 Observables 一起工作?

android - RxJava : How to wait for all subscriptions to complete?

javascript - Rx 如何在 web 上真正工作(客户端)

android - Rxjava 2 相机异常

java - 如何使用 RxJava/RxAndroid 在 OnNext 中抛出异常

java - 端到端响应式(Reactive)流式处理 RESTful 服务

r - Shiny :renderUI react 性问题

android - 向 CompositeDisposable 添加大量 Disposable 的潜在危害

java - RxJava 与用户创建的 Flowable 合并只发出最后一个 Flowable