spring - Designing a daily email digest feature with Spring Integration

Tags: spring aggregate spring-batch spring-integration

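I currently have an SI project that listens on a JMS queue, does some processing and, depending on the message, sends an email, writes to a file, and so on.

Now I want to add a daily digest feature to the email-sending part. I would like to store the emails somewhere and, once a day, group all messages with the same destination address, concatenate their contents and send a single email.

What would be the best design for this? I looked at the aggregator concept, but I have a few questions:

  • I need to store the aggregated messages persistently. They are sent only once a day, so I don't want to lose any. I suppose I should use a JdbcMessageStore?
  • I need one transaction from the input queue to the aggregator, and then another transaction from the aggregator to the email output. Is that possible? If so, how should I configure it?

Thanks for your help.

Cheers

EDIT

I am now trying the following: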

<int:aggregator id="templatingDailyAggregator"
    input-channel="templatingDailyAggregatorInputChannel" 
    output-channel="templatingDailyAggregatorOutputChannel"

    message-store="templatingEmailAggregatorStore"
    correlation-strategy-expression="headers['templatingEmailGroupingCategory']+payload.emailMessage.email"
    release-strategy-expression="false"

    send-partial-result-on-expiry="true"
    expire-groups-upon-completion="true"
    >

    <int:expire-transactional transaction-manager="templatingAggregatorStoreTransactionManager"/>
</int:aggregator>

<int-jdbc:message-store id="templatingEmailAggregatorStore" data-source="templatingEmailAggregatorStoreDataSource" />

<bean id="templatingAggregatorStoreTransactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="templatingEmailAggregatorStoreDataSource" />
</bean>

<!-- MySQL DB DataSource -->
<bean id="templatingEmailAggregatorStoreDataSource"
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">

    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="MYURL" />
    <property name="username" value="MYUSER" />
    <property name="password" value="MYPASS" />
</bean>

<bean id="templatingEmailAggregatorStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="templatingEmailAggregatorStore"/>
    <property name="timeout" value="10"/>
</bean>

<task:scheduled-tasks scheduler="templatingAggregatorScheduler">
    <task:scheduled ref="templatingEmailAggregatorStoreReaper" method="run" cron="0 */2 * * * * "/>
</task:scheduled-tasks>

<task:scheduler id="templatingAggregatorScheduler"/>

<int:transformer id="templatingDailyDigestTransformer" ref="templatingDailyDigestTransformerBean" input-channel="templatingDailyAggregatorOutputChannel" method="processMessage" output-channel="emailOutputChannel"/>

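But there is a problem: if an exception occurs after the aggregation step (for example while sending the email), I would expect the consumption of the MessageGroup to be rolled back, so that the group is still in the database. That is not the case: the MessageGroup has been consumed and is no longer in the database, so it is lost.

EDIT 2

I added a transformer after the aggregator in the XML above. For now, this transformer just throws an exception to test the crash case. Here is the relevant excerpt of the log and stack trace: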

DEBUG [exec-1] - Expiring all messages older than timeout=10 from message group store: org.springframework.integration.jdbc.JdbcMessageStore@3e082583
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT distinct GROUP_KEY as CREATED from INT_MESSAGE_GROUP where REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
INFO  [exec-1] - Expiring MessageGroup with correlationKey[fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - Prematurely releasing partially complete group with key [fb90fe78-c3df-3793-9ee8-acae4924bebe] to: templatingDailyAggregatorOutputChannel
DEBUG [exec-1] - Completing group with correlationKey [fb90fe78-c3df-3793-9ee8-acae4924bebe]
DEBUG [exec-1] - org.springframework.integration.transformer.MessageTransformingHandler@2fdde28c received message: GenericMessage [payload=[ch.post.ehealth.extcom.templating.core.PreAggregatorEmailMessage@563b5cce], headers={jms_timestamp=1426090799820, extcomPluginDestination=templatingPluginInputChannel, extcomId=5782d059-c88d-44ae-82a1-0b738b43e821, jms_messageId=ID:some-vm-45363-1421229178337-3:9:1:1:4, timestamp=1426090920046, id=b8330bed-fc74-c8d5-6838-a3116a05ab39, history=jmsInputAdapter,inputChannel,xmlToSpringIntTransformer,pluginRouterChannel,pluginRouter,templatingPluginInputChannel,templatingTransformer,templatingPluginOutputChannel,templatingOutputRouter,templatingEmailOutputChannel,templatingEmailGroupingRouter,templatingPreAggregatorChannel,templatingPreAggregatorTransformer,templatingDailyAggregatorInputChannel,templatingDailyAggregator,templatingDailyAggregatorOutputChannel, JdbcMessageStore.CREATED_DATE=1426090800772, jms_type=, jms_redelivered=false, priority=0, templatingEmailGrouping=DAILY, jms_correlationId=, JdbcMessageStore.SAVED=true, templatingEmailGroupingCategory=DAILY}]
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [select MESSAGE_ID, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (select MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?) ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, CREATED_DATE, MESSAGE_BYTES from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning BLOB as bytes
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE where MESSAGE_ID=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_GROUP_TO_MESSAGE where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Removing relationships for the group with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL update
DEBUG [exec-1] - Executing prepared SQL statement [DELETE from INT_MESSAGE_GROUP where GROUP_KEY=? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Marking messages with group key=fb90fe78-c3df-3793-9ee8-acae4924bebe
DEBUG [exec-1] - SQL update affected 1 rows
DEBUG [exec-1] - Returning JDBC Connection to DataSource
ERROR [exec-1] - Exception in expiry callbackjava.lang.RuntimeException: test crash
    at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) [spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) [spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) [spring-integration-core-4.1.2.RELEASE.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]

DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT MESSAGE_ID, MESSAGE_BYTES, CREATED_DATE from INT_MESSAGE where MESSAGE_ID in (SELECT MESSAGE_ID from INT_GROUP_TO_MESSAGE where GROUP_KEY = ?) and REGION=? ORDER BY CREATED_DATE]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT COMPLETE, LAST_RELEASED_SEQUENCE, CREATED_DATE, UPDATED_DATE from INT_MESSAGE_GROUP where GROUP_KEY = ? and REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource
DEBUG [exec-1] - Group expiry candidate (fb90fe78-c3df-3793-9ee8-acae4924bebe) has changed - it may be reconsidered for a future expiration
ERROR [exec-1] - Unexpected error occurred in scheduled task.java.lang.RuntimeException: test crash
    at ch.post.ehealth.extcom.templating.core.DailyDigestTransformer.processMessage(DailyDigestTransformer.java:33) ~[extcom-templating.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:63) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:95) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:84) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:112) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:111) ~[spring-expression-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:64) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:68) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
Wrapped by: org.springframework.integration.transformer.MessageTransformationException: ; nested exception is org.springframework.messaging.MessageHandlingException: ; nested exception is java.lang.RuntimeException: test crash
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:74) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.1.4.RELEASE.jar:4.1.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:657) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:642) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.expireGroup(AbstractCorrelatingMessageHandler.java:619) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.forceComplete(AbstractCorrelatingMessageHandler.java:543) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$ForceReleaseMessageGroupProcessor.processMessageGroup(AbstractCorrelatingMessageHandler.java:721) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler$1.execute(AbstractCorrelatingMessageHandler.java:168) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expire(AbstractMessageGroupStore.java:169) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:113) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:115) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.7.0_71]
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.0.8.RELEASE.jar:4.0.8.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.7.0_71]
    at java.lang.Thread.run(Unknown Source) [na:1.7.0_71]

DEBUG [exec-1] - Expiring all messages older than timeout=10 from message group store: org.springframework.integration.jdbc.JdbcMessageStore@3e082583
DEBUG [exec-1] - Executing prepared SQL query
DEBUG [exec-1] - Executing prepared SQL statement [SELECT distinct GROUP_KEY as CREATED from INT_MESSAGE_GROUP where REGION=?]
DEBUG [exec-1] - Fetching JDBC Connection from DataSource
DEBUG [exec-1] - Creating new JDBC DriverManager Connection to [jdbc:mysql://localhost:3306/aggregator]
DEBUG [exec-1] - Returning JDBC Connection to DataSource

Best answer

Well, it looks like you are on the right track.

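You can indeed wrap the aggregator in a TX for both JMS and JDBC, and group the messages by the required correlationKey, e.g. with correlation-strategy-expression.

Since you don't want to release the groups until some daily event (e.g. a cron trigger), you should configure your aggregator with these options: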

release-strategy-expression="false"
send-partial-result-on-expiry="true"
expire-groups-upon-completion="true"
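  • The first one prevents a normal release.
  • The second one sends the aggregator result to the output-channel, rather than to the discard-channel, on expiry.
  • The third one removes the group from the MessageStore, so that a fresh group can be formed for the same correlationKey.

To make this work you should configure a MessageGroupStoreReaper: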

<bean id="reaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="10"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" cron="0 0 * * * * "/>
</task:scheduled-tasks>

<task:scheduler id="scheduler"/>
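The cron expression above fires at the start of every hour. For a digest that goes out once a day, the trigger might look like the following sketch, where 06:00 is an arbitrary time chosen for illustration and the bean ids are the ones from the snippet above:

```xml
<task:scheduled-tasks scheduler="scheduler">
    <!-- Spring cron fields: second minute hour day-of-month month day-of-week -->
    <!-- 06:00:00 every day; the hour is an assumption, pick whatever suits the digest -->
    <task:scheduled ref="reaper" method="run" cron="0 0 6 * * *"/>
</task:scheduled-tasks>
```

The reaper timeout is given in milliseconds, so with a value of 10 practically every stored group is old enough to be expired on each run.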

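The reaper invokes the MessageGroupCallback that the aggregator registers during its initialization. That callback calls forceComplete on the aggregator, which honors all the options mentioned above.

Starting with SI 4.1, <aggregator> supports the <expire-transactional> sub-element, which wraps forceComplete in a TX, just as you asked.

Before that, we had to wrap the call in a TX ourselves, e.g. with a <tx:advice> on the MessageGroupStoreReaper.run() method.

Hope I am clear.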

UPDATE

Sorry, I have found that we have a bug. In the documentation for <expire-transactional> we say:

Allows a transaction to be started for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add/release/discard operations. Only this sub-element or <expire-advice-chain/> is allowed.

But that is not true. The code looks like this:

if (this.groupTimeoutExpression != null && !CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
    ProxyFactory proxyFactory = new ProxyFactory(processor);
    for (Advice advice : this.forceReleaseAdviceChain) {
        proxyFactory.addAdvice(advice);
    }
    return (MessageGroupProcessor) proxyFactory.getProxy(getApplicationContext().getClassLoader());
}

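So we apply <expire-transactional> only if group-timeout(-expression) is provided, which is not the case when a MessageGroupStoreReaper is used.

Feel free to raise a JIRA issue on the matter and we will take care of it soon.

In the meantime, as a workaround, you should use a <tx:advice> for MessageGroupStoreReaper.run().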
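A minimal sketch of that workaround, assuming the tx and aop namespaces are declared in the context file and that spring-aop and the AspectJ weaver are on the classpath. The ids reaperTxAdvice and reaperRun are made up for illustration; the transaction manager and reaper ids are the ones from the question:

```xml
<!-- Hypothetical ids: reaperTxAdvice, reaperRun -->
<tx:advice id="reaperTxAdvice"
    transaction-manager="templatingAggregatorStoreTransactionManager">
    <tx:attributes>
        <!-- Only run() is transactional; a RuntimeException leaving it triggers a rollback -->
        <tx:method name="run" propagation="REQUIRED"/>
    </tx:attributes>
</tx:advice>

<aop:config>
    <!-- bean() is the Spring AOP pointcut designator that selects a bean by id -->
    <aop:pointcut id="reaperRun"
        expression="bean(templatingEmailAggregatorStoreReaper)"/>
    <aop:advisor advice-ref="reaperTxAdvice" pointcut-ref="reaperRun"/>
</aop:config>
```

This can only help if everything downstream of the aggregator runs on the reaper thread (direct channels, as in the stack trace in the question), so that the exception propagates back out of run(). The transaction also covers the whole reaper run, so a failure in one group would presumably roll back the removal of groups already released earlier in the same run, and those could then be sent again.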

Regarding spring - Designing a daily email digest feature with Spring Integration, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/28761931/
