python - 如何在 Python 中存储 azure.eventhub.common.Offset?

标签 python azure serialization offset azure-eventhub

根据official documentation对于 Azure 事件中心,消费者有责任管理偏移量。引用:

Consumers are responsible for storing their own offset values outside of the Event Hubs service.

但是看看 API doc for event hub Offset class ,很明显它无法提供序列化或其他存储方式。

所以我的问题是:我将如何存储事件中心偏移量?

最佳答案

请仔细引用common.py的源码GitHub 存储库 Azure/azure-event-hubs-pythonOffset class 在第 253 行定义如下。

class Offset(object):
    """
    The offset (position or timestamp) where a receiver starts. Examples:
    Beginning of the event stream:
      >>> offset = Offset("-1")
    End of the event stream:
      >>> offset = Offset("@latest")
    Events after the specified offset:
      >>> offset = Offset("12345")
    Events from the specified offset:
      >>> offset = Offset("12345", True)
    Events after a datetime:
      >>> offset = Offset(datetime.datetime.utcnow())
    Events after a specific sequence number:
      >>> offset = Offset(1506968696002)
    """

    def __init__(self, value, inclusive=False):
        """
        Initialize Offset.
        :param value: The offset value.
        :type value: ~datetime.datetime or int or str
        :param inclusive: Whether to include the supplied value as the start point.
        :type inclusive: bool
        """
        self.value = value
        self.inclusive = inclusive

    def selector(self):
        """
        Creates a selector expression of the offset.
        :rtype: bytes
        """
        operator = ">=" if self.inclusive else ">"
        if isinstance(self.value, datetime.datetime):
            timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
            return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
        if isinstance(self.value, six.integer_types):
            return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
        return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')

根据Offset类的源代码,它只是一个普通的Python类,具有两个属性valueinclusive。您可以简单地将其属性的值存储为 json 字符串或其他字符串,或者只是提取这些值,如下面的示例代码。

from azure.eventhub.common import Offset
offset = Offset("-1")
print(offset.value, offset.inclusive)
# -1 False
print(offset.__dict__)
# {'value': '-1', 'inclusive': False}
import json
offset_json = json.dumps(offset.__dict__)
# '{"value": "-1", "inclusive": false}'

注意:将来,GitHub 存储库 Azure/azure-event-hubs-python将完成向 GitHub 存储库的迁移 Azure/azure-sdk-for-pythonOffset 类的更改被重命名为 EventPosition具有相同属性 valueinclusive 的类。

关于python - 如何在 Python 中存储 azure.eventhub.common.Offset?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58267623/

相关文章:

azure - 如何使用 Azure OAuth 或 Azure API 网关保护 Open xpage REST API

python - 如何调试 `ast.literal_eval` 中的错误?

python - Pandas:多索引二级整数切片

python - 将每年日期更改为每月日期并添加新值以填充每月日期

python - 在 Python 3 中创建随机空白矩阵(二维数组)?

python - Django - 使用 full_clean() 验证管理错误

azure - 如何知道特定 Azure VM 类型是否支持高级磁盘?

azure - 是否可以使用 Azure AD Graph Api 仅列出某个组中的组织联系人?

java - java序列化问题

java - 使用 Jackson/Java 来确保所有序列化到 JSON 都在单引号或双引号内分隔不受信任的数据以转义任何特殊字符?