reactive-programming - 流 |处理器和线程模型

标签 reactive-programming spring-xd

我想知道 spring-xd 如何处理流中的处理器。我真正想知道的是处理器是否阻塞代码,或者它们与 react 器(https://github.com/reactor/reactor/wiki/Processor)如何处理处理器有关。

如果我需要执行昂贵的阻塞操作(也称为调用外部系统),最好的方法是什么?我很乐意为此使用 react 堆或任何其他响应式(Reactive)框架,但如何在 XD 管道架构中做到这一点?

问候

最佳答案

Spring XD 流中的术语处理器具有特定含义 - 它基本上是从名为 input 的 channel 到名为 的 channel 的 Spring Integration 消息流输出。按照惯例,这些 channel 是生成和消耗 XD 流中的有效负载的 channel 。例如,如果流 mystream 定义为 someSource |一些处理器 | someSink,处理器模块可能会异步执行昂贵的操作,但流仍然必须等待处理器输出 channel 上的消息,因此您不会看到吞吐量的提高。

但是,在某些情况下,实现接收器异步运行会有所帮助。在这种情况下,当消息到达接收器的输入 channel 时,流不会阻塞。异步接收器(有点像有一个环)可以连接到流上的水龙头:

mystream = someSource | ... | someSink
mytap =  tap:stream:mystream > asyncSink  

或命名队列(或主题):

 mystream = someSource | ... | > queue:myQueue
 queue:myQueue > asyncSink

或者它可能是主流的接收器。

要实现异步接收器,需要配置 Spring Integration 端点,例如,用于调用外部服务的 ServiceActivator,以及接收器模块内的轮询器和任务执行器。端点轮询可轮询 channel (例如,输入 channel 本身可能被声明为队列 channel )。请参阅http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html了解详情。

关于reactive-programming - 流 |处理器和线程模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22214110/

相关文章:

c# - 在 Reactive Extensions 中按预定义的顺序对 Observable 进行排序

reactive-programming - 如何编写带有动态 throttle 的供应?

java - 部署自定义 Spring XD 处理器时出错

spring - Spring-xd Stream正在将空文件写入我的HDFS

hadoop - Spring-xd 容器(1.3.1) 与 Spark 1.6.1+ Hadoop 2.7.2 的兼容性

java - spring-XD 不读取 logback.xml

java - CompletionStage、CompletableFuture Void - 返回什么?

java - 缓存和使缓存的 Mono 失效

javascript - 在 rxjs Observable 中抛出错误

spring xd 流定义动态参数