我想知道 spring-xd 如何处理流中的处理器。我真正想知道的是处理器是否阻塞代码,或者它们与 react 器(https://github.com/reactor/reactor/wiki/Processor)如何处理处理器有关。
如果我需要执行昂贵的阻塞操作(也称为调用外部系统),最好的方法是什么?我很乐意为此使用 react 堆或任何其他响应式(Reactive)框架,但如何在 XD 管道架构中做到这一点?
问候
最佳答案
Spring XD 流中的术语处理器具有特定含义 - 它基本上是从名为 input 的 channel 到名为 的 channel 的 Spring Integration 消息流输出。按照惯例,这些 channel 是生成和消耗 XD 流中的有效负载的 channel 。例如,如果流 mystream 定义为 someSource |一些处理器 | someSink
,处理器模块可能会异步执行昂贵的操作,但流仍然必须等待处理器输出 channel 上的消息,因此您不会看到吞吐量的提高。
但是,在某些情况下,实现接收器异步运行会有所帮助。在这种情况下,当消息到达接收器的输入 channel 时,流不会阻塞。异步接收器(有点像有一个环)可以连接到流上的水龙头:
mystream = someSource | ... | someSink
mytap = tap:stream:mystream > asyncSink
或命名队列(或主题):
mystream = someSource | ... | > queue:myQueue
queue:myQueue > asyncSink
或者它可能是主流的接收器。
要实现异步接收器,需要配置 Spring Integration 端点,例如,用于调用外部服务的 ServiceActivator,以及接收器模块内的轮询器和任务执行器。端点轮询可轮询 channel (例如,输入 channel 本身可能被声明为队列 channel )。请参阅http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html了解详情。
关于reactive-programming - 流 |处理器和线程模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22214110/