python - NoBrokersAvailable : NoBrokersAvailable Error in Kafka

标签 python apache-kafka

我在使用以下代码的 Jupyter 笔记本中偶然发现了“NoBrokersAvailable: NoBrokersAvailable”错误:

from kafka import KafkaProducer
from kafka.errors import KafkaError

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
INTERVAL =10
while True:
    data_points = get_realtime_stock('AAPL')
    data = {'updated_on': data_points['updated_on'], 'ticker': data_points['security']['ticker'] ,'last_price': data_points['last_price']}
    message = data_points
    producer.send('data1', value=data).add_callback(on_send_success).add_errback(on_send_error)
    time.sleep(INTERVAL)

这里各自的错误:
---------------------------------------------------------------------------
NoBrokersAvailable                        Traceback (most recent call last)
<ipython-input-8-cab724428b84> in <module>
     11     # handle exception
     12 
---> 13 producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
     14 INTERVAL =10
     15 while True:

~/anaconda3/lib/python3.7/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
    379         client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
    380                              wakeup_timeout_ms=self.config['max_block_ms'],
--> 381                              **self.config)
    382 
    383         # Get auto-discovered version from client if necessary

~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in __init__(self, **configs)
    237         if self.config['api_version'] is None:
    238             check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 239             self.config['api_version'] = self.check_version(timeout=check_timeout)
    240 
    241     def _can_bootstrap(self):

~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
    890         else:
    891             self._lock.release()
--> 892             raise Errors.NoBrokersAvailable()
    893 
    894     def wakeup(self):

NoBrokersAvailable: NoBrokersAvailable

代码工作得很好,但不知何故它突然停止工作。
有谁知道问题可能是什么?

最佳答案

我遇到了同样的错误,我通过在函数 KafkaProducer 上指定 API 版本来解决它。这是我的代码中的一个示例。

如果错误仍然存​​在,请指定您的 kafka-python 库的版本。

producer = KafkaProducer(
    bootstrap_servers=#####,
    client_id=######,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

对于 API 版本,您应该放置您的 Kafka 版本。

关于python - NoBrokersAvailable : NoBrokersAvailable Error in Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60092690/

相关文章:

Python + 外籍人士 : Error on &#0; entities

api - Kubernetes POD的主机名为env

mysql - 提高 pentaho 表输入步骤性能

python - 错误:#<Chef::Node::Attribute:0x00000003f78ef8> 的未定义方法 `python'

python - 正则表达式匹配 - Python - 任意数量的字符

来自外部部分的 Python ConfigParser 插值

python - 当依赖项更改时, 'intelligently' 在 Python 中重置记忆化属性值的最佳方法

go - 在 kubernetes 中处理 kafka 客户端更新

authentication - 使用带有 SSL 加密但无身份验证的 Kafka(无服务器验证或客户端身份验证)

java - Apache Kafka 异常 : org. apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;)V