Python Kafka consumer reads messages that were already read

Tags: python apache-kafka kafka-consumer-api

Kafka consumer code:

from kafka import KafkaConsumer, KafkaProducer, TopicPartition

def test():
    TOPIC = "file_data"
    producer = KafkaProducer()
    # kafka-python expects bytes unless a value_serializer is configured
    producer.send(TOPIC, b"data")
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        consumer_timeout_ms=1000,
        group_id="Group2",
        enable_auto_commit=False,
        auto_commit_interval_ms=1000
    )
    topic_partition = TopicPartition(TOPIC, 0)
    assigned_topic = [topic_partition]
    consumer.assign(assigned_topic)
    consumer.seek_to_beginning(topic_partition)
    for message in consumer:
        print("%s key=%s value=%s" % (message.topic, message.key, message.value))
    consumer.commit()

Expected behavior: it should read only the last message written by the producer, so it should print only:

file_data key=None value=b'data'

Current behavior: after running the code, it prints:

file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
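One way to see why the output grows: each call to test() appends one more b'data' record to partition 0, and seek_to_beginning rewinds the consumer to the earliest retained offset, so the Nth run prints N records. The sketch below is a toy in-memory model, not kafka-python; simulate_runs is a hypothetical name, and it assumes retention has not deleted any records and that the six lines above came from the sixth run.

```python
from typing import List


def simulate_runs(num_runs: int) -> List[List[bytes]]:
    """Toy model (hypothetical, not kafka-python) of calling the question's test() repeatedly.

    Each run appends one record to partition 0, then seek_to_beginning rewinds
    the position to offset 0, so the consumer receives the whole log again.
    Assumes no records have been removed by retention.
    """
    partition_log: List[bytes] = []  # stands in for partition 0 of "file_data"
    outputs: List[List[bytes]] = []
    for _ in range(num_runs):
        partition_log.append(b"data")  # producer.send(TOPIC, b"data")
        position = 0  # consumer.seek_to_beginning(topic_partition)
        outputs.append(partition_log[position:])  # every record from the earliest offset
    return outputs


for run, consumed in enumerate(simulate_runs(6), start=1):
    print(f"run {run}: {len(consumed)} message(s)")
```

The committed offset never matters in this model because the explicit seek always wins, which is exactly why committing after the loop does not stop the duplicates.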

Best answer

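from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import KafkaProducer

def test():
    TOPIC = "file_data"
    producer = KafkaProducer()
    # kafka-python sends raw bytes unless a value_serializer is configured
    producer.send(TOPIC, b'data')
    producer.flush()  # block until the record has actually been sent
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        # only used when Group2 has no committed offset for the partition
        auto_offset_reset='latest',
        consumer_timeout_ms=1000,
        group_id="Group2",
        enable_auto_commit=False,
        auto_commit_interval_ms=1000
    )
    topic_partition = TopicPartition(TOPIC, 0)
    assigned_topic = [topic_partition]
    consumer.assign(assigned_topic)
    # Removed: seeking to the beginning replays every record still in the partition
    # consumer.seek_to_beginning(topic_partition)
    for message in consumer:
        print("%s key=%s value=%s" % (message.topic, message.key, message.value))
    # Commit the current position so the next run starts after these records
    consumer.commit()

test()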

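This does what you expect. Call seek_to_beginning (seekToBeginning in the Java client) only if you want the consumer to start from the beginning of the partition: it moves the position to the earliest offset still retained, so every earlier b'data' record is delivered again. Without it, the consumer resumes from the offset that Group2 last committed.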
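The answer's point can be summarized as a precedence rule for where a consumer with a group_id starts reading an assigned partition. The following is a simplified model, not kafka-python's actual code: starting_offset is a hypothetical helper, and it ignores details such as committed offsets that fall outside the retained range.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
    seek_to_beginning: bool = False,
) -> int:
    """Simplified model (hypothetical helper, not part of kafka-python) of the
    first offset a consumer reads from an assigned partition."""
    if seek_to_beginning:
        # An explicit seek overrides both the committed offset and auto_offset_reset.
        return log_start
    if committed is not None:
        # Resume from the next offset the group committed.
        return committed
    # No committed offset for this group: fall back to auto_offset_reset.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


# Partition holds offsets 0..6 (log_end = 7); Group2 last committed offset 6.
print(starting_offset(committed=6, log_start=0, log_end=7))                          # 6
print(starting_offset(committed=6, log_start=0, log_end=7, seek_to_beginning=True))  # 0
print(starting_offset(committed=None, log_start=0, log_end=7))                       # 7
```

The first call reads only the newest record, the second replays all seven, and the third reads nothing already in the log. That last case suggests that with a brand-new group and auto_offset_reset='latest', a record produced just before the consumer looks up its position may be skipped, so the answer's code relies on Group2 already having a committed offset.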

Reference: seek_to_beginning

A similar question, "Python Kafka consumer reads messages that were already read", can be found on Stack Overflow: https://stackoverflow.com/questions/62482984/
