java - basicAck 不会从代理中删除消息 - RabbitMQ

标签 java rabbitmq amqp

我在我的应用程序中执行以下流程:

  1. 从 broker 获取 1 条消息(手动确认)

  2. 做一些处理

  3. 在数据库和代理上开始交易

  4. 在数据库中插入一些记录并发布一些消息 broker(不同队列)

  5. 提交数据库和代理

  6. 确认您在第 1 步中从代理处获得的消息。

经纪人的所有操作都是通过单一 channel 完成的。这是准备代码:

Connection brokerConnection = factory.newConnection();              
Channel channel = brokerConnection.createChannel();
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("receive-queue", false, consumer);

以下是我的代码。我删除了 trycatch 部分以使其清晰。我将所有异常记录到文件中。 第 1 步:

QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Request request = (Request) SerializationUtils.deserialize(delivery.getBody());

第 2、3、4、5 步:

dbConnection.setAutoCommit(false);
channel.txSelect();

stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);

dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);

第 6 步:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

在一次迭代后,我可以看到数据库和代理中的记录(这意味着它在第 5 步之前工作正常)。问题是接收队列中的消息在第 6 步后未被删除,管理插件显示一条未确认的消息。我也没有在日志文件中看到任何异常。谁能帮忙?

[更新1]

现在我创建一个发布 channel 和另一个接收 channel 。现在正在工作。那么如何使用单一 channel 来接收和发布(与交易)?我以前使用单一 channel 进行接收和发布,但没有交易。

[更新2]

我将第 6 步移到了事务中,它现在可以正常工作了。

dbConnection.setAutoCommit(false);
channel.txSelect();

stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 

dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);

我有点困惑。我只希望发布部分在事务内。

最佳答案

您已将 channel 置于事务模式 - 确认是事务性的事情。因此,您要么需要在单独的非事务 channel 上消费和确认,要么接受您的确认需要在 tx.commit 之前。

关于java - basicAck 不会从代理中删除消息 - RabbitMQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18247632/

相关文章:

使用辅助方法的 Java 排列递归

rabbitmq - 在 RabbitMQ 3+ 中删除不存在的队列不会产生预期的错误;为什么?

rabbitmq - AMQP basic.get 从队列中拉取并发消费者

java - Spring AMQP - 在启动时将队列注册到容器的正确方法

rabbitmq ReturnCallback 在 NO_ROUTE(312) 上声明队列时卡住

java - JAX-B 仅从输出中排除字段,但允许输入

java - 如何从 MySql PhpMyadmin 本地主机访问用户电子邮件和密码

java - Jetpack 数据绑定(bind)在同一文件中生成了与同一类相同的重复类

java - 从队列中消费时,所有 boolean 字段均为 false

rabbitmq - 允许客户端直接连接到rabbitmq并使用队列是否安全?