After enabling exactly-once processing on a Kafka Streams application, the following error appears in the logs:
ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer
due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort
sending since an error caught with a previous record (key 222222 value
some-value timestamp 1519200902670) to topic exactly-once-test-topic-
v2 due to This exception is raised by the broker if it could not
locate the producer metadata associated with the producerId in
question. This could happen if, for instance, the producer's records
were deleted because their retention time had elapsed. Once the last
records of the producerId are removed, the producer's metadata is
removed from the broker, and future appends by the producer will
return this exception.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
We reproduced the issue with a minimal test case that moves messages from a source stream to another stream without any transformation. The source stream contains millions of messages produced over several months. The KafkaStreams object was created from a StreamsConfig with exactly-once processing enabled, and the application was able to process some messages before the exception occurred.
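The exact StreamsConfig from the question is not reproduced here; as a point of reference, a minimal sketch of such a passthrough topology with exactly-once semantics could look like the following (the application id, broker address, and source topic name are placeholders, and String serdes are assumed):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PassthroughApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder values; the original question's config was not preserved.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // The setting that turns on exactly-once processing semantics.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Move records from the source topic to the sink topic without transformation.
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("source-topic").to("exactly-once-test-topic-v2");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}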
Has anyone seen this problem before, or can anyone give us a hint as to what might cause this behavior?
Update
We created a new 1.1.0 cluster from scratch and started processing new messages without any problems. However, when we imported the old messages from the old cluster, we hit the same UnknownProducerIdException after a while.
Next we tried setting cleanup.policy on the sink topic to compact while keeping retention.ms at 3 years. Now the error no longer occurs. However, messages appear to have been lost: the source offset is 106 million, while the sink offset is 100 million.
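For reference, such a topic-level change could be applied with the Kafka AdminClient; a minimal sketch under these assumptions: the broker address is a placeholder, the topic name is taken from the error above, and alterConfigs (the 1.1-era API) replaces the topic's previously set dynamic configs rather than merging with them.

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SinkTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "exactly-once-test-topic-v2");
            Config config = new Config(Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact"),
                // Roughly 3 years, expressed in milliseconds.
                new ConfigEntry("retention.ms", String.valueOf(3L * 365 * 24 * 60 * 60 * 1000))));
            // Overwrites the topic's dynamic config with the entries given above.
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}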
Best answer
As explained in the comments, there currently seems to be a bug that can cause problems when replaying messages that are older than the (maximum configurable?) retention time.
At the time of writing this has not been resolved; the latest status can always be checked here:
https://issues.apache.org/jira/browse/KAFKA-6817
The original question, "UnknownProducerIdException in Kafka streams when enabling exactly once", can be found on Stack Overflow: https://stackoverflow.com/questions/49872827/