我正在编写一个脚本来刷新我在 AWS 托管的 Kafka 集群上的主题。每当运行脚本时,我都需要清除现有数据,我通过删除并再次创建相同的主题来完成此操作。我希望当我重复运行该脚本时,该脚本能够打印出成功删除和成功创建的信息。但是删除/创建会失败,这让我很困惑。
以下是我的脚本:
# manage_topics.py
import sys
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaError, KafkaException
if __name__ == '__main__':
kafka_cfg = '.....' # omitted
admin_client = AdminClient(kafka_cfg)
deletion_ret = admin_client.delete_topics(['my-test-topic1'])
for topic, delete_fut in deletion_ret.items():
try:
status = delete_fut.result()
print(f'{topic} deletion is successful. status={status}')
except KafkaException as e:
print(f'could not delete topic: {topic}, error: {str(e)}')
if e.args[0].code() != KafkaError.UNKNOWN_TOPIC_OR_PART:
print('exiting...')
sys.exit(1)
else:
print('ignoring UNKNOWN_TOPIC_OR_PART error')
# I have two brokers for the Kafka instance I was given
creation_ret = admin_client.create_topics([NewTopic('my-test-topic1', 5, 2)])
for topic, create_fut in creation_ret.items():
try:
status = create_fut.result()
print(f'{topic} creation is successful. status={status}')
except KafkaException as e:
print(f'could not create topic: {topic}, error: {str(e)}')
这是它生成的日志。每次运行之间等待多长时间并不重要。在我看来,当成功删除后进行创建时,删除需要花费一些时间,因此接下来的创建将会失败。当我再次运行时,之前的删除已经完成,然后当前的删除失败,创建成功。
如果有人能帮助我理解和改进这个脚本,我将非常感激。
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
最佳答案
主题删除是服务器端的异步操作。您 future 的结果仅捕获请求的响应(主题被标记为删除),而不实际集群删除所有副本。
关于python - 如何在Kafka脚本中正确删除然后创建主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73115898/