我在我的应用程序中执行以下流程:
从 broker 获取 1 条消息(手动确认)
做一些处理
在数据库和代理上开始交易
在数据库中插入一些记录并发布一些消息 broker(不同队列)
提交数据库和代理
确认您在第 1 步中从代理处获得的消息。
经纪人的所有操作都是通过单一 channel 完成的。这是准备代码:
Connection brokerConnection = factory.newConnection();
Channel channel = brokerConnection.createChannel();
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("receive-queue", false, consumer);
以下是我的代码。我删除了 try
、catch
部分以使其清晰。我将所有异常记录到文件中。
第 1 步:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Request request = (Request) SerializationUtils.deserialize(delivery.getBody());
第 2、3、4、5 步:
dbConnection.setAutoCommit(false);
channel.txSelect();
stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);
dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);
第 6 步:
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
在一次迭代后,我可以看到数据库和代理中的记录(这意味着它在第 5 步之前工作正常)。问题是接收队列中的消息在第 6 步后未被删除,管理插件显示一条未确认的消息。我也没有在日志文件中看到任何异常。谁能帮忙?
[更新1]
现在我创建一个发布 channel 和另一个接收 channel 。现在正在工作。那么如何使用单一 channel 来接收和发布(与交易)?我以前使用单一 channel 进行接收和发布,但没有交易。
[更新2]
我将第 6 步移到了事务中,它现在可以正常工作了。
dbConnection.setAutoCommit(false);
channel.txSelect();
stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);
我有点困惑。我只希望发布部分在事务内。
最佳答案
您已将 channel 置于事务模式 - 确认是事务性的事情。因此,您要么需要在单独的非事务 channel 上消费和确认,要么接受您的确认需要在 tx.commit 之前。
关于java - basicAck 不会从代理中删除消息 - RabbitMQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18247632/