apache-kafka - Python confluent kafka 在代理连接断开时引发异常

标签 apache-kafka python-3.7 confluent-kafka-python

我正在使用 python 3.7 和 confluent-kafka。

以下是我用来轮询 kafka 服务器并读取消息的伪代码。

        while True:
            MSG = CONSUMER.poll(0.1)
            if MSG is None:
                CONSUMER.commit()
                print('No msg')
                continue
            if MSG.error():
                print("Consumer error: {}".format(MSG.error()))
                CONSUMER.commit()
                continue
            try:
                rawMsg = format(MSG.value().decode('utf-8'))
                testmsg = json.loads(rawMsg)
            except:
                print('invalid json format msg')
                CONSUMER.commit()

如果 kafka 服务器由于某种原因宕机或断开连接,我希望抛出异常。

目前,如果发生上述情况,while 循环将继续运行而不会出现任何错误并打印No msg

如何在循环中每次获取异常或检查是否可以连接 kafka 服务器(如果要进行一些检查,它应该是轻量级的)

最佳答案

创建消费者时,您可以在反序列化器中指定 a callback for error .

这是在生产者中使用相同机制的示例:

import confluent_kafka
def error_callback(err):
    print("callback hit!")
    raise(err)
p = confluent_kafka.Producer({
    "bootstrap.servers": "localhost:9092",
    "message.max.bytes": 10_000_000,
    "error_cb": error_callback,
    "debug": "msg",
})
p.produce("test-topic", "a" * int(2e6))
p.flush()

From github issues .这可能会有所帮助,但不能解决问题。

关于apache-kafka - Python confluent kafka 在代理连接断开时引发异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64662926/

相关文章:

apache-spark - 在无限流中计数不同

node.js - 有没有办法将融合模式注册表与 kafka-node 模块一起使用?

java - 将 BlockingQueue 传递给 Spring KafkaListener

python - confluence_kafka : how to reliably seek before reading data (avoiding Erroneous state)

apache-kafka - 解释 Kafka 中的复制偏移检查点和恢复点偏移

python - 使用 PEP 563 检查签名

python - dataclasses.asdict() 没有按预期工作

python - 从列表中随机选择时如何在字符串中使用变量?

python - Confluence-Kafka-Python:获取每个主题分区的滞后

python - Confluence kafka python API - 如何获取主题中的分区数量