apache-spark - 无法在kafka direct stream、Spark streaming中手动提交offset

标签 apache-spark apache-kafka spark-streaming dstream

我正在尝试验证手动偏移提交的工作情况。

当我尝试通过使用 thread.sleep()/jssc.stop()/在 while 循环中抛出异常来退出作业时,我看到正在提交偏移量。

我只是为了测试而发送了几条消息,但是一旦作业开始处理批处理,我就看到了 0 滞后。

spark 什么时候真正提交偏移量?

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    kafkaStream.foreachRDD(kafkaStreamRDD -> {
                // fetch kafka offsets for manually committing it later
                OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();


                // filter unwanted data
                kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {

                    //filter logic here

                }).foreachPartition(kafkaRecords -> {

                    //Initializing DB connections

                    while (kafkaRecords.hasNext()) {

                        //doing some work here

                        //-----> EXCEPTION
                        throw new Exception();
                    }

                });
                // commit offsets saveOffsets after processing
            ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
                if (exception != null) {
                    System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------"+ exception.getCause());
                    exception.printStackTrace(); // need this for driver
                } else {
                    System.out.println("Successfully committed offsets"); // need this for driver
                    for (OffsetRange offsetRange : offsetRanges) {
                        System.out.println("Offset Info: paratition {}, fromOffset {} untilOffset {}: "+ offsetRange.partition() +":"+ offsetRange.fromOffset() +":"+ offsetRange.untilOffset());
                    }
                }
            });

enable.auto.commit : false

观察 while 循环中的 throw new Exception();。即使批处理由于异常而失败,我也看到提交的偏移量,我预计这里会出现一些延迟,因为处理失败,这里有什么问题?

最佳答案

Kafka 上的 Spark 结构化流的美妙之处在于它提供了 Kafka Stream 中不可用的手动偏移量。 Spark 流提交是线程安全的,本质上是异步的,并且由于 Kafka 不是事务性的,因此您的输出必须仍然是幂等的。这意味着当您开始使用消息时,您的偏移量会不断增加,而提交可能会在稍后出现。 与 HasOffsetRanges 一样,只有在调用 createDirectStream 的结果时才会成功转换为 CanCommitOffsets,而不是在转换之后。 commitAsync 调用是线程安全的,但必须在输出之后发生。

您可以使用回调来检查您的提交执行情况,如下所示

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback() {
            def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
              m.foreach(f => {
                if (null != e) {
                  logger.info("Failed to cmomit:" + f._1 + "," + f._2)
                  logger.info("Error while commitAsync. Retry again"+e.toString)
                  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
                } else {
                  println("Offset commit:" + f._1 + "," + f._2)
                }
              })
            }
          })
          

关于apache-spark - 无法在kafka direct stream、Spark streaming中手动提交offset,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58459436/

相关文章:

scala - 使用 scala sbt 对 kafka + flink 示例进行故障排除?

python - 在Python环境中使用Kafka进行合约测试?

java - 在Spark Streaming应用程序中,如何在一行lines.foreachRDD()完成执行后执行lines.map()函数

java - 计算 Apache Spark DStream 中的元素

sql-生成64位的随机整数

python - 如何在具有不同日期格式的列上将字符串转换为日期

apache-spark - 将具有 UTC 偏移量的字符串转换为 Spark 时间戳

python - 在 Apache Spark (pyspark 2.4) 中同一行的数据帧集合列表中获取重复项

apache-kafka - 弗林克 : join file with kafka stream

java - 获取 Spark 的流窗口时间戳