每当在 Kafka 中创建新主题时,我都尝试启动动态消费者,但动态启动的消费者总是缺少起始/第一条消息,但从那里开始消费消息。我正在使用 kafka-python 模块,并使用更新的 KafkaConsumer 和 KafkaProducer。
生产者的代码是
producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)
消费者的代码是
consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
请提出一些建议来解决此问题或我必须在生产者和消费者实例中包含的任何配置。
最佳答案
能否将 auto_offset_reset 设置为最早。
当创建新的消费者流时,它从最新的偏移量(这是 auto_offset_reset 的默认值)开始,您将错过消费者未启动时发送的消息。
您可以在 kafka python doc 中阅读相关内容。相关部分如下
auto_offset_reset (str) – A policy for resetting offsets on OffsetOutOfRange errors: ‘earliest’ will move to the oldest available message, ‘latest’ will move to the most recent. Any ofther value will raise the exception. Default: ‘latest’.
关于python - Kafka Consumer未获取所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35431128/