apache-kafka - kafka-python 消费者给出错误

标签 apache-kafka kafka-consumer-api kafka-python

我对 kafka 和 kafka-python 相当陌生。安装 kafka-python 后,我尝试从这里简单地实现消费者代码 - http://kafka-python.readthedocs.io/en/master/usage.html

我一直在 kafka 的 bin 目录中编写消费者代码,并尝试从那里运行 python 代码。但是我收到以下错误:

Traceback (most recent call last): File "KafkaConsumer.py", line 4, in for message in consumer: File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next return type(self).next(self) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 915, in next return next(self._iterator) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 876, in _message_generator for msg in self._fetcher: File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next return type(self).next(self) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 520, in next return next(self._iterator) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 477, in _message_generator for msg in self._unpack_message_set(tp, messages): File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 372, in _unpack_message_set inner_mset = msg.decompress() File "/usr/local/lib/python2.7/dist-packages/kafka/protocol/message.py", line 121, in decompress assert has_snappy(), 'Snappy decompression unsupported' AssertionError: Snappy decompression unsupported

这是我一直在尝试运行的代码:

from kafka import KafkaConsumer
consumer = KafkaConsumer ('mytopic',bootstrap_servers = ['localhost:9092'], group_id='test-consumer-group')
print "Consuming messages from the given topic"
for message in consumer:
    print("%s:%d%d: key=%s value=%s"  % (message.topic, message.partition, message.offset, message.key, message.value))

因为我对 Kafka 很陌生,所以我很难理解我做错了什么。

最佳答案

您似乎缺少 python-snappy,它需要读取以 snappy 格式压缩的数据。

您需要 snappysnappy-devel,您可以使用 yum、apt-get 等安装它们。 然后尝试 pip install python-snappy

关于apache-kafka - kafka-python 消费者给出错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39669024/

相关文章:

java - 我们可以在断电的情况下丢失 kafka 消息吗?

java - Kafka对同一主题的consumer数量有限制吗?

apache-kafka - 如何使用 Python-kafka 管理客户端在 Kafka 中创建主题?

apache-kafka - 有没有办法为 kafka 生产者发送的消息设置延迟?

apache-kafka - Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?

java - 如何修复Gradle Build中的 “Failed to process kafka-clients-1.1.1.jar”错误

c# - Kafka 非常高的延迟 C#

apache-kafka - 对于 AvroProducer 到 Kafka, "key"和 "value"的 avro 模式在哪里?

用于消费的python kafka不起作用

java - Kafka Consumer不消费java中的所有记录