apache-spark - 分组数据的 Spark 并行处理

标签 apache-spark apache-spark-sql apache-spark-mllib scala-breeze

最初,我有很多数据。但是使用 spark-SQL 尤其是 groupBy 可以将其缩减到可管理的大小。 (适合单个节点的RAM)

我该怎么办 对所有组执行功能(并行) (分布在我的节点之间)?

我如何确保 的数据单个组收集到单个节点 ?例如。我可能想使用 local matrix用于计算,但不想遇到有关数据局部性的错误。

最佳答案

假设你有 x 没有。执行程序(在您的情况下每个节点可能有 1 个执行程序)。并且您希望以这样的方式对键上的数据进行分区,使每个键都落入一个独特的存储桶中,这将是一个完美的分区器。没有通用的方法这样做,但如果有一些特定于您的数据的固有分布/逻辑,则有可能实现这一点。
我处理过一个特定的案例,我发现 Spark 的内置哈希分区器在均匀分配 key 方面做得不好。所以我使用 Guava 编写了一个自定义分区器,如下所示:

  class FooPartitioner(partitions: Int) extends org.apache.spark.HashPartitioner(partitions: Int) {
    override def getPartition(key: Any): Int = {
      val hasherer = Hashing.murmur3_32().newHasher()
      Hashing.consistentHash(
        key match {
          case i: Int => hasherer.putInt(i).hash.asInt()
          case _ => key.hashCode
          },PARTITION_SIZE)
  }
 }

然后我将此分区器实例添加为我正在使用的 combineBy 的参数,以便生成的 rdd 以这种方式进行分区。
这在将数据分发到 x 个桶方面做得很好,但我想不能保证每个桶只有 1 个键。

如果您使用的是 Spark 1.6 并使用数据帧,您可以像这样定义一个 udfval hasher = udf((i:Int)=>Hashing.consistentHash(Hashing.murmur3_32().newHasher().putInt(i) .hash.asInt(),PARTITION_SIZE))并做 dataframe.repartition(hasher(keyThatYouAreUsing)) 希望这提供了一些开始的提示。

关于apache-spark - 分组数据的 Spark 并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36735730/

相关文章:

apache-spark - 如何通过 Spark SQL 作为 JDBC 分布式查询引擎访问 RDD 表?

scala - 应用 PCA 并保留总方差的百分比

jdbc - SPARK SQL - 使用 DataFrames 和 JDBC 更新 MySql 表

amazon-ec2 - 用于 Ubuntu(或可能是 Centos)的 Spark AMI - 不是 amazon linux?

azure - 是否可以在 Azure databricks 中使用基于 Parquet 文件名的增量表跟踪器?

scala - 如何在 Scala 中将 Array[(Double, Double)] 转换为 Array[Double]?

java - 如何在 Java Spark 中读取文本文件并将其转换为数据集?

apache-spark - 如何在PySpark管道中使用XGboost

PySpark 如何根据行值创建列

python - Synapse 工作区中的 PySpark Windows 函数(超前、滞后)