Java Kafka adminClient 主题配置。配置值被覆盖

标签 java apache-kafka

尝试使用 java kafka adminClient 配置新创建的 kafka 主题时,值被覆盖。

我尝试使用控制台命令设置相同的主题配置,并且它有效。不幸的是,当我尝试使用 Java 代码时,一些值发生冲突并被覆盖。

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Map<ConfigResource, Config> updateConfig = new HashMap<>();

// update retention Bytes for this topic
ConfigEntry retentionBytesEntry = new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(retentionBytes));
updateConfig.put(resource, new Config(Collections.singleton(retentionBytesEntry)));

// update retention ms for this topic
ConfigEntry retentionMsEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs));
updateConfig.put(resource, new Config(Collections.singleton(retentionMsEntry)));

// update segment Bytes for this topic
ConfigEntry segmentBytesEntry = new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(segmentbytes));
updateConfig.put(resource, new Config(Collections.singleton(segmentBytesEntry)));

// update segment ms for this topic
ConfigEntry segmentMsEntry = new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(segmentMs));
updateConfig.put(resource, new Config(Collections.singleton(segmentMsEntry)));

// Update the configuration
client.alterConfigs(updateConfig);

我希望主题正确地包含所有给定的配置值。

最佳答案

您的逻辑无法正常工作,因为您使用同一键多次调用 Map.put() 。因此仅保留最后一个条目。

指定多个主题配置的正确方法是将它们添加到ConfigEntry对象中。仅在将 ConfigEntry 添加到 Map 后。

例如:

// Your Topic Resource
ConfigResource cr = new ConfigResource(Type.TOPIC, "mytopic");

// Create all your configurations
Collection<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(segmentbytes)));
entries.add(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(retentionBytes)));
...

// Create the Map
Config config = new Config(entries);
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(cr, config);

// Call alterConfigs()
admin.alterConfigs(configs);

关于Java Kafka adminClient 主题配置。配置值被覆盖,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54024829/

相关文章:

java - 在java中使用kafka文件连接器读取文件

apache-spark - 将 Apache Spark 结果发布到另一个应用程序/Kafka

java - 如何让 Java Iterable 从扩展泛型返回基本子类型

java - java AES解密函数有什么问题?

java - 在 Tomcat 和 Websphere 上部署时有什么区别?

java - Zookeeper 属性仍然是 Kafka 消费者的一个要求吗?

apache-kafka - Kafka 生产者和消费者信息

java - java中的二维数组列表

java - 如何在 Vaadin 8 中添加网格过滤器?

pyspark - 在 spark streaming/structured streaming 中读取来自 Kafka 的 avro 消息