python - 如何在Kafka脚本中正确删除然后创建主题?

标签 python apache-kafka confluent-kafka-python

我正在编写一个脚本来刷新我在 AWS 托管的 Kafka 集群上的主题。每当运行脚本时,我都需要清除现有数据,我通过删除并再次创建相同的主题来完成此操作。我希望当我重复运行该脚本时,该脚本能够打印出成功删除和成功创建的信息。但是删除/创建会失败,这让我很困惑。

以下是我的脚本:

# manage_topics.py
import sys
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import KafkaError, KafkaException
if __name__ == '__main__':
    kafka_cfg = '.....' # omitted  
    admin_client = AdminClient(kafka_cfg)

    deletion_ret = admin_client.delete_topics(['my-test-topic1'])
    for topic, delete_fut in deletion_ret.items():
        try:
            status = delete_fut.result()
            print(f'{topic} deletion is successful. status={status}')
        except KafkaException as e:
            print(f'could not delete topic: {topic}, error: {str(e)}')
            if e.args[0].code() != KafkaError.UNKNOWN_TOPIC_OR_PART:
                print('exiting...')
                sys.exit(1)
            else:
                print('ignoring UNKNOWN_TOPIC_OR_PART error')

    # I have two brokers for the Kafka instance I was given
    creation_ret = admin_client.create_topics([NewTopic('my-test-topic1', 5, 2)])
    for topic, create_fut in creation_ret.items():
        try:
            status = create_fut.result()
            print(f'{topic} creation is successful. status={status}')
        except KafkaException as e:
            print(f'could not create topic: {topic}, error: {str(e)}')

这是它生成的日志。每次运行之间等待多长时间并不重要。在我看来,当成功删除后进行创建时,删除需要花费一些时间,因此接下来的创建将会失败。当我再次运行时,之前的删除已经完成,然后当前的删除失败,创建成功。

如果有人能帮助我理解和改进这个脚本,我将非常感激。

$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$
$ python manage_topics.py
could not delete topic: my-test-topic1, error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Broker: Unknown topic or partition"}
ignoring UNKNOWN_TOPIC_OR_PART error
my-test-topic1 creation is successful. status=None
$
$
$ python manage_topics.py
my-test-topic1 deletion is successful. status=None
could not create topic: my-test-topic1, error: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'my-test-topic1' already exists."}
$
$

最佳答案

主题删除是服务器端的异步操作。您 future 的结果仅捕获请求的响应(主题被标记为删除),而不实际集群删除所有副本

相关Kafka - delete a topic completely

关于python - 如何在Kafka脚本中正确删除然后创建主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73115898/

相关文章:

java - 如何测试 ConsumerAwareRebalanceListener?

java - org.apache.kafka.common.errors.TimeoutException : Topic not present in metadata after 60000 ms

apache-spark - Spark 结构化流 - 限制? (源性能、不支持的操作、Spark UI)

python - confluence_kafka : how to reliably seek before reading data (avoiding Erroneous state)

python,如何将 pandas 系列转换为 pandas DataFrame?

python - 如何在 pyspark 的高基数分类列中有效地对低频计数级别进行分组?

python - 使用子路径在 Apache 反向代理后面重定向 Flask 登录

python - Tensorflow:使用 as_default().__enter__() 设置默认 session

python - 从 python-kafka 转换为 confluent kafka - 如何使用 SASL_SSL、OAUTHBEARER 和 Tokens 创建奇偶校验

python - 如何以编程方式更新 Confluent Schema Registry 中的主题架构和兼容性