java - 确保至少一个消费者收到在主题交换上发布的消息

标签 java python rabbitmq distributed-computing rabbitmq-exchange

TLDR;在消费者动态创建的主题交换和队列的上下文中,如何在没有消费者消费消息时重新传递消息/通知生产者?

我有以下组件:

  • 主要服务,生成文件。每个文件都有特定的类别(例如,pictures.profile、pictures.gallery)
  • 一组工作人员,使用文件并从中产生文本输出(例如文件的大小)

  • 我目前有一个 RabbitMQ 主题交换。
  • 生产者使用 routing_key = file_category 向交换发送消息.
  • 每个消费者创建一个队列并将交换绑定(bind)到该队列以获得一组路由键(例如pictures.* 和videos.trending)。
  • 当消费者处理完一个文件时,它会将结果推送到 processing_results 队列中。

  • 现在 - 这工作正常,但它仍然有一个主要问题。目前,如果发布者发送带有没有消费者绑定(bind)的路由键的消息,则该消息将丢失。这是因为即使消费者创建的队列是持久的,一旦消费者断开连接它就会被销毁,因为它是这个消费者唯一的 .

    消费者代码(python):
    channel.exchange_declare(exchange=exchange_name, type='topic', durable=True)
    result = channel.queue_declare(exclusive = True, durable=True)
    queue_name = result.method.queue
    
    topics = [ "pictures.*", "videos.trending" ]
    for topic in topics:
        channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=topic)
    
    channel.basic_consume(my_handler, queue=queue_name)
    channel.start_consuming()
    

    在我的用例中,在这种情况下丢失消息是 Not Acceptable 。

    尝试的解决方案

    但是,“丢失”消息是可以接受的 如果通知生产者没有消费者收到消息 (在这种情况下,它可以稍后重新发送)。我发现必填字段可能会有所帮助,因为 AMQP 的规范指出:

    This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method.



    这确实有效 - 在制作人中,我可以注册 ReturnListener :
    rabbitMq.confirmSelect();  
    
    rabbitMq.addReturnListener( (int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) -> {
        log.info("A message was returned by the broker");
    });
    rabbitMq.basicPublish(exchangeName, "pictures.profile", true /* mandatory */, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBytes);
    

    这将按预期打印 A message was returned by the broker如果使用路由键发送消息,则没有消费者绑定(bind)到。

    现在,我还想知道消息何时被消费者正确接收。所以我尝试注册一个 ConfirmListener还有:
    rabbitMq.addConfirmListener(new ConfirmListener() {
        void handleAck(long deliveryTag, boolean multiple) throws IOException {
            log.info("ACK message {}, multiple = ", deliveryTag, multiple);
        }
    
        void handleNack(long deliveryTag, boolean multiple) throws IOException {
            log.info("NACK message {}, multiple = ", deliveryTag, multiple);
        }
    });
    

    这里的问题是发送了 ACK 经纪人 ,而不是消费者本身。所以当生产者发送带有路由键 的消息时K :
  • 如果消费者绑定(bind)到这个路由键,代理只发送一个 确认
  • 否则,代理发送 basic.return 后跟一个 确认

  • 参见文档:

    For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).



    因此,虽然我的问题理论上可以使用它来解决,但它会使了解消息是否被正确使用的逻辑变得非常复杂(尤其是在多线程、数据库中的持久性等上下文中):
    send a message
    
    on receive ACK:
        if no basic.return was received for this message
           the message was correctly consumed
        else
           the message wasn't correctly consumed
    
    on receive basic.return
        the message wasn't correctly consumed
    

    可能的其他解决方案
  • 每个文件类别都有一个队列,即队列pictures_profile、pictures_gallery等。不好,因为它为消费者消除了很多灵活性
  • 在生产者中有一个“响应超时”逻辑。生产者发送消息。它期望在 processing_results 队列中为该消息提供“答案”。一个解决方案是,如果在 X 秒后仍未得到答复,则重新发送该消息。不过我不喜欢它,它会在生产者中创建一些额外的棘手逻辑。
  • 生产 TTL 为 0 的消息,并让生产者监听死信交换。这是官方suggested solution替换在 RabbitMQ 3.0 中删除的“立即”标志(参见“立即”标志的删除段落)。根据 the docs在死信交换中,死信交换只能按队列配置。所以它在这里不起作用
  • [编辑] 我看到的最后一个解决方案是让每个消费者创建一个持久队列,当他断开连接时不会被破坏,并让它监听它。示例:consumer1创建 queue-consumer-1这与 myExchange 的消息绑定(bind)拥有路由 key abcd .我预见的问题是它意味着为每个消费者应用程序实例找到一个唯一标识符(例如它运行的机器的主机名)。


  • 我很乐意就此提供一些意见 - 谢谢!

    相关:
  • RabbitMQ: persistent message with Topic exchange (此处不适用,因为队列是“动态”创建的)
  • Make sure the broker holds messages until at least one consumer gets it
  • RabbitMQ Topic Exchange with persisted queue

  • [编辑] 解决方案

    如前所述,我最终实现了使用 basic.return 的东西。实际上实现起来并不那么棘手,你只需要确保你的方法产生消息和处理基本返回的方法是同步的(或者如果不在同一个类中则有一个共享锁),否则你最终会得到交错的执行流程会弄乱您的业务逻辑。

    最佳答案

    我相信an alternate exchange对于有关识别未路由消息的部分,将最适合您的用例。

    Whenever an exchange with a configured AE cannot route a message to any queue, it publishes the message to the specified AE instead.



    基本上在创建“主”交换时,您为其配置一个备用交换。
    对于引用的备用交换,我倾向于使用扇出,然后创建一个绑定(bind)到它的队列 (notroutedq)。
    这意味着任何未发布到至少一个绑定(bind)到“主”交换的队列的消息都将在 notroutedq 中结束

    现在关于你的陈述:

    because even if the queue created by the consumers is durable, it is destroyed as soon as the consumer disconnects since it is unique to this consumer.



    似乎您已将队列配置为将自动删除设置为 true。
    如果是这样,如果断开连接,如您所说,队列将被破坏,队列中仍然存在的消息将丢失,备用交换配置未涵盖的情况 .

    从您的用例描述中不清楚您是否希望在某些情况下消息最终出现在多个队列中,似乎更像是每种预期处理类型都有一个队列(同时保持分组灵活)。如果队列拆分确实与处理类型有关,我看不到使用自动删除设置队列的好处,希望在您想要更改绑定(bind)时可能不必进行任何清理维护。

    假设您可以使用持久队列,那么 dead letter exchange (将再次与扇出一起使用)绑定(bind)到 dlq 将覆盖丢失的情况。
  • 没有被备用交换机覆盖的路由
  • 您的 processing_result 队列已经处理了正确的处理
  • 有问题的处理或处理时间过长而无法被死信交换覆盖,在这种情况下,在死信消息中添加的附加 header 甚至可以帮助识别要采取的操作类型
  • 关于java - 确保至少一个消费者收到在主题交换上发布的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42932933/

    相关文章:

    java - DecimalFormat 和定点数的舍入

    python - 在 python 中使用链表实现堆栈。 pop 方法的问题和有关可变性的问题

    rabbitmq - 启用 RabbitMQ 管理插件失败

    java - 尝试在 Java 中简单地实现 MD5 散列

    java - 指定随机机会

    java - 在 session 中调用对象的方法而不进行类型转换

    rabbitmq - 为什么 EasyNetQ 在进行 ILMerged 后会默默地失败?

    python - 通过 ManyToManyField = Value 订购一个 Django 查询集

    python - Pyside2 我怎样才能移动盒子?

    rabbitmq - 您如何使用多条消息回复 RabbitMQ RPC 客户端?