scala - 如何定义DataFrame的分区?

标签 scala apache-spark dataframe apache-spark-sql partitioning

我已经开始在 Spark 1.4.0 中使用 Spark SQL 和 DataFrames。我想在 Scala 中的 DataFrames 上定义自定义分区器,但不知道如何执行此操作。

我正在使用的数据表之一包含一个交易列表,按帐户,类似于以下示例。

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

至少在最初,大多数计算将发生在帐户内的交易之间。所以我希望对数据进行分区,以便一个帐户的所有事务都在同一个 Spark 分区中。

但我没有看到定义这一点的方法。 DataFrame 类有一个名为“repartition(Int)”的方法,您可以在其中指定要创建的分区数。但是我没有看到任何可用于为 DataFrame 定义自定义分区器的方法,例如可以为 RDD 指定的方法。

源数据存储在 Parquet 中。我确实看到在将 DataFrame 写入 Parquet 时,您可以指定要分区的列,所以大概我可以告诉 Parquet 按“帐户”列对其数据进行分区。但是可能有数百万个帐户,如果我正确理解 Parquet,它会为每个帐户创建一个不同的目录,因此这听起来不是一个合理的解决方案。

有没有办法让 Spark 对这个 DataFrame 进行分区,以便帐户的所有数据都在同一个分区中?

最佳答案

Spark >= 2.3.0

SPARK-22614公开范围分区。

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389Data Source API v2 中公开外部格式分区.

Spark >= 1.6.0

在 Spark >= 1.6 中,可以使用按列分区进行查询和缓存。见:SPARK-11410SPARK-4849使用 repartition方法:
val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

不像 RDDs Spark Dataset (包括 Dataset[Row] 又名 DataFrame )目前无法使用自定义分区器。您通常可以通过创建人工分区列来解决这个问题,但它不会为您提供相同的灵活性。

Spark < 1.6.0:

您可以做的一件事是在创建 DataFrame 之前对输入数据进行预分区。
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

DataFrame来自 RDD 的创作只需要一个简单的 map 阶段就应该保留现有的分区布局*:
assert(df.rdd.partitions == partitioned.partitions)

以相同的方式重新分区现有 DataFrame :
sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

所以看起来也不是没有可能。问题仍然是它是否有意义。我会争辩说,大多数时候它不会:
  • 重新分区是一个昂贵的过程。在典型的场景中,大多数数据必须被序列化、混洗和反序列化。另一方面,可以从预分区数据中受益的操作数量相对较少,如果内部 API 没有设计为利用此属性,则会进一步受到限制。
  • 在某些情况下加入,但它需要内部支持,
  • 窗口函数调用匹配的分区器。同上,仅限于单个窗口定义。虽然它已经在内部进行了分区,因此预分区可能是多余的,
  • 使用 GROUP BY 的简单聚合- 可以减少临时缓冲区**的内存占用,但总体成本要高得多。或多或少相当于 groupByKey.mapValues(_.reduce) (当前行为)vs reduceByKey (预分区)。在实践中不太可能有用。
  • 使用 SqlContext.cacheTable 进行数据压缩.由于看起来它正在使用运行长度编码,因此应用 OrderedRDDFunctions.repartitionAndSortWithinPartitions可以提高压缩比。
  • 性能高度依赖于 key 的分布。如果它有偏差,将导致资源利用率不理想。在最坏的情况下,根本不可能完成工作。
  • 使用高级声明性 API 的一个重点是将自己与低级实现细节隔离开来。正如 @dwysakowicz 已经提到的和 @RomiKuntsman优化是 Catalyst Optimizer 的工作.这是一个非常复杂的野兽,我真的怀疑你可以轻松地改进它,而无需深入研究它的内部结构。

  • 相关概念

    使用 JDBC 源进行分区 :

    JDBC 数据源支持 predicates argument .它可以按如下方式使用:
    sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
    

    它为每个谓词创建一个 JDBC 分区。请记住,如果使用单个谓词创建的集合不是不相交的,您将在结果表中看到重复项。

    partitionBy DataFrameWriter 中的方法 :

    Spark DataFrameWriter提供 partitionBy可用于在写入时“分区”数据的方法。它使用提供的一组列在写入时分离数据
    val df = Seq(
      ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
    ).toDF("k", "v")
    
    df.write.partitionBy("k").json("/tmp/foo.json")
    

    这使得基于键的查询的谓词下推读取:
    val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
    df1.where($"k" === "bar")
    

    但它不等同于 DataFrame.repartition .特别是像这样的聚合:
    val cnts = df1.groupBy($"k").sum()
    

    仍然需要 TungstenExchange :
    cnts.explain
    
    // == Physical Plan ==
    // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
    // +- TungstenExchange hashpartitioning(k#90,200), None
    //    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
    //       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
    

    bucketBy DataFrameWriter 中的方法 ( Spark >= 2.0):
    bucketBypartitionBy 有类似的应用但它仅适用于表 ( saveAsTable )。分桶信息可用于优化连接:
    // Temporarily disable broadcast joins
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df.write.bucketBy(42, "k").saveAsTable("df1")
    val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
    df2.write.bucketBy(42, "k").saveAsTable("df2")
    
    // == Physical Plan ==
    // *Project [k#41, v#42, v2#47]
    // +- *SortMergeJoin [k#41], [k#46], Inner
    //    :- *Sort [k#41 ASC NULLS FIRST], false, 0
    //    :  +- *Project [k#41, v#42]
    //    :     +- *Filter isnotnull(k#41)
    //    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
    //    +- *Sort [k#46 ASC NULLS FIRST], false, 0
    //       +- *Project [k#46, v2#47]
    //          +- *Filter isnotnull(k#46)
    //             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
    

    * 我所说的分区布局仅指数据分布。 partitioned RDD 不再有分区器。
    ** 假设没有提前预测。如果聚合仅涵盖列的一小部分,则可能没有任何 yield 。

    关于scala - 如何定义DataFrame的分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30995699/

    相关文章:

    scala - 如何从 spark shell 设置 spark.local.dir 属性?

    Scala/Spark-将整数与数据帧列中的每个值相乘

    scala - 了解 scala 中 tailrec 带注释的递归方法的性能

    apache-spark - PySpark 数据帧操作导致 OutOfMemoryError

    python - 将每年日期更改为每月日期并添加新值以填充每月日期

    scala - 在 Scala Spark Dataframe 中填充空值

    python - python numpy 将对象数组中的元素从 int 转换为 float

    scala - 在ScalaCheck中生成选项[T]

    scala - 在没有具体化的情况下确保卫生

    apache-spark - 如果用户 ID 是字符串而不是连续整数,如何使用 mllib.recommendation?