java - Spring 集成 : Persistent and transactional QueueChannel

标签 java spring spring-integration

在 Spring Integration 中,我们有一个看起来像这样的Setup:

                                                     ---> 
                                                     ---> 
(dispatcher) Messages --> Gateway ----> QueueChannel ---> MessageHandler (worker)
                                                     ---> 
                                                     ---> 

所以我们有一个 Dispatcher Thread,它从 MQTT-Broker 获取消息并将它们转发到队列中。 Queue 的 Poller 提供了一个 TaskExecuter,因此 Consumer 是多线程的。 我们设法实现了所有功能。所以刚刚描述的设置已经实现。

现在保证没有数据丢失我们想做两件事:

1.: 我们希望我们的队列能够持久化数据,这样当 Programm 异常关闭时,队列中的所有数据仍会存在。 这对我们也很有效,我们使用 MongoDB 作为数据库,因为我们在您的文档中的某个地方读到这是推荐的方法。

2.: 我们要确保的第二件事是工作线程是事务性的。因此,只有当工作线程正确返回时,消息才会从队列中永久删除(因此持久性 MessageStore)。如果程序在处理消息(由工作线程)期间关闭,则消息将在下次启动时仍在队列中。 此外,例如,如果工作人员在处理消息期间抛出异常,它将被放回队列中。

我们的实现:

如前所述,程序的基本设置已经实现。然后,我们使用队列的消息存储实现扩展了基本实现。

队列 channel :

@Bean
public PollableChannel inputChannel(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new QueueChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "inputChannel"));
}

由 Messagestore 支持:

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

匹配的轮询器:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    PollerMetadata poll = Pollers.fixedDelay(10).get(); 
    poll.setTaskExecutor(consumer);
    return poll;
}

执行者:

private Executor consumer = Executors.newFixedThreadPool(5);

我们尝试了什么? 正如现在所解释的,我们想用事务功能扩展这个实现。我们尝试像解释的那样使用 setTransactionSynchronizationFactory here但它没有工作(没有收到错误或任何东西,但行为仍然与我们添加 TransactionSynchronizer 之前一样):

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    PollerMetadata poll = Pollers.fixedDelay(10).get(); 
    poll.setTaskExecutor(consumer);

    BeanFactory factory = mock(BeanFactory.class);
    ExpressionEvaluatingTransactionSynchronizationProcessor etsp = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    etsp.setBeanFactory(factory);
    etsp.setAfterRollbackChannel(inputChannel());
    etsp.setAfterRollbackExpression(new SpelExpressionParser().parseExpression("#bix"));
    etsp.setAfterCommitChannel(inputChannel());
    etsp.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
    DefaultTransactionSynchronizationFactory dtsf = new DefaultTransactionSynchronizationFactory(etsp);

    poll.setTransactionSynchronizationFactory(dtsf);

    return poll;
}

在 Spring Integration 中实现我们的需求的最佳方式是什么?

编辑: 按照答案中的建议,我选择使用 JdbcChannelMessageStore 来执行此操作。所以我尝试转换描述的 XML 实现 here (18.4.2) 进入 Java。我不太确定该怎么做,这是我迄今为止尝试过的:

我创建了 H2 数据库并运行了显示的脚本 here在上面。

创建的 JDBCChannelMessageStore Bean:

@Bean
public JdbcChannelMessageStore store() {
    JdbcChannelMessageStore ms =  new JdbcChannelMessageStore();
    ms.setChannelMessageStoreQueryProvider(queryProvider());
    ms.setUsingIdCache(true);
    ms.setDataSource(dataSource);
    return ms;
}

创建了 H2ChannelMessageStoreQueryProvider

    @Bean
    public ChannelMessageStoreQueryProvider queryProvider() {
        return new H2ChannelMessageStoreQueryProvider();
    }

调整轮询器:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() throws Exception {
    PollerMetadata poll = Pollers.fixedDelay(10).get();
    poll.setTaskExecutor(consumer);
    poll.setAdviceChain(Collections.singletonList(transactionInterceptor()));

    return poll;
}

Autowiring 我的 PlaatformTransactionManager:

@Autowired
PlatformTransactionManager transactionManager;

并从 TransactonManager 创建了 TransactionInterceptor:

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRED)
                .build();
}

最佳答案

如果您需要将队列作为事务处理,您绝对应该查看事务处理MessageStore。只有 JDBC 是这样的。只是因为只有 JDBC 支持事务。因此,当我们执行 DELETE 时,只有提交了 TX 才可以。

MongoDB 和任何其他 NoSQL 数据库都不支持这种模型,因此您只能使用 TransactionSynchronizationFactory 将失败的消息推送回数据库。

更新

@RunWith(SpringRunner.class)
@DirtiesContext
public class So47264688Tests {

    private static final String MESSAGE_GROUP = "transactionalQueueChannel";

    private static EmbeddedDatabase dataSource;

    @BeforeClass
    public static void init() {
        dataSource = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("classpath:/org/springframework/integration/jdbc/schema-drop-h2.sql")
                .addScript("classpath:/org/springframework/integration/jdbc/schema-h2.sql")
                .build();
    }

    @AfterClass
    public static void destroy() {
        dataSource.shutdown();
    }

    @Autowired
    private PollableChannel transactionalQueueChannel;

    @Autowired
    private JdbcChannelMessageStore jdbcChannelMessageStore;

    @Autowired
    private PollingConsumer serviceActivatorEndpoint;

    @Autowired
    private CountDownLatch exceptionLatch;

    @Test
    public void testTransactionalQueueChannel() throws InterruptedException {
        GenericMessage<String> message = new GenericMessage<>("foo");
        this.transactionalQueueChannel.send(message);

        assertTrue(this.exceptionLatch.await(10, TimeUnit.SECONDS));
        this.serviceActivatorEndpoint.stop();

        assertEquals(1, this.jdbcChannelMessageStore.messageGroupSize(MESSAGE_GROUP));
        Message<?> messageFromStore = this.jdbcChannelMessageStore.pollMessageFromGroup(MESSAGE_GROUP);

        assertNotNull(messageFromStore);
        assertEquals(message, messageFromStore);
    }

    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public PlatformTransactionManager transactionManager() {
            return new DataSourceTransactionManager(dataSource);
        }

        @Bean
        public ChannelMessageStoreQueryProvider queryProvider() {
            return new H2ChannelMessageStoreQueryProvider();
        }

        @Bean
        public JdbcChannelMessageStore jdbcChannelMessageStore() {
            JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
            jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(queryProvider());
            return jdbcChannelMessageStore;
        }

        @Bean
        public PollableChannel transactionalQueueChannel() {
            return new QueueChannel(new MessageGroupQueue(jdbcChannelMessageStore(), MESSAGE_GROUP));
        }

        @Bean
        public TransactionInterceptor transactionInterceptor() {
            return new TransactionInterceptorBuilder()
                    .transactionManager(transactionManager())
                    .isolation(Isolation.READ_COMMITTED)
                    .propagation(Propagation.REQUIRED)
                    .build();
        }

        @Bean
        public TaskExecutor threadPoolTaskExecutor() {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setCorePoolSize(5);
            return threadPoolTaskExecutor;
        }

        @Bean(name = PollerMetadata.DEFAULT_POLLER)
        public PollerMetadata poller() {
            return Pollers.fixedDelay(10)
                    .advice(transactionInterceptor())
                    .taskExecutor(threadPoolTaskExecutor())
                    .get();
        }

        @Bean
        public CountDownLatch exceptionLatch() {
            return new CountDownLatch(2);
        }

        @ServiceActivator(inputChannel = "transactionalQueueChannel")
        public void handle(Message<?> message) {
            System.out.println(message);
            try {
                throw new RuntimeException("Intentional for rollback");
            }
            finally {
                exceptionLatch().countDown();
            }
        }

    }

}

关于java - Spring 集成 : Persistent and transactional QueueChannel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47264688/

相关文章:

java.util.NoSuchElementException : No line found error java

java - 使用 Mongodb 版本 4 和副本配置 flapdoodle 嵌入式 mongo

java - 如何在 Spring-Integration 中使用相同的 MessageChannel 进行发送和接收?

java - Spring Integration LoggingHandler 中的 NPE

java - JedisPool 内存泄漏

java - 将 JQuery 数组放入 Java ArrayList 字符串中然后使用它

java - 放心 : How to disable PreAuthorize when

java - Spring中将Controller1的方法调用到Controller2的另一个方法中

spring-integration - 无法使用 Spring Boot 注册 HttpRequestHandlerServlet

java - 循环没有像预期的那样将项目添加到列表中