xml - Spring 集成 : how to process multiple messages at one time?

标签 xml spring spring-integration message-queue

我有以下配置:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactoryDefault"/>
</bean>

<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
    <int:queue message-store="mongoDbMessageStore"/>
</int:channel>

<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
    <int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>

消息处理程序定义为

@Override
public void handleMessage(Message<?> message) throws MessagingException {
    Object payload = message.getPayload();
    if (payload instanceof LogEntry) {
        LogEntry logEntry = (LogEntry) payload;
        String app = (String) message.getHeaders().get("app");
        logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
        logEntryPostProcessService.postProcess(app, logEntry);
    } else {
        throw new MessageRejectedException(message, "Unknown data type has been received.");
    }
}

我想要的是类似的东西

@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}

所以轮询器基本上在一次调用中发送所有 10 条消息,而不是每条消息调用该方法 10 次。

这样做的原因是可以批量处理 block 中的所有消息,从而提高性能。

最佳答案

这是真的,因为 ( AbstractPollingEndpoint ):

taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        int count = 0;
        while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
...
            if (!pollingTask.call()) {
                break;
            }
...
    }
});

因此您的所有消息 ( max-messages-per-poll ) 都在同一个线程中处理。 然而,它们被一个一个地发送给处理程序,而不是作为一整串。

要并行处理,您应该使用 ExecutorChannel在你的 logEntryPostProcessorReceiver 之前.像这样:

<channel id="executorChannel">
   <dispatcher task-executor="threadPoolExecutor"/>
</channel>

<bridge input-channel="logEntryChannel" output-channel="executorChannel">
   <poller max-messages-per-poll="10" fixed-rate="6000"/>
</bridge>

<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/>

更新

要将消息作为一批处理,您应该 aggregate他们。因为它们都是 polling endpoint 的结果, 没有 sequenceDetails在消息中。你可以用 correlationId 的一些假值来克服它:

<aggregator correlation-strategy-expression="T(Thread).currentThread().id"
        release-strategy-expression="size() == 10"/>

在哪里size() == 10应该等于 max-messages-per-poll .

在那之后你的 logEntryPostProcessorReceiver必须申请 listpayload秒。或者只有一条消息,payload<aggregator> 的结果列表.

关于xml - Spring 集成 : how to process multiple messages at one time?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25564291/

相关文章:

java.lang.NoClassDefFoundError : org/slf4j/LoggerFactory even though I have the right dependencies

java - 如何将EntityManager注入(inject)到Spring Integration bean中?

xml - xslt编程时 'or'和 '|'有什么区别?

c# - 从 XML 文档中删除节点时出现神秘故障

c# - 使用 XPath 排序 - 不是 XSL

java - Jackson - 将 JSON 反序列化为类

java - Spring 3 - 为 NoSuchRequestHandlingMethodException 创建 ExceptionHandler

xml - 组织 XML 文件 - FIlezilla

java - 如何检查或等待消息处理完成?

java - Spring Integration - 外部化 JDBC 查询