java - Kafka Streams on Confluent Cloud: 'segment.ms' with value '600000' exceeded min limit of 14400000 for internal repartition topic

Tags: java apache-kafka apache-kafka-streams confluent-platform

I am running a Kafka Streams app on Confluent Cloud with Kafka Streams version 2.1.0. On startup it fails with the following error:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Config property 'segment.ms' with value '600000' exceeded min limit of 14400000.

Full stack trace:

at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:143)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:967)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:525)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:403)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:569)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:95)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:521)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:504)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:870)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:850)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:397)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:818)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

The invalid value comes from the default overrides in RepartitionTopicConfig:

    private static final Map<String, String> REPARTITION_TOPIC_DEFAULT_OVERRIDES;
    static {
        final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
        tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");               // 50 MB
        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");                     // 50 MB
        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");                          // 10 min
        tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE));  // Infinity
        REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
    }

Best answer

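I eventually fixed this by adding StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG) -> "14400000" to the StreamsConfig. This raises segment.ms for the internal topics from the 10-minute default (600000) to the 14400000 ms (4 hour) minimum named in the error message.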
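A minimal sketch of that override, written against plain java.util.Properties so it compiles without the Kafka jars. It assumes that StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG) resolves to the string "topic.segment.ms" (the "topic." prefix plus "segment.ms"). The class name SegmentMsOverride, the method buildStreamsProps, and the application id and broker address are hypothetical placeholders, not part of the original answer.

```java
import java.util.Properties;

// Illustrative helper (hypothetical name); builds the properties a Kafka Streams app would be started with.
final class SegmentMsOverride {

    // Assumed to equal StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG),
    // i.e. StreamsConfig.TOPIC_PREFIX ("topic.") + TopicConfig.SEGMENT_MS_CONFIG ("segment.ms").
    static final String TOPIC_SEGMENT_MS = "topic." + "segment.ms";

    // Minimum value quoted in the PolicyViolationException from the question (4 hours in ms).
    static final String MIN_SEGMENT_MS = "14400000";

    static Properties buildStreamsProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);        // placeholder value supplied by the caller
        props.put("bootstrap.servers", bootstrapServers);  // placeholder value supplied by the caller
        // Replace the 10-minute repartition-topic default with the broker's minimum.
        props.put(TOPIC_SEGMENT_MS, MIN_SEGMENT_MS);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildStreamsProps("my-streams-app", "localhost:9092");
        System.out.println(TOPIC_SEGMENT_MS + " = " + props.getProperty(TOPIC_SEGMENT_MS));
    }
}
```

In a real application the resulting Properties would be passed to new KafkaStreams(topology, props), and the key would normally be written with the StreamsConfig and TopicConfig constants rather than string literals. As far as I know, the topic. prefix applies to every internal topic Kafka Streams creates, not only repartition topics, so changelog topics would pick up the same segment.ms.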

Source: the original question on Stack Overflow, https://stackoverflow.com/questions/54503224/
