python - kafka-python消费者没有收到消息

标签 python apache-kafka kafka-consumer-api kafka-python

我在使用 KafaConsumer 时遇到问题,无法让它从头读取,或者从任何其他显式偏移读取。

为同一主题的消费者运行命令行工具,我确实看到带有 --from-beginning 选项的消息,否则它会挂起

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning

如果我通过 python 运行它,它会挂起,我怀疑这是由不正确的消费者配置引起的

consumer = KafkaConsumer(topic_name,
                     bootstrap_servers=['localhost:9092'],
                     group_id=None,
                     auto_commit_enable=False,
                     auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
    print "Message", message
    if message is not None:
        print message.offset, message.value

print "Quit"

输出:

消费来自给定主题的消息 (在那之后挂起)

我使用的是 kafka-python 0.9.5,代理运行的是 kafka 8.2。不确定确切的问题是什么。

按照 dpkp 的建议设置 _group_id=None_ 以模拟控制台消费者的行为。

最佳答案

控制台消费者和您发布的 python 消费者代码之间的区别是 python 消费者使用消费者组来保存偏移量:group_id="test-consumer-group"。相反,如果您设置 group_id=None,您应该会看到与控制台消费者相同的行为。

关于python - kafka-python消费者没有收到消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35217603/

相关文章:

apache-kafka - 合并kafka流中的记录

message-queue - Kafka中如何同时实现分布式处理和高可用?

apache-kafka - 什么是 kafka 中的 delete.topic.enable

windows - Kafka 1.0 因 FATAL SHUTDOWN 错误而停止。日志目录失败

java - 优雅地关闭具有静态成员身份的 Kafka Consumer

java - 如何在 Kafka 中为消费者发送 OffsetCommitRequest?

python - 打开大结果 mat-File 会引发缓冲区对于请求的数组来说太小

python - 如何从左到右解包元组?

Python 字符串内的乘法值

python - 获取 Jinja for 循环中的当前值和下一个值