python - 在 confluence-kafka-python 中设置主题日志保留

标签 python apache-kafka confluent-platform

我在文档中找不到如何在使用 confluence-kafka 创建生产者时设置保留时间。

如果我只指定“bootstrap-servers”,则默认保留时间为 1 天。我希望能够改变这一点。

(我想在 python API 中而不是在命令行上执行此操作。)

最佳答案

保留时间是在您创建主题时设置的,而不是在生产者配置上设置的。

如果您的server.properties允许自动创建主题,然后您将获得其中设置的默认值。

否则,您可以使用 AdminClient API发送 NewTopic支持 config 的请求dict<str,str> 的属性

from confluent_kafka.admin import AdminClient, NewTopic

# a = AdminClient(...) 

topics = list()
t = NewTopic(topic, num_partitions=3, replication_factor=1, config={'log.retention.hours': '168'})
topics.append(t)

# Call create_topics to asynchronously create topics, a dict
# of <topic,future> is returned.
fs = a.create_topics(topics)

# Wait for operation to finish.
# Timeouts are preferably controlled by passing request_timeout=15.0
# to the create_topics() call.
# All futures will finish at the same time.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

在同一链接中,您可以找到更改主题请求

关于python - 在 confluence-kafka-python 中设置主题日志保留,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44036398/

相关文章:

apache-kafka - 使用 Flink 自定义 avro 消息反序列化

elasticsearch - Kafka Elasticsearch 连接器时间戳

python - 根据 map 中键的存在有选择地检索

ssl - Kafka设置SSL是否影响Zookeeper通信

mysql - Kafka Connect JDBC 连接器查询 + 在初始轮询时使用大数据集增加模式阻塞

java - Teradata 和 Apache Kafka 使用 Kafka 连接

go - 在 pkg-config 搜索路径中找不到包 rdkafka

python - 优化列表理解的并行实现

python - 在scrapy中剥离\n\t\r

python - 如何从名为section的标签中提取数据