python - KafkaError Unsupported compression codec 0x3 with Confluent Python Kafka consumer

标签 python linux apache-kafka confluent-platform

我在笔记本电脑(Ubuntu 17)上设置了 Confluent Python Kafka 消费者,一切正常,我可以收听远程主题并接收消息。

但是当我尝试在服务器 (Ubuntu 16) 上设置它时,似乎出现了压缩问题。数据来自Divolte,使用LZ4压缩。

第一次连接到主题时,数据接收成功,没有任何错误,但是在关闭并重新打开消费者后,接收到第一条消息并抛出错误:

<cimpl.Message object at 0x7f089db67180>
KafkaError{code=_NOT_IMPLEMENTED,val=-170,str="Unsupported compression codec 0x3"}

我认为它不是来自 Divolte 数据源,而更像是来自 Kafka 的消息,但我无法读取它的值,因为错误发生在之前(打印 msg 然后我们跳转到 elif 以获取错误):

c = Consumer({'bootstrap.servers': server['server'], 'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe([server['topic']])
running = True
while running:
    msg = c.poll()
    print(msg)
    if not msg.error():
        msg_value = msg.value()
        print(msg_value)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False

最佳答案

听起来像是应该在 0.10.0 或更高版本中修复的问题

https://issues.apache.org/jira/browse/KAFKA-3160

关于python - KafkaError Unsupported compression codec 0x3 with Confluent Python Kafka consumer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44498308/

相关文章:

python - 使用索引向后循环

c++ - Kinect:如何使用 openni 将深度坐标映射到世界坐标?

linux - linux进程调度器如何防止进程饥饿

node.js - kafka 客户端正在向发生故障的代理分区发送请求

apache-kafka - 为什么 KSQL 查询从流-流连接创建的流中返回空值?

python - 在 geopandas 中使用多多边形 shapefile 剪切 shapefile

python - Elasticsearch-DSL过滤器出现意外结果

python - Beautifulsoup html 解析损坏了 <link> 标签

linux - 像 grep 一样的 perl 单行代码?

java - 如何选择一个Kafka transaction.id