我发现 PyKafka 有异常行为,这是我最近才开始使用的客户端。
错误如下:
Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}
错误的来源在这几行:
self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
consumer = self.client.topics[bytes(self.input_topic,"UTF-8")].get_balanced_consumer(
consumer_group=bytes(self.consumer_group,"UTF-8"),
auto_commit_enable=True
)
调试我看到客户端使用正确的字符串 IP 连接到种子代理但是当检索代理列表时,它们的 IP 是二进制的,当 PyKafka 尝试再次连接以创建消费者时,这些 IP 显然不'工作。
另一个可能相关的问题是我需要自己将主题名称和消费者组名称转换为字节(与其他客户端一样),但文档中的所有示例都显示了字符串的用法。
Kafka 代理版本:0.10.1.0 PyKafka版本:2.7.0
最佳答案
好吧,我完全被误导了:那不是 IP,而是 base64 中的主机名(由 Docker 生成)。
关于python - PyKafka 元数据以字节而不是字符串为单位,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50295995/