在 Spring Integration 中,我们有一个看起来像这样的Setup:
--->
--->
(dispatcher) Messages --> Gateway ----> QueueChannel ---> MessageHandler (worker)
--->
--->
所以我们有一个 Dispatcher Thread,它从 MQTT-Broker 获取消息并将它们转发到队列中。 Queue 的 Poller 提供了一个 TaskExecuter,因此 Consumer 是多线程的。 我们设法实现了所有功能。所以刚刚描述的设置已经实现。
现在保证没有数据丢失我们想做两件事:
1.: 我们希望我们的队列能够持久化数据,这样当 Programm 异常关闭时,队列中的所有数据仍会存在。 这对我们也很有效,我们使用 MongoDB 作为数据库,因为我们在您的文档中的某个地方读到这是推荐的方法。
2.: 我们要确保的第二件事是工作线程是事务性的。因此,只有当工作线程正确返回时,消息才会从队列中永久删除(因此持久性 MessageStore)。如果程序在处理消息(由工作线程)期间关闭,则消息将在下次启动时仍在队列中。 此外,例如,如果工作人员在处理消息期间抛出异常,它将被放回队列中。
我们的实现:
如前所述,程序的基本设置已经实现。然后,我们使用队列的消息存储实现扩展了基本实现。
队列 channel :
@Bean
public PollableChannel inputChannel(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new QueueChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "inputChannel"));
}
由 Messagestore 支持:
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
匹配的轮询器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
PollerMetadata poll = Pollers.fixedDelay(10).get();
poll.setTaskExecutor(consumer);
return poll;
}
执行者:
private Executor consumer = Executors.newFixedThreadPool(5);
我们尝试了什么? 正如现在所解释的,我们想用事务功能扩展这个实现。我们尝试像解释的那样使用 setTransactionSynchronizationFactory here但它没有工作(没有收到错误或任何东西,但行为仍然与我们添加 TransactionSynchronizer 之前一样):
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
PollerMetadata poll = Pollers.fixedDelay(10).get();
poll.setTaskExecutor(consumer);
BeanFactory factory = mock(BeanFactory.class);
ExpressionEvaluatingTransactionSynchronizationProcessor etsp = new ExpressionEvaluatingTransactionSynchronizationProcessor();
etsp.setBeanFactory(factory);
etsp.setAfterRollbackChannel(inputChannel());
etsp.setAfterRollbackExpression(new SpelExpressionParser().parseExpression("#bix"));
etsp.setAfterCommitChannel(inputChannel());
etsp.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
DefaultTransactionSynchronizationFactory dtsf = new DefaultTransactionSynchronizationFactory(etsp);
poll.setTransactionSynchronizationFactory(dtsf);
return poll;
}
在 Spring Integration 中实现我们的需求的最佳方式是什么?
编辑: 按照答案中的建议,我选择使用 JdbcChannelMessageStore 来执行此操作。所以我尝试转换描述的 XML 实现 here (18.4.2) 进入 Java。我不太确定该怎么做,这是我迄今为止尝试过的:
我创建了 H2 数据库并运行了显示的脚本 here在上面。
创建的 JDBCChannelMessageStore Bean:
@Bean
public JdbcChannelMessageStore store() {
JdbcChannelMessageStore ms = new JdbcChannelMessageStore();
ms.setChannelMessageStoreQueryProvider(queryProvider());
ms.setUsingIdCache(true);
ms.setDataSource(dataSource);
return ms;
}
创建了 H2ChannelMessageStoreQueryProvider
@Bean
public ChannelMessageStoreQueryProvider queryProvider() {
return new H2ChannelMessageStoreQueryProvider();
}
调整轮询器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() throws Exception {
PollerMetadata poll = Pollers.fixedDelay(10).get();
poll.setTaskExecutor(consumer);
poll.setAdviceChain(Collections.singletonList(transactionInterceptor()));
return poll;
}
Autowiring 我的 PlaatformTransactionManager:
@Autowired
PlatformTransactionManager transactionManager;
并从 TransactonManager 创建了 TransactionInterceptor:
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRED)
.build();
}
最佳答案
如果您需要将队列作为事务处理,您绝对应该查看事务处理MessageStore
。只有 JDBC 是这样的。只是因为只有 JDBC 支持事务。因此,当我们执行 DELETE
时,只有提交了 TX 才可以。
MongoDB 和任何其他 NoSQL 数据库都不支持这种模型,因此您只能使用 TransactionSynchronizationFactory
将失败的消息推送回数据库。
更新
@RunWith(SpringRunner.class)
@DirtiesContext
public class So47264688Tests {
private static final String MESSAGE_GROUP = "transactionalQueueChannel";
private static EmbeddedDatabase dataSource;
@BeforeClass
public static void init() {
dataSource = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("classpath:/org/springframework/integration/jdbc/schema-drop-h2.sql")
.addScript("classpath:/org/springframework/integration/jdbc/schema-h2.sql")
.build();
}
@AfterClass
public static void destroy() {
dataSource.shutdown();
}
@Autowired
private PollableChannel transactionalQueueChannel;
@Autowired
private JdbcChannelMessageStore jdbcChannelMessageStore;
@Autowired
private PollingConsumer serviceActivatorEndpoint;
@Autowired
private CountDownLatch exceptionLatch;
@Test
public void testTransactionalQueueChannel() throws InterruptedException {
GenericMessage<String> message = new GenericMessage<>("foo");
this.transactionalQueueChannel.send(message);
assertTrue(this.exceptionLatch.await(10, TimeUnit.SECONDS));
this.serviceActivatorEndpoint.stop();
assertEquals(1, this.jdbcChannelMessageStore.messageGroupSize(MESSAGE_GROUP));
Message<?> messageFromStore = this.jdbcChannelMessageStore.pollMessageFromGroup(MESSAGE_GROUP);
assertNotNull(messageFromStore);
assertEquals(message, messageFromStore);
}
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public PlatformTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource);
}
@Bean
public ChannelMessageStoreQueryProvider queryProvider() {
return new H2ChannelMessageStoreQueryProvider();
}
@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(queryProvider());
return jdbcChannelMessageStore;
}
@Bean
public PollableChannel transactionalQueueChannel() {
return new QueueChannel(new MessageGroupQueue(jdbcChannelMessageStore(), MESSAGE_GROUP));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder()
.transactionManager(transactionManager())
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRED)
.build();
}
@Bean
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(5);
return threadPoolTaskExecutor;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(10)
.advice(transactionInterceptor())
.taskExecutor(threadPoolTaskExecutor())
.get();
}
@Bean
public CountDownLatch exceptionLatch() {
return new CountDownLatch(2);
}
@ServiceActivator(inputChannel = "transactionalQueueChannel")
public void handle(Message<?> message) {
System.out.println(message);
try {
throw new RuntimeException("Intentional for rollback");
}
finally {
exceptionLatch().countDown();
}
}
}
}
关于java - Spring 集成 : Persistent and transactional QueueChannel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47264688/