python - Azure 服务总线 - Python 客户端消息的 AutoRenewTimeout

标签 python azure azureservicebus azure-servicebus-queues azure-servicebus-topics

我正在查看服务 AutoRenewTimeout 功能的文档,并在 Lock Renewal Policy on service-bus messages 上发现了这篇文章。 。

它讨论了AutoRenewTimeout功能,其中消息被锁定一段时间,以便订阅者可以完成消息的处理,或者消息超时(无法在给定的时间内处理消息)时间段),在该时间段内,从同一订阅中阅读的其他订阅者将可以看到该消息。

我无法在 Microsoft Azure SDK for Python 中找到此功能。 我浏览了源代码,它只谈到Manual renewal of the lock针对特定消息。

我的用例如下

  • 我必须读取并处理来自服务总线的消息。
  • 将那些已处理的消息转储到数据库中(在我的例子中为 MongoDB)
  • 推送到服务总线的消息每小时可能高达 100 万个事件(因此不可能跟踪给定消息的发送时间) 超时并触发手动更新)。
  • 所有已处理的消息都会被推送到临时列表中。
  • 每当上述列表增长超过特定阈值时,就会在数据库上进行批量插入

这是我想到的。它没有我所说的锁续订政策。我只是在处理消息时触发删除。

class Event:
    def __read_subscription_message(self):
        try:
            message = self.bus_service.receive_subscription_message(
                self.topic_name, self.subscription_name, peek_lock=True)
            return message
        except Exception as e:
            self.logger.exception("Exception occurred!!!")

    def start_listner(self, task_number=0):
        self.logger.info('Task: %s, started listening to service bus messages' % task_number)
        while True:
            msg = self.__read_subscription_message()
            if msg and msg.body is not None:
                self.currentBackOff = 0
                self.process_event(msg, task_number)
                gevent.sleep(0)

    def process_event(self, msg, task_number=0):
        try:
            if msg.body:
                # message = json.loads(msg.body.decode())
                message = self.deserialize_message_body(msg.body)
                custom_properties = msg.custom_properties
                # Business logic implemented................
                # After processing a message append this to a temp list. Make
                # an insert which length of this list reaches a given threshold
                # by calling "write_to_storage(self, task_number=0)"
                self.bulk_records.append(record)
                msg.delete()
            else:
                self.logger.info("Message received: %s, is of type: %s" % (msg.body, type(msg.body)))
            self.total += 1
        except DeSerializationException as e:
            self.logger.info("Not able to de-serialize message: %s" % msg.body)
            self.logger.exception(e)
        except Exception as e:
            self.logger.exception(e)

    def write_to_storage(self, task_number=0):
        # Write to DB

它工作正常,但如果我的订阅者进程被终止,那么我的临时存储桶中的所有消息(未写入数据库)都会丢失。我希望在将消息写入数据库时​​对消息触发手动“message.delete()”。我认为 AutoRenewLock 是可行的方法,因为消息的锁定持续时间的最大值是 5 分钟,这对我的情况没有帮助。

谢谢

最佳答案

因此您不需要将消息写入临时日志中。如果您使用窥视锁并且锁过期,则消息应该返回队列并在下次被拾取。如果您使用 peek lock,则需要调用您在代码中执行的 message.delete() ,然后才应将其从代理中删除。 这个 .net 示例展示了它应该如何工作: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

为了方便起见: //完成消息,以便不会再次收到该消息。 //只有在 ReceiveMode.PeekLock 模式(默认)下创建queueClient 时才能执行此操作。

等待queueClient.CompleteAsync(message.SystemProperties.LockToken);

另请参阅此处了解窥视锁的工作原理:https://learn.microsoft.com/en-us/rest/api/servicebus/peek-lock-message-non-destructive-read

因此,除非您调用 msg.delete,否则该消息不应算作已完成。您是否在同一订阅或主题上运行多个订阅者?

无论如何,只要您不调用删除,消息就应该返回到队列,并且您的下一个接收调用将接听它们。如果您多次未能收到,他们可能会收到死信。 https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues

请使用服务总线资源管理器查看您是否确实丢失了消息,或者它们是否只是返回队列或变成死信: https://github.com/paolosalvatori/ServiceBusExplorer/releases

关于python - Azure 服务总线 - Python 客户端消息的 AutoRenewTimeout,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48180658/

相关文章:

Python 扭曲 : Functions are not called properly?

python - 设置matplotlib表格的行边缘颜色

python - 将 pd 数据框填写到现有的 excel 表中(使用 openpyxl v2.3.2)

azure - 将 Azure blob 升级到 DataLake Gen 2 时验证失败

c# - 限制服务总线消息接收的 Azure Functions 速率

python - 用 Python 调整终端大小?

azure - 将 Azure 策略定义部署到管理组权限

sql - 插入值语句只能包含 SQL 数据仓库中的常量文字值或变量引用

.net - Azure.Messaging.ServiceBus 无法配置 TransportType

azure - 将大于 256KB 的事件推送到 Azure EventHub