java - 创建 Observables 树并添加订阅者作为叶子

标签 java rx-java reactive-programming

我有以下问题:

我正在构建一个异步过滤消息的应用程序。我想构建这样的东西,使用 Observables 作为过滤器(使用 .filter()):

输入--过滤器1--过滤器2--订阅者1
|__filter3--filter4--subscriber2
|__filter5--filter6--subscriber3
|__filter7--订阅者4

输入将是一个链接列表,因此我可以使用 Observable.from() 创建可观察链。然后我想创建一棵可观察量树,如上图所示。我遇到的问题是我不知道如何将信息广播到下一个节点,例如:什么通过filter5,发送到filter6和filter7。这应该发生在所有可观察量上,直到到达树的末尾,并且我可以拥有消耗流的订阅者。

非常感谢!

最佳答案

我不确定你的意思,但你可以简单地在树结构中创建和链接 Observables:

Observable input = ...

Observable o1 = input.filter(f1).filter(f2);
Observable o2 = input.filter(f3).filter(f4);
Observable o3 = input.filter(f5);
Observable o4 = o3.filter(f6);
Observable o5 = o3.filter(f7);

如果您不想多次使用输入,您可以发布它:

ConnectableObservable input = Observable.from(list).publish();

// ... chain as before

input.connect();

关于java - 创建 Observables 树并添加订阅者作为叶子,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37546448/

相关文章:

java - 我应该在 GetIntArrayElements 之后调用 ReleaseIntArrayElements 吗?

java - Swagger + Spring Boot,如何基于Spring Boot Validation设置API中要传递的示例值

java - Java 存储 native 对象的位置

java - CompletableFuture、Future 和 RxJava 的 Observable 的区别

javascript - 合并两个 observables,单一输出

java - 如何在 Stream 上重用过滤器和映射的应用程序?

用于解构的 Kotlin 四元组、五元组等

java - 在 RxJava 中什么算作异步边界?

spring - 使用 Spring Webflux 恢复文件下载以及 Spring 中的静态文件服务

ios - 没有订阅者时停止发布,有订阅者时自动开始