scala - Spark 2.0 数据集与数据帧

标签 scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

从 spark 2.0.1 开始我有一些问题。我阅读了很多文档,但到目前为止找不到足够的答案:

  • 和有什么区别
  • df.select("foo")
  • df.select($"foo")
  • 我理解正确吗
  • myDataSet.map(foo.someVal)是类型安全的,不会转换为 RDD但保持数据集表示/没有额外开销(2.0.0 的性能明智)
  • 所有其他命令,例如select, .. 只是语法糖。它们不是类型安全的,可以使用映射。我怎么可能df.select("foo")没有 map 语句的类型安全?
  • 为什么我应该使用 UDF/UADF 而不是 map (假设 map 保留在数据集表示中)?
  • 最佳答案

  • df.select("foo") 之间的区别和 df.select($"foo")是签名。前者至少需要一个String , 后一个零个或多个 Columns .除此之外没有实际区别。
  • myDataSet.map(foo.someVal)类型检查,但与任何 Dataset 一样操作使用 RDD对象,并与 DataFrame 相比操作,有很大的开销。我们来看一个简单的例子:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    

    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    如您所见,此执行计划需要访问所有字段并且必须访问 DeserializeToObject .
  • 不。一般来说,其他方法不是语法糖,并且会生成明显不同的执行计划。例如:

    ds.select($"foo").explain
    

    == Physical Plan ==
    LocalTableScan [foo#117]
    

    与之前显示的计划相比,它可以直接访问列。与其说是 API 的限制,不如说是操作语义不同的结果。
  • How could I df.select("foo") type-safe without a map statement?



    没有这样的选择。虽然类型化的列允许您静态转换 Dataset进入另一个静态类型Dataset :
    ds.select($"bar".as[Int])
    

    没有类型安全。还有一些其他尝试包括类型安全优化操作,like typed aggregations ,但是这个实验性的 API。
  • why should I use a UDF / UADF instead of a map



    这完全取决于你。 Spark 中的每个分布式数据结构都有自己的优点和缺点(参见示例 Spark UDAF with ArrayType as bufferSchema performance issues)。

  • 就个人而言,我发现静态类型 Dataset是最没用的:
  • 不要提供与 Dataset[Row] 相同的优化范围(尽管它们共享存储格式和一些执行计划优化,但它并不能完全受益于代码生成或堆外存储)也不能访问 DataFrame 的所有分析功能.
  • 类型化转换是黑盒,有效地为优化器创建分析障碍。例如,选择(过滤器)不能被推送到类型转换:
    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
    

    == Physical Plan ==
    *Filter (foo#133 = 1)
    +- *Filter <function1>.apply
       +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
          +- Exchange hashpartitioning(foo#133, 200)
             +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
                +- LocalTableScan [foo#133, bar#134]
    

    相比:
    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
    

    == Physical Plan ==
    *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
    +- Exchange hashpartitioning(foo#133, 200)
       +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
          +- *Filter (foo#133 = 1)
             +- LocalTableScan [foo#133, bar#134] 
    

    这会影响谓词下推或投影下推等功能。
  • 没有RDDs那么灵活仅原生支持一小部分类型。
  • “类型安全”与 Encoders是有争议的 Dataset使用 as 转换方法。由于未使用签名对数据形状进行编码,因此编译器只能验证 Encoder 的存在。 .

  • 相关问题:
  • Perform a typed join in Scala with Spark Datasets
  • Spark 2.0 DataSets groupByKey and divide operation and type safety
  • 关于scala - Spark 2.0 数据集与数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40596638/

    相关文章:

    hadoop - Spark对象运行时错误

    scala - 如何访问 Spark UDF 函数中的完整 Row?

    apache-spark - 两个非常相似的 Spark Dataframe 之间性能差异的可能原因

    amazon-dynamodb - 从 Spark 程序连接 DynamoDB 以使用 Python 从一张表中加载所有项目?

    scala - 从宏中获取具有匿名类方法的结构类型

    Scala 将隐式函数应用于集合

    scala - 如何将一个加特林请求返回到另一个请求中-Scala

    scala - ssl - 自签名证书获取链时出错

    csv - 将 Spark DataFrame 的内容保存为单个 CSV 文件

    apache-spark - 如何使指针为四个字节而不是八个字节