我有以下配置:
<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactoryDefault"/>
</bean>
<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
<int:queue message-store="mongoDbMessageStore"/>
</int:channel>
<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
<int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>
消息处理程序定义为
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
if (payload instanceof LogEntry) {
LogEntry logEntry = (LogEntry) payload;
String app = (String) message.getHeaders().get("app");
logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
logEntryPostProcessService.postProcess(app, logEntry);
} else {
throw new MessageRejectedException(message, "Unknown data type has been received.");
}
}
我想要的是类似的东西
@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}
所以轮询器基本上在一次调用中发送所有 10 条消息,而不是每条消息调用该方法 10 次。
这样做的原因是可以批量处理 block 中的所有消息,从而提高性能。
最佳答案
这是真的,因为 ( AbstractPollingEndpoint
):
taskExecutor.execute(new Runnable() {
@Override
public void run() {
int count = 0;
while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
...
if (!pollingTask.call()) {
break;
}
...
}
});
因此您的所有消息 ( max-messages-per-poll
) 都在同一个线程中处理。
然而,它们被一个一个地发送给处理程序,而不是作为一整串。
要并行处理,您应该使用 ExecutorChannel
在你的 logEntryPostProcessorReceiver
之前.像这样:
<channel id="executorChannel">
<dispatcher task-executor="threadPoolExecutor"/>
</channel>
<bridge input-channel="logEntryChannel" output-channel="executorChannel">
<poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>
<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>
更新
要将消息作为一批处理,您应该 aggregate
他们。因为它们都是 polling endpoint
的结果, 没有 sequenceDetails
在消息中。你可以用 correlationId
的一些假值来克服它:
<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 10"/>
在哪里size() == 10
应该等于 max-messages-per-poll
.
在那之后你的 logEntryPostProcessorReceiver
必须申请 list
的 payload
秒。或者只有一条消息,payload
是 <aggregator>
的结果列表.
关于xml - Spring 集成 : how to process multiple messages at one time?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25564291/