根据我之前的问题,我创建了以下流程:
<int:channel id="output.buffer.channel"/>
<int:chain id="pollingBaseChain" input-channel="pollingInput">
<int:poller ref="pollingInputPoller"/>
<int:gateway id="inputChainGateway" request-channel="input.buffer.channel" reply-channel="output.buffer.channel"/>
<int:service-activator id="outboundRoutingService" ref="outboundMessageRouterService" method="forwardMessage"/>
</int:chain>
<int:chain id="input.chain" input-channel="input.buffer.channel" output-channel="output.buffer.channel">
<int:poller ref="inputPoller"/>
<!-- Various service activators/transfomers -->
<int:splitter id="messageSplitter" ref="messageSplitterSequence" apply-sequence="false"/>
<int:transformer id="outboundEntries" ref="routingService" method="prepareOutboundEntries"/>
</int:chain>
在服务激活器outboundRoutingService
内部,发生的代码很少:
public void forwardMessage(Message message, @Header(value = "nextChannel", required = false) MessageChannel channel) {
logger.info("Received message for routing. Channel is: {}, message is: {}", channel, message);
if(channel != null) {
channel.send(message);
}
}
现在我向流程中发送 25 条消息,它们被 pollingBaseChain
接收,并通过网关
转发到 input.chain
并在那里进行处理。在该链内,它们被分成 5 条消息,并且 125 条消息离开 input.chain
。来自 outboundRoutingService
的 forwardMessage
记录了 25 条消息。我相信这是因为网关尝试通过 ID 来匹配消息,因此只有 25 条原始消息被拾取(其他消息会怎样?)并转发。
这是我的第一个问题,有没有办法让input.chain
发出的所有消息继续在pollingBaseChain
中流动并转发到outboundRoutingService
服务激活器?
我的第二个问题是,虽然 25 条消息通过非空 channel
到达 forwardMessage
,但只有 1 条消息被它们发送到的 channel 接收在forwardMessage
内。该 channel 是一个 QueueChannel
,其消息队列大小远超过 25 个。这些消息会在哪里丢失?
最佳答案
网关请求-回复基于 replyChannel
header 的 TemporaryReplyChannel
,本质上是一个 private final CountDownLatch replyLatch = new CountDownLatch(1) ;
。所以,这实际上是一个请求的一个回复。网关不了解下游及其产生大量消息的可能性。
为了履行“一对一”契约(Contract),您需要考虑在发送到回复 channel 之前聚合所有这些消息。
在文档中查看有关聚合器的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-routing-chapter.html#aggregator
您可以考虑将结果分割到网关之后的消息束中。
关于java - 从网关调用的嵌套链回复多条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54672064/