pyspark - Spark Dataframe 分组和分区键具有一定数量的分区。

标签 pyspark apache-spark-sql

我有一个 spark 数据框,其中包含多个标签和对应于每个标签的功能,如下所示:

+----------------+--------------------+
|           label|       feature_paths|
+----------------+--------------------+
|         person1|[-0.015756417, 0....|
|         person1|[-0.05177306, 0.1...|
|         person1|[-0.11631858, 0.1...|
|         person2|[-0.058303248, 0....|
|         person2|[-0.03415013, 0.0...|
+----------------+--------------------+

我想为每个标签(人)训练一个聚类模型,所以基本上,我想为每个标签创建一个 rdd,然后运行像 rdd.map(service) 这样的映射操作最终将为每个实体保存一个 gmm 模型。

代码如下:

def service(rddentry):

    label = rddentry[0]
    features = rddentry[1]

    print(label)

    from sklearn.mixture import BayesianGaussianMixture
    from sklearn.externals import joblib

    gmm = BayesianGaussianMixture(n_components=3, covariance_type="diag", init_params='kmeans')
    model = gmm.fit(features)
    joblib.dump(model, str(label)+'.joblib') 

    return model

我的目标,我想要实现的是:

  1. 创建一个 rdd,其中分区数等于唯一标签数,这样:rdd.getNumPartition() = no_of_unique_labels。 每个 rdd 条目将具有多个特征,属于一个标签。

  2. 将每个 rdd 分区发送到服务函数。

到目前为止我的实验:

  1. 在执行 sdf.repartition('label') 时,它会创建几个空数据帧。

  2. sdf.partionBy('label') 也不起作用。它创建随机数量的分区。

我已经花了将近两天的时间,但直到现在还没有具体的结果。在正确方向上的任何帮助或指导都会有所帮助。

最佳答案

您可以将 partitionBynew HashPartitioner(number_of_partitions) 一起使用

需要一个额外的操作来计算唯一标签计数,您可以将其用作所需分区的数量。

这是示例,注意:您需要成对的 RDD 才能执行此操作。因此,在重新分区之后,您可以map 从元组中获取必要的时间

scala> val data = sc.parallelize(List("1","1","1","2","3","4","4","4"),4)
scala> data.glom.collect
res20: Array[Array[String]] = Array(Array(1, 1), Array(1, 2), Array(3, 4), Array(4, 4))
scala> val data_repart = data.keyBy(x=>x).partitionBy(new HashPartitioner(data.distinct.count.toInt))
scala> data_repart.glom.collect
res21: Array[Array[(String, String)]] = Array(Array((4,4), (4,4), (4,4)), Array((1,1), (1,1), (1,1)), Array((2,2)), Array((3,3)))
scala> data_repart.map(_._2).glom.collect
res22: Array[Array[String]] = Array(Array(4, 4, 4), Array(1, 1, 1), Array(2), Array(3))

如果有帮助,请告诉我。

关于pyspark - Spark Dataframe 分组和分区键具有一定数量的分区。,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53313030/

相关文章:

python - Pyspark - 在 groupby 和 orderBy 之后选择列中的不同值

python - 当值缺失时,将列值添加到另一列中的数组中

java - Spark SQL : using collect_set over array values?

apache-spark - DataFrame join 优化 - Broadcast Hash Join

python - PySpark DataFrame 上分组数据的 Pandas 样式转换

apache-spark - 不支持 spark sql 上下文中的 WITH 子句

python - PySpark 在 Synapse 链接服务之间切换

python - 如何在 Apache Spark 预构建版本中添加任何新库,如 spark-csv

apache-spark - 将常量值列添加到 Spark 数据框

apache-spark - Apache Spark 2.0 : Expression-string to orderBy()/sort() column in descending order