python - 如何从 confluent_python AVRO 消费者获取最新的偏移值

标签 python confluent-platform kafka-python

我是 confluent_kafka 的新手,但我已经获得了一些使用 kafka-python 的经验。我想要做的是更改开始使用消息的偏移量。这就是为什么我想构建一个能够返回到以前的消息以返回将填充仪表板的数据的消费者客户端。说使用 kafka-python 包我可以使用 seek_to_end ( https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L788 ) 方法来获取最新提交的位置值。有了它,我可以减去值并使用 seek 方法(https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L738)返回到以前的消息

另一方面,conflient_kafka 似乎没有类似的功能,到目前为止我发现的是使用值为 -1 的变量 OFFSET_END 并且它不返回我的偏移数值最新最大的一个。我也可以使用“搜索”功能,但我需要一种方法来获得最新偏移量的数值,而不是 -1

我的 avro 消费者看起来像

from confluent_kafka.avro import AvroConsumer

if __name__ == '__main__':
     c = AvroConsumer({"bootstrap.servers": "locahost:29092", "group.id":"mygroup",'schema.registry.url': 'http://localhost:8081',
                  'enable.auto.commit': True,'default.topic.config': {'auto.offset.reset': 'smallest'}})

def my_assign (consumer, partitions):
    for p in partitions:
        p.offset = confluent_kafka.OFFSET_END
        print("offset=",p.offset)
    print('assign', partitions)
    print('position:',consumer.position(partitions))
    consumer.assign(partitions)

c.subscribe(["mytopic"],on_assign=my_assign)

while True:
    m = c.poll(1)
    if m is None:
        continue

    if m.error() is None:
        print('Received message', m.value(),m.offset())
c.close()

产生以下结果:

offset= -1
assign [TopicPartition{topic=mytopic,partition=0,offset=-1,error=None}]
position: [TopicPartition{topic=mytopic,partition=0,offset=-1001,error=None}]

并等待下一条消息。我想知道是否有人可以帮助我。谢谢

最佳答案

您可以使用 Consumer.get_watermark_offsets(参见 docs)

例子:

cfg = {
    # ... ...
    "group.id": str(uuid4())
}
consumer = AvroConsumer(cfg)
topic_partition = TopicPartition("topic-name", partition=123)
low, high = consumer.get_watermark_offsets(topic_partition)
print("the latest offset is {}".format(high))

关于python - 如何从 confluent_python AVRO 消费者获取最新的偏移值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49968377/

相关文章:

python - 无法使用 Kafka-Python 的反序列化器从 Kafka 消费 JSON 消息

python - 为什么我的 Kafka 消费者比我的 Kafka 生产者慢得多?

python - Python subprocess.Popen运行ssh的结果异常

linux - 嵌套脚本在 Docker 镜像中无法找到彼此

json - JSONConverter 的 Kafka Connect 架构格式

postgresql - Kafka Connect - JSON 转换器 - JDBC 接收器连接器 - 列类型 JSON

python - Kafka 10 - 具有身份验证和授权的 Python 客户端

python - 导入错误 : cannot import name HTTPSHandler installing get-pip.

python - 忽略 Django 模型中的一个字段

python - Boto 在 ec2 实例上执行 shell 命令