python - Kafka Consumer未获取所有消息

标签 python apache-kafka producer-consumer

每当在 Kafka 中创建新主题时,我都尝试启动动态消费者,但动态启动的消费者总是缺少起始/第一条消息,但从那里开始消费消息。我正在使用 kafka-python 模块,并使用更新的 KafkaConsumer 和 KafkaProducer。

生产者的代码是

producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)

消费者的代码是

consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest')

请提出一些建议来解决此问题或我必须在生产者和消费者实例中包含的任何配置。

最佳答案

能否将 auto_offset_reset 设置为最早。

当创建新的消费者流时,它从最新的偏移量(这是 auto_offset_reset 的默认值)开始,您将错过消费者未启动时发送的消息。

您可以在 kafka python doc 中阅读相关内容。相关部分如下

auto_offset_reset (str) – A policy for resetting offsets on OffsetOutOfRange errors: ‘earliest’ will move to the oldest available message, ‘latest’ will move to the most recent. Any ofther value will raise the exception. Default: ‘latest’.

关于python - Kafka Consumer未获取所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35431128/

相关文章:

java - 在 JavaME : how to optimize it? 中实现阻塞队列

rest - Kafka 消息 VS REST 调用

c++ - rdkafka 消费者查询分区大小

python - 如何在各个字符串中分隔其间具有可变数量空格的长字符串

python - 整个 python 应用程序中的单个数据库连接(遵循单例模式)

python - 如何判断函数来自哪个模块?

python - 将Python与opencv结合使用以实现图像拼接

Spring Boot Kafka : Commit cannot be completed since the group has already rebalanced

python : Kafka consumer offset commit in the background

c# - 如何限制 BlockingCollection 大小但不断添加新的 itens(.NET 限制大小 FIFO)?