java - 如何使用直接流在 Kafka Spark Streaming 中指定消费者组

标签 java apache-spark apache-kafka spark-streaming kafka-consumer-api

如何使用直接流 API 为 kafka spark 流指定消费者组 ID。

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc, 
            String.class, 
            String.class,
            StringDecoder.class, 
            StringDecoder.class, 
            kafkaParams, 
            topicsSet
    );

虽然我已经指定了配置但不确定是否遗漏了什么。使用 spark1.3

kafkaParams.put("group.id", "app1");

最佳答案

直接流 API 使用低级别的 Kafka API,因此无论如何都不使用消费者组。如果您想将消费者组与 Spark Streaming 一起使用,则必须使用基于接收器的 API。

Full details are available in the doc !

关于java - 如何使用直接流在 Kafka Spark Streaming 中指定消费者组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36508553/

相关文章:

java sql查询从csv文件导入的数据

java - 启动后的 Android Studio 错误

java - android - 蓝牙适配器 - 消息处理程序缓冲区限制

hadoop - 如何从程序中获取 Spark 作业状态?

scala - 如何在大窗口上优化窗口聚合?

scala - 不止一个 Spark 上下文错误

java - Spring Kafka、Spring Cloud Stream 和 Avro 兼容性 Unknown magic byte

java - 在 android 版本 4.0 上单击通知时, Activity 永远不会启动

apache-kafka - 在 Apache Kafka 中使用 producer.properties 和 consumer.properties 文件

kubernetes - 没有自定义指标的水平Pod自动缩放