java - Apache Spark Streaming with Java and Kafka

Tags: java apache-spark apache-kafka

I am trying to run the Spark Streaming example from the official Spark website.

These are the dependencies I am using in my pom file:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

This is my Java code:

package com.myproject.spark;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.myproject.spark.serialization.JsonDeserializer;

import scala.Tuple2;

public class MainEntryPoint {
  public static void main(String[] args) {
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer",JsonDeserializer.class.getName());
    kafkaParams.put("group.id", "ttk-event-listener");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("topic1", "topic2");

    SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("EMSStreamingApp");
    JavaStreamingContext streamingContext =
        new JavaStreamingContext(conf, Durations.seconds(1));

    JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));


    streamingContext.start();
    try {
      streamingContext.awaitTermination();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

When I try to run it from Eclipse, I get the following exception:

18/07/16 13:35:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.106, 51604, None)
18/07/16 13:35:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.106, 51604, None)
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:168)
at org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.myproject.spark.MainEntryPoint.main(MainEntryPoint.java:47)
18/07/16 13:35:28 INFO SparkContext: Invoking stop() from shutdown hook

I am running it from my IDE (Eclipse). Do I have to build a JAR and deploy it to Spark for it to run? If anyone knows what causes this exception, please share your experience. Thanks in advance.

Best Answer

Try using 2.3.1 for the spark-streaming-kafka dependency as well.
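
The corrected dependency then matches the other two Spark artifacts in your pom (a minimal sketch, keeping spark-core and spark-streaming at 2.3.1 as you already have them):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <!-- must match the spark-core / spark-streaming version -->
    <version>2.3.1</version>
</dependency>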

Also check the other related questions about java.lang.AbstractMethodError and their answers.

It usually means that a library being used does not match the interface/implementation it runs against. Here, the 2.2.0 Kafka connector was compiled against a different version of Spark's internal Logging API than the 2.3.1 runtime on your classpath, which is exactly what the stack trace shows.
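
To keep the versions from drifting apart again, one option is to declare the Spark version once as a Maven property and reference it from every Spark dependency. A minimal sketch (the property name spark.version is a convention of this sketch, not something from the original pom):

<properties>
    <!-- single source of truth for all Spark artifacts (property name is illustrative) -->
    <spark.version>2.3.1</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

You can confirm which versions actually end up on the classpath with mvn dependency:tree.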

Regarding "java - Apache Spark Streaming with Java and Kafka", see the similar question on Stack Overflow: https://stackoverflow.com/questions/51360347/
