apache-kafka - KafkaStreams EXACTLY_ONCE guarantee - skipping Kafka offsets

Tags: apache-kafka spark-streaming offset apache-kafka-streams

I am using Spark 2.2.0 with the spark-streaming Kafka 0.10 library to read from a topic that is populated by a Kafka Streams Scala application. The Kafka broker version is 0.11 and the Kafka Streams version is 0.11.0.2.

When I set the EXACTLY_ONCE guarantee in the Kafka Streams application:

 p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)

I get this error in Spark:

java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-<group.id> <topic> 0 even after seeking to offset 24
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.to(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toBuffer(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toArray(KafkaRDD.scala:189)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

If the EXACTLY_ONCE property is not set, everything works fine.

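Edit 1: The topic populated by the Kafka Streams application (with exactly-once enabled) reports a wrong end offset. When I run kafka.tools.GetOffsetShell, it gives an end offset of 18, but there are only 12 messages in the topic (retention is disabled). With the exactly-once guarantee disabled, these numbers match. I tried resetting the Kafka Streams application according to this, but the problem persists.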

Edit 2: When I run SimpleConsumerShell with the --print-offsets option, the output is as follows:

next offset = 1
{"timestamp": 149583551238149, "data": {...}}
next offset = 2
{"timestamp": 149583551238149, "data": {...}}
next offset = 4
{"timestamp": 149583551238149, "data": {...}}
next offset = 5
{"timestamp": 149583551238149, "data": {...}}
next offset = 7
{"timestamp": 149583551238149, "data": {...}}
next offset = 8
{"timestamp": 149583551238149, "data": {...}}
...

With the exactly-once guarantee enabled, some offsets are clearly being skipped.
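To make the gaps explicit, the "next offset" values from the output above can be turned into record offsets and checked for holes. This is only a minimal sketch: the object and method names are hypothetical, and it assumes each "next offset = N" line belongs to the record stored at offset N - 1.

```scala
object OffsetGaps {
  // Assumption: "next offset = N" is printed for the record stored at offset N - 1.
  def recordOffsets(nextOffsets: Seq[Long]): Seq[Long] =
    nextOffsets.map(_ - 1)

  // Offsets between the first and last observed record that no record occupies.
  def missingOffsets(offsets: Seq[Long]): Seq[Long] =
    if (offsets.isEmpty) Seq.empty
    else {
      val seen = offsets.toSet
      (offsets.min to offsets.max).filterNot(seen.contains)
    }

  def main(args: Array[String]): Unit = {
    // Values copied from the --print-offsets output above.
    val nextOffsets = Seq(1L, 2L, 4L, 5L, 7L, 8L)
    val records = recordOffsets(nextOffsets)
    println(s"record offsets:  ${records.mkString(", ")}")
    println(s"missing offsets: ${missingOffsets(records).mkString(", ")}")
  }
}
```

Under that assumption the records sit at offsets 0, 1, 3, 4, 6, 7, and offsets 2 and 5 are never returned, so every third offset is skipped.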

Any ideas? What could be causing this? Thanks!

Best answer

I found out that offset gaps are expected behavior in Kafka (version >= 0.11). They are caused by the commit/abort transaction markers.
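A toy model may help show why markers produce gaps. The sketch below is not Kafka's implementation: all names are hypothetical, and it assumes that each transaction writes two data records followed by a single commit marker, which is the pattern that would match the numbers in the question (12 messages, end offset 18). The key point it illustrates is that data records and markers draw from the same offset sequence.

```scala
object TransactionalLogModel {
  sealed trait Entry { def offset: Long }
  final case class Data(offset: Long, value: String) extends Entry
  final case class Marker(offset: Long, committed: Boolean) extends Entry

  // Appends each batch as one transaction: its records first, then one commit marker.
  // Both kinds of entry consume an offset in the partition log.
  def appendTransactions(batches: Seq[Seq[String]]): Vector[Entry] =
    batches.foldLeft(Vector.empty[Entry]) { (log, batch) =>
      val start = log.size.toLong
      val records = batch.zipWithIndex.map { case (v, i) => Data(start + i, v) }
      (log ++ records) :+ Marker(start + batch.size, committed = true)
    }

  // What an application sees: data records only, markers are filtered out.
  def visibleOffsets(log: Seq[Entry]): Seq[Long] =
    log.collect { case Data(offset, _) => offset }

  def main(args: Array[String]): Unit = {
    // Assumed shape: 6 transactions with 2 records each.
    val batches = Seq.tabulate(6)(t => Seq(s"t$t-a", s"t$t-b"))
    val log = appendTransactions(batches)
    println(s"log end offset:  ${log.size}")
    println(s"visible records: ${visibleOffsets(log).size}")
    println(s"visible offsets: ${visibleOffsets(log).mkString(", ")}")
  }
}
```

In this model the log ends at offset 18 while only 12 records are visible, at offsets 0, 1, 3, 4, 6, 7 and so on. The real number of records per transaction depends on how often the Kafka Streams application commits, so the spacing of the gaps can differ in practice.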

More information about Kafka transactions and control messages is available here:

These transaction markers are not exposed to applications, but are used by consumers in read_committed mode to filter out messages from aborted transactions and to not return messages which are part of open transactions (i.e., those which are in the log but don’t have a transaction marker associated with them).


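Kafka transactions were introduced in Kafka 0.11, so I assume that the spark-streaming-kafka 0.10 library is not compatible with this message format and that a newer version of spark-streaming-kafka has not been implemented yet.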
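The assertion message in the stack trace suggests that the reader expects to find a record at exactly the offset it asked for. The following sketch contrasts that strict expectation with a reader that tolerates holes. It is a simplified model with hypothetical names, not the actual spark-streaming-kafka code.

```scala
object GapTolerantRead {
  final case class Record(offset: Long, value: String)

  // Strict reader: expects a record at every offset in [start, end).
  // It fails in the same spirit as the assertion in the stack trace.
  def readStrict(log: Seq[Record], start: Long, end: Long): Seq[Record] =
    (start until end).map { o =>
      // Model of "seek to o, take the first record returned".
      val rec = log.find(_.offset >= o)
      assert(rec.exists(_.offset == o), s"Got wrong record even after seeking to offset $o")
      rec.get
    }

  // Tolerant reader: returns whatever records exist in [start, end) and skips the holes.
  def readTolerant(log: Seq[Record], start: Long, end: Long): Seq[Record] =
    log.filter(r => r.offset >= start && r.offset < end)

  def main(args: Array[String]): Unit = {
    // Offset 2 is missing, as if a transaction marker occupied it.
    val log = Seq(0L, 1L, 3L, 4L).map(o => Record(o, s"v$o"))
    println(readTolerant(log, 0L, 5L).map(_.offset).mkString(", "))
    try readStrict(log, 0L, 5L)
    catch { case e: AssertionError => println(e.getMessage) }
  }
}
```

A strict reader like this works on a topic written without transactions, where offsets are consecutive, and breaks as soon as a marker takes up an offset inside the requested range.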

Source: Stack Overflow, https://stackoverflow.com/questions/48344055/
