java - Spring Integration - Queue with message store - How to check persisted messages in the database

Tags: java spring spring-integration

<bean id="sessionQueuePersistance"
    class="com.emph.SessionQueuePersistance">
    <constructor-arg name="sessionInboundChannel" ref="sessionInboundChannel" />
</bean>

<int:channel id="sessionInboundChannel">
    <int:queue message-store="sessionStore" />
    <int:interceptors>
        <int:ref bean="messageListener"/>
    </int:interceptors>
</int:channel>

<bean id="messageListener" class="com.emph.MessageListener" />

<!-- int:logging-channel-adapter id="logger" log-full-message="true" level="INFO" /-->

<bean id="sessionStore"
    class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="queueDataSource" />
    <property name="channelMessageStoreQueryProvider" ref="queryProvider" />
    <property name="tablePrefix" value="QUEUE_" />
</bean>

<bean id="queryProvider"
    class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />

<bean id="queueDataSource"
    class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
    <property name="url" value="jdbc:oracle:thin:@localhost:1521:XE" />
    <property name="username" value="test" />
    <property name="password" value="test" />
</bean>

<!-- the bridge polls the persisted messages and forwards them to the output 
    channel -->
<int:bridge input-channel="sessionInboundChannel"
    output-channel="sessionOutboundChannel">
    <int:poller fixed-rate="1000" max-messages-per-poll="-1">
        <int:transactional transaction-manager="txManager" />
    </int:poller>
</int:bridge>

<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="queueDataSource"/>
</bean>

<int:channel id="sessionOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<bean id="sessionQueueReader" class="com.emph.SessionReader" />

<int:service-activator input-channel="sessionOutboundChannel"
    ref="sessionQueueReader" method="handleMessage" />

<bean id="taskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="4" />
    <property name="maxPoolSize" value="4" />
</bean>

The test case for this:

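import java.util.concurrent.TimeUnit;

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SessionQueuePersistanceTest {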

  public static void main(String[] args) {

    AbstractApplicationContext context =
        new ClassPathXmlApplicationContext("/session-queue-config.xml",
            SessionQueuePersistanceTest.class);

    SessionQueuePersistance sessionQueuePersistance =
        (SessionQueuePersistance) context.getBean("sessionQueuePersistance");

    sessionQueuePersistance.addSessionToQueue("testSession1");
    sessionQueuePersistance.addSessionToQueue("testSession2");
    sessionQueuePersistance.addSessionToQueue("testSession3");
    sessionQueuePersistance.addSessionToQueue("testSession4");
    sessionQueuePersistance.addSessionToQueue("testSession5");
    sessionQueuePersistance.addSessionToQueue("testSession6");
    sessionQueuePersistance.addSessionToQueue("testSession7");
    sessionQueuePersistance.addSessionToQueue("testSession8");
    sessionQueuePersistance.addSessionToQueue("testSession9");

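    try {
      // keep the context alive long enough for the poller to drain the queue
      TimeUnit.SECONDS.sleep(360);
    } catch (InterruptedException ex) {
      // restore the interrupt flag instead of silently swallowing the exception
      Thread.currentThread().interrupt();
    }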
    context.close();
  }

}

public class SessionQueuePersistance {

  private final QueueChannel sessionInboundChannel;


  public SessionQueuePersistance(QueueChannel sessionInboundChannel) {
    super();
    this.sessionInboundChannel = sessionInboundChannel;
  }

  public boolean addSessionToQueue(String sessionString) {
    Message<?> message = MessageBuilder.withPayload(sessionString).build();
    executorChannel.send(message);
    System.out.println(" --- addSessionToQueue: " + sessionString);
    return true;
  }

}

public class SessionReader {

  public void handleMessage(String inputSession) {
    System.out.println(" --- handling: " + inputSession);
  }

}

I'm new to Spring Integration, so I may have misunderstood something in my configuration.

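My use case is as follows. I send messages from the SessionQueuePersistance bean to sessionInboundChannel. That channel is backed by a queue with a message-store attribute, so the messages should be stored in the database.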

Logs:

2015-12-17 09:15:37,726 [INFO ] org.springframework.integration.config.IntegrationRegistrar - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2015-12-17 09:15:37,736 [DEBUG] org.springframework.integration.config.IntegrationRegistrar - SpEL function '#jsonPath' isn't registered: there is no jayway json-path.jar on the classpath.
2015-12-17 09:15:37,736 [DEBUG] org.springframework.integration.config.IntegrationRegistrar - SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
2015-12-17 09:15:37,806 [INFO ] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2015-12-17 09:15:37,816 [INFO ] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2015-12-17 09:15:38,046 [DEBUG] org.springframework.integration.handler.ServiceActivatingHandler - Unable to attempt conversion of Message payload types. Component 'null' has no explicit ConversionService reference, and there is no 'integrationConversionService' bean within the context.
2015-12-17 09:15:38,056 [DEBUG] org.springframework.integration.config.GlobalChannelInterceptorProcessor - No global channel interceptors.
2015-12-17 09:15:38,066 [INFO ] org.springframework.integration.endpoint.PollingConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2015-12-17 09:15:38,066 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'sessionOutboundChannel' channel
2015-12-17 09:15:38,066 [INFO ] org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@5dccce3c.sessionOutboundChannel' has 1 subscriber(s).
2015-12-17 09:15:38,066 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2015-12-17 09:15:38,066 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2015-12-17 09:15:38,066 [INFO ] org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@5dccce3c.errorChannel' has 1 subscriber(s).
2015-12-17 09:15:38,066 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - started _org.springframework.integration.errorLogger
2015-12-17 09:15:38,066 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession1][Headers={id=a60e5680-9025-b62a-1eb9-5f032e9f7997, timestamp=1450340138066}]
2015-12-17 09:15:38,267 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=a60e5680-9025-b62a-1eb9-5f032e9f7997
2015-12-17 09:15:38,268 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession1][Headers={id=a60e5680-9025-b62a-1eb9-5f032e9f7997, timestamp=1450340138066}]
 --- addSessionToQueue: testSession1
2015-12-17 09:15:38,268 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession2][Headers={id=1e6e2629-6c80-e6a7-7748-1be25aac4e02, timestamp=1450340138268}]
2015-12-17 09:15:38,318 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:38,349 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=1e6e2629-6c80-e6a7-7748-1be25aac4e02
2015-12-17 09:15:38,349 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession2][Headers={id=1e6e2629-6c80-e6a7-7748-1be25aac4e02, timestamp=1450340138268}]
 --- addSessionToQueue: testSession2
2015-12-17 09:15:38,349 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession3][Headers={id=5ba804a1-a309-ed55-de9b-744e9b2f5c23, timestamp=1450340138349}]
2015-12-17 09:15:38,369 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=5ba804a1-a309-ed55-de9b-744e9b2f5c23
2015-12-17 09:15:38,379 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession3][Headers={id=5ba804a1-a309-ed55-de9b-744e9b2f5c23, timestamp=1450340138349}]
 --- addSessionToQueue: testSession3
2015-12-17 09:15:38,379 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession4][Headers={id=58d85c60-4f43-3dbf-fe9d-dc4ab4853315, timestamp=1450340138379}]
2015-12-17 09:15:38,389 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=58d85c60-4f43-3dbf-fe9d-dc4ab4853315
2015-12-17 09:15:38,399 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession4][Headers={id=58d85c60-4f43-3dbf-fe9d-dc4ab4853315, timestamp=1450340138379}]
 --- addSessionToQueue: testSession4
2015-12-17 09:15:38,399 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession5][Headers={id=75b03190-1dfd-9c32-8339-f23e1ba5afd6, timestamp=1450340138399}]
2015-12-17 09:15:38,409 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=75b03190-1dfd-9c32-8339-f23e1ba5afd6
2015-12-17 09:15:38,409 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession5][Headers={id=75b03190-1dfd-9c32-8339-f23e1ba5afd6, timestamp=1450340138399}]
 --- addSessionToQueue: testSession5
2015-12-17 09:15:38,409 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession6][Headers={id=f7826aa2-d0f2-008e-9ed5-d8ab7a20a38d, timestamp=1450340138409}]
2015-12-17 09:15:38,429 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=f7826aa2-d0f2-008e-9ed5-d8ab7a20a38d
2015-12-17 09:15:38,429 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession6][Headers={id=f7826aa2-d0f2-008e-9ed5-d8ab7a20a38d, timestamp=1450340138409}]
 --- addSessionToQueue: testSession6
2015-12-17 09:15:38,429 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession7][Headers={id=85daa925-bdba-eed3-84e2-31a131f9530f, timestamp=1450340138429}]
2015-12-17 09:15:38,439 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=85daa925-bdba-eed3-84e2-31a131f9530f
2015-12-17 09:15:38,449 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession7][Headers={id=85daa925-bdba-eed3-84e2-31a131f9530f, timestamp=1450340138429}]
 --- addSessionToQueue: testSession7
2015-12-17 09:15:38,449 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession8][Headers={id=aca22ee7-4e04-be38-ef75-0b9ed241af4c, timestamp=1450340138449}]
2015-12-17 09:15:38,459 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=aca22ee7-4e04-be38-ef75-0b9ed241af4c
2015-12-17 09:15:38,469 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession8][Headers={id=aca22ee7-4e04-be38-ef75-0b9ed241af4c, timestamp=1450340138449}]
 --- addSessionToQueue: testSession8
2015-12-17 09:15:38,469 [DEBUG] org.springframework.integration.channel.QueueChannel - preSend on channel 'sessionInboundChannel', message: [Payload String content=testSession9][Headers={id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:38,479 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=5ce08183-8026-53e6-9bf7-55e1a4905e53
2015-12-17 09:15:38,479 [DEBUG] org.springframework.integration.channel.QueueChannel - postSend (sent=true) on channel 'sessionInboundChannel', message: [Payload String content=testSession9][Headers={id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
 --- addSessionToQueue: testSession9
2015-12-17 09:15:39,111 [DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Message with id '5ce08183-8026-53e6-9bf7-55e1a4905e53' was deleted.
2015-12-17 09:15:39,111 [DEBUG] org.springframework.integration.channel.QueueChannel - postReceive on channel 'sessionInboundChannel', message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:39,111 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Poll resulted in Message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:39,111 [DEBUG] org.springframework.integration.handler.BridgeHandler - org.springframework.integration.handler.BridgeHandler#0 received message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:39,111 [DEBUG] org.springframework.integration.handler.BridgeHandler - handler 'org.springframework.integration.handler.BridgeHandler#0' sending reply Message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:39,111 [DEBUG] org.springframework.integration.channel.ExecutorChannel - preSend on channel 'sessionOutboundChannel', message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:39,111 [DEBUG] org.springframework.integration.channel.ExecutorChannel - postSend (sent=true) on channel 'sessionOutboundChannel', message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:39,131 [DEBUG] org.springframework.integration.handler.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@77954f89] received message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
 --- handling started testSession9
2015-12-17 09:15:40,161 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
 --- handling finished testSession9
2015-12-17 09:15:41,134 [DEBUG] org.springframework.integration.handler.ServiceActivatingHandler - handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@77954f89]' produced no reply for request Message: [Payload String content=testSession9][Headers={JdbcChannelMessageStore.SAVED=true, JdbcChannelMessageStore.CREATED_DATE=1450340138469, id=5ce08183-8026-53e6-9bf7-55e1a4905e53, timestamp=1450340138469}]
2015-12-17 09:15:41,194 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:42,235 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:43,256 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:44,277 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:45,298 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:46,320 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:47,341 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:48,363 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:49,385 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:50,406 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:51,427 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:52,448 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:53,469 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:54,490 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:55,522 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:56,556 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:57,577 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:58,609 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:15:59,641 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:00,672 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:01,703 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:02,724 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:03,745 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:04,776 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:05,797 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:06,821 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:07,852 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:08,874 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:09,896 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:10,920 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:11,952 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:12,973 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:13,994 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:14,487 [INFO ] org.springframework.integration.endpoint.PollingConsumer - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2015-12-17 09:16:14,487 [DEBUG] org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'
2015-12-17 09:16:14,487 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - Removing {service-activator} as a subscriber to the 'sessionOutboundChannel' channel
2015-12-17 09:16:14,487 [INFO ] org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@5dccce3c.sessionOutboundChannel' has 0 subscriber(s).
2015-12-17 09:16:14,487 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#1
2015-12-17 09:16:14,487 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2015-12-17 09:16:14,487 [INFO ] org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@5dccce3c.errorChannel' has 0 subscriber(s).
2015-12-17 09:16:14,487 [INFO ] org.springframework.integration.endpoint.EventDrivenConsumer - stopped _org.springframework.integration.errorLogger

Best answer

Well, you don't mention any custom ChannelInterceptor, but that doesn't matter: in your logs we can see

[DEBUG] org.springframework.integration.jdbc.store.JdbcChannelMessageStore - Inserting message with id key=

and so on.

So, everything works as expected.

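I'm copying my comment here to close the question:

I'm not sure how this is supposed to work: you inject a QueueChannel sessionInboundChannel, but expect an ExecutorChannel executorChannel. So it looks like you modified a lot before posting here. Therefore, please share the DEBUG output for the whole org.springframework.integration category after running the test. In the meantime, I don't see a problem with your configuration...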
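To check the persisted messages directly in the database, one option is to count the rows in the channel message table while messages are still waiting to be polled. The following is only a minimal sketch using plain JDBC. It assumes the table is named QUEUE_CHANNEL_MESSAGE, that is, the configured tablePrefix followed by CHANNEL_MESSAGE; verify this against the schema you actually created. PersistedMessageCheck and its methods are hypothetical names, not part of Spring Integration or of the question's code. The connection settings in main are the ones from the queueDataSource bean above.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper for inspecting the channel message table. */
final class PersistedMessageCheck {

    /**
     * Builds the count query. Assumption: the table name is
     * tablePrefix + "CHANNEL_MESSAGE" (e.g. QUEUE_CHANNEL_MESSAGE).
     */
    static String countQuery(String tablePrefix) {
        // Reject anything that is not a plain identifier fragment,
        // because the prefix is concatenated into the SQL text.
        if (!tablePrefix.matches("[A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("Unexpected table prefix: " + tablePrefix);
        }
        return "SELECT COUNT(*) FROM " + tablePrefix + "CHANNEL_MESSAGE";
    }

    /** Returns the number of rows currently held in the message table. */
    static long countPersistedMessages(Connection connection, String tablePrefix)
            throws SQLException {
        try (Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(countQuery(tablePrefix))) {
            resultSet.next(); // COUNT(*) always yields exactly one row
            return resultSet.getLong(1);
        }
    }

    public static void main(String[] args) throws SQLException {
        // Connection settings copied from the queueDataSource bean in the question.
        try (Connection connection = DriverManager.getConnection(
                "jdbc:oracle:thin:@localhost:1521:XE", "test", "test")) {
            System.out.println("Persisted messages: "
                    + countPersistedMessages(connection, "QUEUE_"));
        }
    }
}
```

Keep in mind that the bridge polls every second with max-messages-per-poll="-1", and the log shows "Message with id ... was deleted." as soon as a message is received. A count of zero shortly after sending is therefore consistent with the messages having been persisted and then consumed. To see rows in the table, run the check before the poller starts or while the bridge is stopped.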

Your answer doesn't make sense here. Please delete it.

Regarding "java - Spring Integration - Queue with message store - How to check persisted messages in the database", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/34317376/
