scala - 由 org.apache.spark.sql.Dataset 处的 : java. lang.NullPointerException 引起

标签 scala apache-spark dataframe apache-spark-sql

下面我提供我的代码。我遍历 DataFrame prodRows并为每个 product_PK我从 prodRows 中找到了一些匹配的 product_PK 子列表.

  numRecProducts = 10
  var listOfProducts: Map[Long,Array[(Long, Int)]] = Map()
  prodRows.foreach{ row : Row =>
      val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
      val gender = row.get(row.fieldIndex("gender_PK")).toString
      val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
      var productList: Array[(Long, Int)] = Array()
      if (!selection.rdd.isEmpty()) {
        productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect()
      }
    listOfProducts = listOfProducts + (product_PK -> productList)
  }

但是当我执行它时,它给了我以下错误。它看起来像 selection在某些迭代中为空。但是,我不明白如何处理此错误:
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
    at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
    at org.test.ComputeNumSim.run(ComputeNumSim.scala:69)
    at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19)
    at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61)
    at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877)
    at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:74)
    at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:69)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

这是什么意思,我该如何处理?

最佳答案

您无法从传递给 Spark 的 DataFrame/RDD 转换之一的函数中访问任何 Spark 的“驱动程序端”抽象(RDD、DataFrame、Dataset、SparkSession...)。您也无法从这些函数中更新驱动程序端可变对象。

在您的情况下 - 您正在尝试使用 prodRowsselection (两者都是数据帧)在传递给 DataFrame.foreach 的函数中.您也在尝试更新 listOfProducts (本地驱动程序端变量)来自同一个函数。

为什么?

  • DataFrames、RDDs 和 SparkSession 只存在于你的 Driver 应用程序中。它们充当访问分布在工作机器集群上的数据的“句柄”。
  • 传递给 RDD/DataFrame 转换的函数被序列化并发送到该集群,以在每个工作机器上的数据分区上执行。当序列化的 DataFrames/RDD 在这些机器上被反序列化时——它们是无用的,它们仍然不能代表集群上的数据,因为它们只是在驱动程序应用程序上创建的数据的空心副本,实际上维护与集群的连接机器
  • 出于同样的原因,尝试更新驱动程序端变量将失败:变量(在大多数情况下从空开始)将被序列化,在每个工作人员上反序列化,在工作人员上本地更新,并留在那里。 . 原司机端变量不变

  • 你怎么能解决这个问题?
    使用 Spark 时,尤其是使用 DataFrame 时,您应该尽量避免对数据进行“迭代”,而应使用 DataFrame 的声明性操作。在大多数情况下,当您想为 DataFrame 中的每个记录引用另一个 DataFrame 的数据时,您需要使用 join使用组合来自两个 DataFrame 的数据的记录创建一个新的 DataFrame。

    在这种特定情况下,如果我设法正确地得出结论,这里有一个大致等效的解决方案,可以完成您要执行的操作。尝试使用它并阅读 DataFrame 文档以找出详细信息:
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    val numRecProducts = 10
    
    val result = prodRows.as("left")
      // self-join by gender:
      .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
      // limit to 10 results per record:
      .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
      .filter($"rn" <= numRecProducts).drop($"rn")
      // group and collect_list to create products column:
      .groupBy($"left.product_PK" as "product_PK")
      .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")
    

    关于scala - 由 org.apache.spark.sql.Dataset 处的 : java. lang.NullPointerException 引起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47358177/

    相关文章:

    scala - 如何以 DRY 方式过滤/禁用 SBT 中所有子项目的 scalac 选项

    scala - 在 specs2 框架中,为什么使用 Scope 会阻止执行 forAll 量词?

    scala - Scala 集合的内存管理如何工作?

    scala - S3A 文件系统上的 Spark 历史服务器 : ClassNotFoundException

    python - 如果列中的值在一组值列表中,则过滤数据框行

    scala - sbt 网络插件 : Not a valid key: jetty-run (similar: jetty-port, 码头上下文,运行)

    apache-spark - 如何在 Spark Standalone 上调试 Spark 应用程序?

    apache-spark - Airflow + Kubernetes VS Airflow + Spark

    python - 在 pandas 数据框中,根据其他两列的结果生成第三列数据

    python - 测试 pandas DataFrame 是否存在