java - react 堆项目 : Do I need a Processor?

标签 java reactive-programming project-reactor reactor reactive-streams

我正在尝试在 Reactor 之上设计一个管道框架。

在每个阶段(不考虑第一个和最后一个),我们都有转换对象的任务(即字符串到它的长度或 url 到它的 HTML 内容等)。这是一个例子:

enter image description here

你可以看到中间层有 3 个任务,每个任务将一个 X 对象转换为一个 Y 对象(顺便说一下,它总是一个全连接层)

我的问题/困境: 我的第一个想法是,我只需要 Flux.merge(),然后将其连接到每个订阅者。例如:

Flux<X> source = Flux.merge(x1Flux, x2Flux)  
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)

另一种选择是放置一个处理器(TopicProcessor?)作为中间件(就像在发布-订阅模式中一样)

我不了解哪种解决方案最适合我的问题。逻辑上是一样的,但每种架构的实际含义是什么?

谢谢!

最佳答案

我这里的一般方法是使用 ConnectableFlux 来延迟发布,直到您完成整个管道设置,然后对每个通量调用一次 connect()您已经设置了管道。

可以使用处理器,但我建议尽可能避免使用。

一般要点(未检查语法)类似于:

ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();

ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();

ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();

x1.connect();
x2.connect();
y1.connect();
//...etc.

另请注意,您可能希望使用 concat()mergeSequential() 而不是 merge(),具体取决于您的用例(merge() 会急切地订阅发布者,concat() 不会,mergeSequential() 会按收到的顺序合并,可能会交错值。)

关于java - react 堆项目 : Do I need a Processor?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57181412/

相关文章:

java - Reactor 3 发射极/用户并联

java - RxJava 中的超时

spring - 有没有办法打印出 Flux 中所有操作的链条?

java - 如何从异步值创建千分尺?

swift - 带有 MVVM 和 Action 的 ReactiveSwift

java - 如何解析多个整数

java - TCP 客户端接受对象

java - 如何在默认 Android Studio 抽屉导航中的 Fragments 之间切换

java - Flux 是如何垃圾收集的?

java - ArrayList 'squash' 对其元素的操作