apache-kafka - Kafka Streams: PolicyViolationException: Topic replication factor must be 3

Tags: apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream

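I am currently building an application that writes to a Kafka topic and listens to the same topic in order to build a KTable from it and materialize it into a state store. The code I am running is based on the following sample. I copied almost all of it (everything except PageViewEventSource) and renamed things to fit my use case. I also updated my application.properties with the keys used in the sample.

Running the application produces the following error: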

2020-02-12 17:54:31.982 ERROR 69005 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager         : stream-thread [restartedMain] Unexpected error during topic creation for pairing-events-pcmv-changelog.
Error message was: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
2020-02-12 17:54:31.986 ERROR 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:

org.apache.kafka.streams.errors.StreamsException: Could not create topic pairing-events-pcmv-changelog.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148) ~[kafka-streams-2.3.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1049) ~[kafka-streams-2.3.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:635) ~[kafka-streams-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963) ~[kafka-streams-2.3.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:859) ~[kafka-streams-2.3.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) ~[kafka-streams-2.3.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) ~[kafka-streams-2.3.1.jar:na]
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3

2020-02-12 17:54:31.986  INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
2020-02-12 17:54:31.986  INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] Shutting down
2020-02-12 17:54:31.986  INFO 69005 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-02-12 17:54:31.986  INFO 69005 --- [-StreamThread-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-02-12 17:54:31.991  INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2020-02-12 17:54:31.991  INFO 69005 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c] State transition from REBALANCING to ERROR
2020-02-12 17:54:31.991 ERROR 69005 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c] All stream threads have died. The instance will be in error state and should be closed.
2020-02-12 17:54:31.991  INFO 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] Shutdown complete
Exception in thread "pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic pairing-events-pcmv-changelog.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1049)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:635)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:859)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3

Any clues on how to fix this?

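Best answer

Your broker configuration requires a minimum replication factor of 3. Kafka Streams tried to create the internal changelog topic pairing-events-pcmv-changelog with a different replication factor, so the broker's topic creation policy rejected the request.

You can set the ... topic.replication-factor property for the binding.

See Consumer Properties in the binder documentation.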
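
As a complement to the binder property above, here is a minimal sketch of the equivalent setting at the plain Kafka Streams level. It assumes the standard Kafka Streams setting `replication.factor`, which controls the replication factor of the internal changelog and repartition topics that Streams creates; in the 2.3.x line shown in the stack trace it defaults to 1, which would explain the rejection. The class name `ReplicationFactorSketch`, the helper `streamsProps`, and the bootstrap address `localhost:9092` are hypothetical, and the application id `pairing-events` is only inferred from the topic name in the log. How this value is passed through Spring Cloud Stream depends on the binder version, so check the binder documentation rather than treating this as the binder's own API.

```java
import java.util.Properties;

// Hypothetical helper class for illustration; not part of the original question or answer.
class ReplicationFactorSketch {

    // Literal name of the Kafka Streams setting (the string behind
    // StreamsConfig.REPLICATION_FACTOR_CONFIG in the kafka-streams library).
    static final String REPLICATION_FACTOR = "replication.factor";

    // Builds a Kafka Streams configuration whose internal topics
    // (changelog and repartition topics) use the given replication factor.
    static Properties streamsProps(String applicationId,
                                   String bootstrapServers,
                                   int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replicationFactor must be >= 1");
        }
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The broker policy in the error message demands 3, so the caller passes 3 here.
        props.setProperty(REPLICATION_FACTOR, Integer.toString(replicationFactor));
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed address; replace it with your own brokers.
        Properties props = streamsProps("pairing-events", "localhost:9092", 3);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a standalone Kafka Streams application these properties would be handed to `new KafkaStreams(topology, props)`. Note that the setting only affects topics created after the change; an internal topic that already exists keeps its current replication factor.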

Source: this question on Stack Overflow: https://stackoverflow.com/questions/60194656/
