java - 使用 Java 更新 kafka 中特定主题的 TTL

标签 java apache-kafka kafka-producer-api

更新主题的 TTL,以便记录在主题中保留 10 天。我必须通过保留所有其他主题 TTL 相同的当前配置来针对特定主题执行此操作,我必须使用 java 执行此操作,因为我正在将主题推送到 kafka 通过 Java。我正在设置以下属性以将主题推送到 kafka

Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_SERVERS);
props.put("acks", ACKS);
props.put("retries", RETRIES);
props.put("linger.ms", new Integer(LINGER_MS));
props.put("buffer.memory", new Integer(BUFFER_MEMORY));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

最佳答案

您可以使用 AdminClient 执行此操作,遵循获取当前配置的代码片段(仅用于测试),然后更新“retention.ms”配置关于名为“test”的主题。

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

AdminClient adminClient = AdminClient.create(props);

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test");

// get the current topic configuration
DescribeConfigsResult describeConfigsResult  =
        adminClient.describeConfigs(Collections.singleton(resource));

Map<ConfigResource, Config> config = describeConfigsResult.all().get();

System.out.println(config);

// create a new entry for updating the retention.ms value on the same topic
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "50000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(updateConfig);
alterConfigsResult.all();

describeConfigsResult  = adminClient.describeConfigs(Collections.singleton(resource));

config = describeConfigsResult.all().get();

System.out.println(config);

adminClient.close();

关于java - 使用 Java 更新 kafka 中特定主题的 TTL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45205969/

相关文章:

java - Android 和 Guice - 注入(inject)通用类型类?

java - 如何使用 TouchAction 在 Appium 1.7.1 中滚动

java - 线程中出现异常 "streaming-job-executor-11"java.lang.ClassFormatError

java - maprsteam 与 spring 集成 java 客户端

java - 由 : java. io.NotSerializedException : org. apache.kafka.clients. Producer.KafkaProducer 引起

java - 如何使用 ThymeLeaf 将对象的属性绑定(bind)到隐藏字段?

java - 在面板上添加ActionListener。如何映射到主框架?

spring-boot - 嵌入式kafka无法启动-错误

apache-kafka - Kafka Connect 5.5.0 - 无法重置 max.request.size

apache-kafka - Kafka 主题的理想分区数