我在使用以下代码的 Jupyter 笔记本中偶然发现了“NoBrokersAvailable: NoBrokersAvailable”错误:
from kafka import KafkaProducer
from kafka.errors import KafkaError
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
# handle exception
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
INTERVAL =10
while True:
data_points = get_realtime_stock('AAPL')
data = {'updated_on': data_points['updated_on'], 'ticker': data_points['security']['ticker'] ,'last_price': data_points['last_price']}
message = data_points
producer.send('data1', value=data).add_callback(on_send_success).add_errback(on_send_error)
time.sleep(INTERVAL)
这里各自的错误:
---------------------------------------------------------------------------
NoBrokersAvailable Traceback (most recent call last)
<ipython-input-8-cab724428b84> in <module>
11 # handle exception
12
---> 13 producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
14 INTERVAL =10
15 while True:
~/anaconda3/lib/python3.7/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
379 client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
380 wakeup_timeout_ms=self.config['max_block_ms'],
--> 381 **self.config)
382
383 # Get auto-discovered version from client if necessary
~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in __init__(self, **configs)
237 if self.config['api_version'] is None:
238 check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 239 self.config['api_version'] = self.check_version(timeout=check_timeout)
240
241 def _can_bootstrap(self):
~/anaconda3/lib/python3.7/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
890 else:
891 self._lock.release()
--> 892 raise Errors.NoBrokersAvailable()
893
894 def wakeup(self):
NoBrokersAvailable: NoBrokersAvailable
代码工作得很好,但不知何故它突然停止工作。
有谁知道问题可能是什么?
最佳答案
我遇到了同样的错误,我通过在函数 KafkaProducer 上指定 API 版本来解决它。这是我的代码中的一个示例。
如果错误仍然存在,请指定您的 kafka-python 库的版本。
producer = KafkaProducer(
bootstrap_servers=#####,
client_id=######,
value_serializer=JsonSerializer.serialize,
api_version=(0, 10, 1)
)
对于 API 版本,您应该放置您的 Kafka 版本。
关于python - NoBrokersAvailable : NoBrokersAvailable Error in Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60092690/