apache-kafka - 仅启用一次时,Kafka 流中的 UnknownProducerIdException

标签 apache-kafka apache-kafka-streams

在 Kafka 流应用程序上启用一次处理后,日志中出现以下错误:

ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
due to the following error:

org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort 
sending since an error caught with a previous record (key 222222 value 
some-value timestamp 1519200902670) to topic exactly-once-test-topic- 
v2 due to This exception is raised by the broker if it could not 
locate the producer metadata associated with the producerId in 
question. This could happen if, for instance, the producer's records 
were deleted because their retention time had elapsed. Once the last 
records of the producerId are removed, the producer's metadata is 
removed from the broker, and future appends by the producer will 
return this exception.
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
  at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
  at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
  at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
  at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
  at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
  at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
  at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
我们用一个最小的测试用例重现了这个问题,我们将消息从一个源流移动到另一个流,而没有任何转换。源流包含数月内产生的数百万条消息。 KafkaStreams 对象是使用以下 StreamsConfig 创建的:
  • StreamsConfig.PROCESSING_GUARANTEE_CONFIG = "exactly_once"
  • StreamsConfig.APPLICATION_ID_CONFIG = "一些应用程序 ID"
  • StreamsConfig.NUM_STREAM_THREADS_CONFIG = 1
  • ProducerConfig.BATCH_SIZE_CONFIG = 102400

  • 该应用程序能够在异常发生之前处理一些消息。
    上下文信息:
  • 我们正在运行一个带有 5 个 zookeeper 节点的 5 节点 Kafka 1.1.0 集群。
  • 有多个应用程序实例正在运行

  • 有没有人以前见过这个问题,或者可以给我们任何可能导致这种行为的提示?
    更新
    我们从头开始创建了一个新的 1.1.0 集群,并开始毫无问题地处理新消息。然而,当我们从旧集群导入旧消息时,我们在一段时间后遇到了相同的 UnknownProducerIdException。
    接下来我们尝试设置 cleanup.policy关于水槽话题到 compact同时保留 retention.ms在 3 年。现在错误没有发生。但是,消息似乎已丢失。源偏移为1.06亿,汇偏移为1亿。

    最佳答案

    正如评论中所解释的那样,当前似乎存在一个错误,在重播早于(最大可配置?)保留时间的消息时可能会导致问题。

    在撰写本文时,此问题尚未解决,始终可以在此处查看最新状态:

    https://issues.apache.org/jira/browse/KAFKA-6817

    关于apache-kafka - 仅启用一次时,Kafka 流中的 UnknownProducerIdException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49872827/

    相关文章:

    apache-kafka - 列出 Apache Kafka 中所有可用代理的命令是什么?

    java - 如何在同一个盒子上运行多个相互独立的kafka消费者?

    c++ - kafka消费者在zookeeper中的注册列表

    apache-kafka - 与 KafkaStreams 的窗口结束外连接

    apache-kafka - 在 Kafka-python 中的消费者组中重置 kafka LAG(更改偏移量)

    apache-kafka - 有没有办法在 Apache Kafka 2.0 中确定消息的优先级?

    apache-kafka - 在应用映射函数时在同一类上获取 Kafka Streams Class Cast Exception

    apache-kafka - 如何处理kafka KStream并直接写入数据库而不是向其发送另一个主题

    apache-kafka - 卡夫卡流 : PolicyViolationException: Topic replication factor must be 3

    java - 并行 KafkaStream 处理的更好方法?