java - 如何使用 Spark 的 Direct Stream for Kafka 设置消费者组提交的偏移量?

标签 java apache-spark apache-kafka spark-streaming

我正在尝试使用 Spark's Direct Approach (No Receivers) for Kafka ,我有以下 Kafka 配置图:

configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");

现在我的目标是,如果我的 Spark 管道崩溃并再次启动,流应该从消费者组提交的最新偏移量开始。因此,为此,我想为消费者指定起始偏移量。我有关于每个分区中提交的偏移量的信息。我如何将此信息提供给流功能。目前我正在使用

JavaPairInputDStream<byte[], byte[]> kafkaData =
   KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
     DefaultDecoder.class, DefaultDecoder.class,configMap,topic); 

最佳答案

查看 Spark API docs 中 createDirectStream 的第二种形式- 它允许你传入 Map<TopicAndPartition, Long> ,其中 Long 是偏移量。

请注意,当使用 DirectInputStream 时,Spark 不会自动更新 Zookeeper 中的偏移量 - 您必须自己将它们写入 ZK 或其他数据库。除非你对 exactly-once 语义有严格的要求,否则使用 createStream 方法取回一个 DStream 会更容易,在这种情况下,Spark 将更新 ZK 中的偏移量,并在失败的情况下从上次存储的偏移量恢复。

关于java - 如何使用 Spark 的 Direct Stream for Kafka 设置消费者组提交的偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31286909/

相关文章:

java - 如何在 Java 中压缩对象流?

java - 使用带有 SSL 的 JacORB (Java/CORBA) 的客户端策略错误

java - 为什么LinkedList的javadoc不保证pop和push的恒定时间性能?

scala - 如何使用 withColumn 创建新列以将两个数字列集中为 String ?

job-scheduling - Spark Streaming 中的作业是如何分配给执行者的?

python - 从 Kafka 轮询几条消息

elasticsearch - 尝试为 Elasticsearch Sink 配置 Debezium 镜像

java - 如何保存用户选择的 Intent ?

scala - 匹配向量 Spark Scala 中的 Dataframe 分类变量

apache-kafka - Kafka使用Key决定分区与手动发送消息