java - Spark Java Map 函数被执行两次

标签 java apache-spark apache-spark-sql rdd

我将上述代码作为 Spark 驱动程序,当我执行我的程序时,它可以正常工作,将所需数据保存为 Parquet 文件。

String indexFile = "index.txt";
JavaRDD<String> indexData = sc.textFile(indexFile).cache();
JavaRDD<String> jsonStringRDD = indexData.map(new Function<String, String>() {
  @Override
  public String call(String patientId) throws Exception {
   return "json array as string"
  }   
}); 

//1. Read json string array into a Dataframe (execution 1)
DataFrame dataSchemaDF = sqlContext.read().json(jsonStringRDD );
//2. Save dataframe as parquet file (execution 2)
dataSchemaDF.write().parquet("md.parquet");

但我观察到我在 RDD indexData 上的映射器函数被执行了两次。 首先,当我使用 SQLContextjsonStringRdd 读取为 DataFrame 时 其次,当我将 dataSchemaDF 写入 parquet 文件时

你能指导我如何避免这种重复执行吗?还有其他更好的方法可以将 JSON 字符串转换为 Dataframe 吗?

最佳答案

我认为原因是缺少 JSON 读取器的模式。当你执行时:

sqlContext.read().json(jsonStringRDD);

Spark 必须为新创建的 DataFrame 推断架构。为此,它具有扫描输入 RDD,并且急切地执行此步骤

如果你想避免它,你必须创建一个 StructType 来描述 JSON 文档的形状:

StructType schema;
...

并在创建 DataFrame 时使用它:

DataFrame dataSchemaDF = sqlContext.read().schema(schema).json(jsonStringRDD);

关于java - Spark Java Map 函数被执行两次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40073299/

相关文章:

java - 选择哪个 log4j facade?

apache-spark - GCP Dataproc 无法在没有 NAT 的情况下访问 GKE 上的 Kafka 集群 - 两者都位于同一 VPC 上

apache-spark - 如何在 DataFrame Spark 1.6 中加载特定的 Hive 分区?

apache-spark - 为什么 Zeppelin 在 %spark.sql 段落中以 "mismatched input ' ;' expecting <EOF>"失败?

apache-spark - spark中每个工作节点运行多少个执行程序进程?

scala - Spark- “sbt package”- “value $ is not a member of StringContext”-缺少Scala插件吗?

java - 是否可以在数组上使用共享 Intent

java - 如何使用 Java 中的 SIGAR 或 OSHI API 获取特定应用程序的操作系统进程详细信息?

java - 使用多个线程处理单个 HTTP 请求

memory-management - Spark中如何处理executor内存和driver内存?