java - Spark Java Streaming - NullPointerException when creating a Spark Java Dataset

Tags: java apache-kafka apache-spark-sql spark-streaming apache-spark-dataset

I am trying to run a query on Presto over JDBC and pass the result set back to Spark so I can create a temporary table from it. My result set is held in a List.

The queries arrive as JSON messages from a Kafka producer, so I created a Kafka consumer in Spark to receive the messages and process them further.

Here is my main method:

public static void main(String[] args) throws InterruptedException {

    SparkConf conf = new SparkConf();
    conf.setAppName("Wordcount Background");
    conf.setMaster("local");

    //SparkContext sc = SparkContext.getOrCreate(conf);
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
    JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
    SQLContext sqc = new SQLContext(sc);

    Set<String> topics = Collections.singleton("TestTopic");
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "172.20.3.189:9092");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
            String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {
        //System.out.println("--- New RDD with " + rdd.partitions().size()
        //        + " partitions and " + rdd.count() + " records");
        rdd.foreach(record -> {

            SparkkafkaJson sk = new SparkkafkaJson();

            Dataset<String> dfrdd = spark.createDataset(sk.process_query(record._2), Encoders.STRING());
            System.out.print(dfrdd);
            //Dataset<Row> df = spark.read().json(dfrdd);
            //df.show();

        });
    });

    ssc.start();
    ssc.awaitTermination();
}

Here is the process_query method, which returns the result set to the main method:

public List<String> process_query(String queryjson) {
    String resstr = "";
    String columnValue = "";
    List<String> jsonList = new ArrayList<>();
    //List<String> list = new ArrayList<String>();
    try {
        Class.forName(JDBC_DRIVER);
        //Open a connection
        conn = DriverManager.getConnection(DB_URL, USER, PASS);
        //Execute a query
        stmt = conn.createStatement();
        String sql = process_json(queryjson);
        ResultSet res = stmt.executeQuery(sql);
        ResultSetMetaData rsmd = res.getMetaData();
        int columnsNumber = rsmd.getColumnCount();
        //Extract data from result set
        while (res.next()) {
            //System.out.println(res.getString(""));
            Gson userGson = new GsonBuilder().create();
            JsonObject params = new JsonObject();
            for (int i = 1; i <= columnsNumber; i++) {
                String ColName = rsmd.getColumnName(i);
                String ColVal = res.getString(i);
                params.addProperty(ColName, ColVal);
            }
            resstr = userGson.toJson(params);
            jsonList.add(resstr);
        }

        //Clean-up environment
        res.close();
        stmt.close();
        conn.close();
    } catch (SQLException se) {
        //Handle errors for JDBC
        se.printStackTrace();
    } catch (Exception e) {
        //Handle errors for Class.forName
        e.printStackTrace();
    } finally {
        //finally block used to close resources
        try {
            if (stmt != null) stmt.close();
        } catch (SQLException sqlException) {
            sqlException.printStackTrace();
        }
        try {
            if (conn != null) conn.close();
        } catch (SQLException se) {
            se.printStackTrace();
        }
    }
    return jsonList;
}
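
As an aside that is unrelated to the NullPointerException: the manual close()/finally handling above could be simplified with try-with-resources. This is only a sketch, assuming the same JDBC_DRIVER, DB_URL, USER and PASS fields and the same process_json helper as the original class:

    public List<String> process_query(String queryjson) throws Exception {
        List<String> jsonList = new ArrayList<>();
        Class.forName(JDBC_DRIVER);
        String sql = process_json(queryjson);
        // Connection, Statement and ResultSet are closed automatically,
        // even if executeQuery or the row loop throws.
        try (Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
             Statement stmt = conn.createStatement();
             ResultSet res = stmt.executeQuery(sql)) {
            ResultSetMetaData rsmd = res.getMetaData();
            Gson gson = new GsonBuilder().create();
            while (res.next()) {
                JsonObject row = new JsonObject();
                for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                    row.addProperty(rsmd.getColumnName(i), res.getString(i));
                }
                jsonList.add(gson.toJson(row));
            }
        }
        return jsonList;
    }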

But I still get this error output:

    2019-05-30 13:17:41 INFO  ContextCleaner:54 - Cleaned accumulator 42
    2019-05-30 13:17:41 INFO  ContextCleaner:54 - Cleaned accumulator 109
    2019-05-30 13:17:43 INFO  CodeGenerator:54 - Code generated in 216.222798 ms
    2019-05-30 13:17:43 ERROR Executor:91 - Exception in task 1.0 in stage 9.0 (TID 19)
    java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:183)
        at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:474)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:511)
        at SparkkafkaJson.SparkkafkaJson.lambda$1(SparkkafkaJson.java:213)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-05-30 13:17:43 WARN  TaskSetManager:66 - Lost task 1.0 in stage 9.0 (TID 19, localhost, executor driver): java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:183)
        at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:474)
        at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:511)
        at SparkkafkaJson.SparkkafkaJson.lambda$1(SparkkafkaJson.java:213)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)

Please help.

Best Answer

I solved the problem above with the following logic:

    directKafkaStream.foreachRDD(rdd -> {
        //System.out.println("--- New RDD with " + rdd.partitions().size()
        //        + " partitions and " + rdd.count() + " records");
        rdd.collect().forEach(record -> {
            List<String> jsonList2 = new ArrayList<>();
            //System.out.print(sk.process_query(record._2, sk, spark));
            jsonList2 = sk.process_query(record._2, sk, spark);

            if (jsonList2.size() > 0) {
                //System.out.print("came here");
                sk.jsonList3 = jsonList2;
                String JsonStr = jsonList2.toString();
                //System.out.print(JsonStr);
                System.out.print("Sending Data to API");
                System.out.print(" ");
                try {
                    sk.Send_query_data_SCDF(JsonStr);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.print("came into else");
                sk.jsonList3 = new ArrayList<String>();
            }

        });

I was trying to create a Dataset inside the RDD's foreach, which is why the NullPointerException was thrown: the Dataset has to be created on the driver side, not on the executor side, and the SparkSession is not usable inside executor code. So I collected the RDD's records to the driver and created the Dataset outside the foreach so it can be persisted; the same foreachRDD block continues below:

        Dataset<String> dfrdd = spark.createDataset(sk.jsonList3, Encoders.STRING());

        Dataset<Row> wordsDataFrame = spark.read().json(dfrdd);

        wordsDataFrame.createOrReplaceTempView("words");

        Dataset<Row> wordCountsDataFrame =
            spark.sql("select * from words limit 10");
        wordCountsDataFrame.show();

        String jsonToReturn = wordCountsDataFrame.toJSON().collectAsList().toString();
        System.out.print(jsonToReturn);

        sk.jsonList3 = new ArrayList<String>();
        rdd.unpersist();

    });
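
To make the corrected flow easier to follow in one place, here is a minimal sketch of the same driver-side pattern. It is an illustration of the idea rather than the exact code from the answer: it reuses the directKafkaStream, spark and sk variables from the question's main method and calls the single-argument process_query from the question instead of the three-argument variant used above.

    // Sketch only: assumes directKafkaStream, spark and sk are defined as in the
    // question, and that process_query returns a List<String> of JSON rows.
    directKafkaStream.foreachRDD(rdd -> {
        // Bring the Kafka records to the driver; the SparkSession must not be
        // used inside rdd.foreach(...), which runs on the executors.
        List<String> jsonRows = new ArrayList<>();
        for (Tuple2<String, String> record : rdd.collect()) {
            jsonRows.addAll(sk.process_query(record._2()));   // JDBC query runs on the driver
        }

        if (!jsonRows.isEmpty()) {
            // Dataset creation now happens on the driver, where sessionState is
            // initialised, so the NullPointerException no longer occurs.
            Dataset<String> jsonDs = spark.createDataset(jsonRows, Encoders.STRING());
            Dataset<Row> df = spark.read().json(jsonDs);
            df.createOrReplaceTempView("words");
            spark.sql("select * from words limit 10").show();
        }
    });

Note that collect() pulls every record of the micro-batch to the driver, so this approach only scales while the per-batch result sets stay small.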

Regarding "java - Spark Java Streaming - NullPointerException when creating a Spark Java Dataset", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56379333/
