apache-spark - Spark : Most efficient way to sort and partition data to be written as parquet

标签 apache-spark pyspark apache-spark-sql pyspark-sql

我的数据原则上是一个表,除了其他“数据”之外,它还包含一列 ID 和一列 GROUP_ID

在第一步中,我将 CSV 读入 Spark,进行一些处理以准备第二步的数据,并将数据写入 Parquet 。
第二步做了很多 groupBy('GROUP_ID')Window.partitionBy('GROUP_ID').orderBy('ID')

现在的目标是——为了避免在第二步中改组——在第一步中有效地加载数据,因为这是一个一次性的。

问题第 1 部分: AFAIK,Spark 在从 parquet 加载时保留分区(这实际上是要进行的任何“优化写入考虑”的基础)- 对吗?

我想出了三种可能:

  • df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
  • df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')
  • df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')

  • 我会设置 n 使得单个 Parquet 文件大约为 100MB。

    问题第 2 部分: 三个选项在目标方面产生“相同”/相似的结果是否正确(避免在第二步中洗牌)?如果不是,有什么区别?哪个“更好”?

    问题第 3 部分: 三个选项中哪一个在步骤 1 中表现更好?

    感谢您分享您的知识!

    编辑 2017-07-24

    经过一些测试(写入和读取 parquet)后,Spark 似乎无法在第二步默认恢复 partitionByorderBy 信息。分区数(从 df.rdd.getNumPartitions() 获得的似乎由内核数和/或 spark.default.parallelism(如果设置)决定,而不是由 Parquet 分区数决定。所以 问题 1 的答案将是 _0x104560795 _0x104560795问题 2 和问题 3 无关紧要。

    所以事实证明 真正的问题 是:有没有办法告诉 Spark,数据已经按列 X 分区并按列 _0x109104040565 排序?

    最佳答案

    您可能会对 Spark 中的分桶支持感兴趣。

    在此处查看详细信息
    https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html

    large.write
      .bucketBy(4, "id")
      .sortBy("id")
      .mode(SaveMode.Overwrite)
      .saveAsTable(bucketedTableName)
    

    注意 Spark 2.4 添加了对 bucket pruning 的支持(如 partition pruning )

    您正在查看的更直接的功能是 Hive 的分桶排序表
    https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables
    这在 Spark 中尚不可用(请参阅下面的 PS 部分)

    还要注意,排序信息不会由 Spark 自动加载,但是由于数据已经排序了..对它的排序操作实际上会快得多,因为没有太多工作要做 - 例如一次传递数据只是为了确认它已经排序。

    附注。
    Spark 和 Hive 分桶略有不同。
    这是在 Spark 中为在 Hive 中创建的分桶表提供兼容性的伞票 -
    https://issues.apache.org/jira/browse/SPARK-19256

    关于apache-spark - Spark : Most efficient way to sort and partition data to be written as parquet,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45224820/

    相关文章:

    arrays - 用 PySpark 中的对应元素替换数组中的元素

    apache-spark - 在 EMR 上的 PySpark 中运行自定义 Java 类

    apache-spark - scala和python之间的API兼容性?

    apache-spark - PySpark 特征选择和可解释性

    scala - 如何分组并连接 Dataframe Spark Scala 中的列表

    java - 连接 2 个 Spark 数据帧,以列表形式获取结果

    scala - 将数据框转换为强类型数据集?

    apache-spark - PySpark 将 ArrayType(ArrayType(NoneType)) 转换为 ArrayType(ArrayType(IntegerType))

    hadoop - 如何将spark/hadoop任务的输入设置为一系列文件

    scala - Spark 写入: CSV data source does not support null data type