python - 在 Apache Spark (pyspark 2.4) 中同一行的数据帧集合列表中获取重复项

标签 python apache-spark pyspark

在 Spark 中,使用 pyspark,我有一个包含重复项的数据框。我想使用电子邮件和 mobile_phone 等多个规则对它们进行重复数据删除。

这是我在 python 3 中的代码:

from pyspark.sql import Row

from pyspark.sql.functions import collect_list

df = sc.parallelize(
    [
        Row(raw_id='1001', first_name='adam', mobile_phone='0644556677', email='adam@gmail.fr'),
        Row(raw_id='2002', first_name='adam', mobile_phone='0644556688', email='adam@gmail.fr'),
        Row(raw_id='3003', first_name='momo', mobile_phone='0644556699', email='momo@gmail.fr'),
        Row(raw_id='4004', first_name='momo', mobile_phone='0644556600', email='mouma@gmail.fr'),
        Row(raw_id='5005', first_name='adam', mobile_phone='0644556688', email='adama@gmail.fr'),
        Row(raw_id='6006', first_name='rida', mobile_phone='0644556688', email='rida@gmail.fr')
    ]
).toDF()

我的原始数据框是:

+--------------+----------+------------+------+
|         email|first_name|mobile_phone|raw_id|
+--------------+----------+------------+------+
| adam@gmail.fr|      adam|  0644556677|  1001|
| adam@gmail.fr|      adam|  0644556688|  2002|
| momo@gmail.fr|      momo|  0644556699|  3003|
|mouma@gmail.fr|      momo|  0644556600|  4004|
|adama@gmail.fr|      adam|  0644556688|  5005|
| rida@gmail.fr|      rida|  0644556688|  6006|
+--------------+----------+------------+------+

然后,我应用重复数据删除规则:

df_mobile = df \
   .groupBy('mobile_phone') \
   .agg(collect_list('raw_id').alias('raws'))

df_email = df \
   .groupBy('email') \
   .agg(collect_list('raw_id').alias('raws'))

这是我得到的结果:

df_mobile.select('raws').show(10, False)
+------------------+                                                            
|raws              |
+------------------+
|[2002, 5005, 6006]|
|[1001]            |
|[4004]            |
|[3003]            |
+------------------+

df_email.select('raws').show(10, False)
+------------+
|raws        |
+------------+
|[3003]      |
|[4004]      |
|[1001, 2002]|
|[5005]      |
|[6006]      |
+------------+

所以,我想要的最终结果是重新组合 raws 列的公共(public)元素,如下所示:

+------------------------+
|raws                    |
+------------------------+
|[3003]                  |
|[4004]                  |
|[2002, 5005, 6006, 1001]|
+------------------------+

你知道我如何使用 pyspark 做到这一点吗? 非常感谢!

最佳答案

因此,@pault 似乎暗示您可以将其建模为一个图表,其中原始数据框 df 是顶点列表以及 df_emaildf_mobile 是连接顶点的列表。现在不幸的是 GraphX 不适用于 python,但是 GraphFrames是!

GrameFrames 有一个名为 Connected Components 的函数这将返回连接的 raw_ids 或顶点的列表。要使用它,我们必须做两件事,raw_id 必须称为 id,边必须是源 (src) 和目标 (dst) 对,而不仅仅是顶点列表。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from graphframes import GraphFrame

spark = SparkSession \
    .builder \
    .appName("example") \
    .getOrCreate()

spark.sparkContext.setCheckpointDir("checkpoints")
    # graphframes requires a checkpoint dir since v0.3.0
    # https://graphframes.github.io/user-guide.html#connected-components

spark.sparkContext.setLogLevel("WARN") # make it easier to see our output

vertices = spark.createDataFrame([
    ('1001', 'adam', '0644556677', 'adam@gmail.fr'),
    ('2002', 'adam', '0644556688', 'adam@gmail.fr'),
    ('3003', 'momo', '0644556699', 'momo@gmail.fr'),
    ('4004', 'momo', '0644556600', 'mouma@gmail.fr'),
    ('5005', 'adam', '0644556688', 'adama@gmail.fr'),
    ('6006', 'rida', '0644556688', 'rida@gmail.fr')
]).toDF("id", "first_name", "mobile_phone", "email")

mk_edges = udf(
    lambda a: [{'src': src, 'dst': dst} for (src, dst) in zip(a, a[-1:] + a[:-1])],
    returnType=ArrayType(StructType([
        StructField('src', StringType(), nullable=False),
        StructField('dst', StringType(), nullable=False)])))

def edges_by_group_key(df, group_key):
    return df.groupBy(group_key) \
        .agg(collect_list('id').alias('ids')) \
        .select(mk_edges('ids').alias('edges')) \
        .select(explode('edges').alias('edge')) \
        .select("edge.*")

mobileEdges = edges_by_group_key(vertices, 'mobile_phone')
print('mobile edges')
mobileEdges.show(truncate=False)

# mobile edges
# +----+----+
# |src |dst |
# +----+----+
# |2002|6006|
# |5005|2002|
# |6006|5005|
# |1001|1001|
# |4004|4004|
# |3003|3003|
# +----+----+

emailEdges = edges_by_group_key(vertices, 'email')
print('email edges')
emailEdges.show(truncate=False)

# email edges
# +----+----+
# |src |dst |
# +----+----+
# |3003|3003|
# |4004|4004|
# |1001|2002|
# |2002|1001|
# |5005|5005|
# |6006|6006|
# +----+----+

g = GraphFrame(vertices, mobileEdges.union(emailEdges))

result = g.connectedComponents()
print('connectedComponents')
result.select("id", "component") \
    .groupBy("component") \
    .agg(collect_list('id').alias('ids')) \
    .select('ids').show(truncate=False)

# connectedComponents
# +------------------------+
# |ids                     |
# +------------------------+
# |[1001, 2002, 5005, 6006]|
# |[4004]                  |
# |[3003]                  |
# +------------------------+

可能有一种更聪明的方法来实现移动和电子邮件数据帧之间的联合,也许可以使用不同的方法进行重复数据删除,但您明白了。

关于python - 在 Apache Spark (pyspark 2.4) 中同一行的数据帧集合列表中获取重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53934437/

相关文章:

python - 无法在 ubuntu 10.04 上的 openERP7 安装中加载模块 Web

python - ValueError:缓冲区大小必须是元素大小的倍数

apache-spark - pyspark : How to take a sample RDD from a huge RDD?

mapreduce - Spark中 "RDDs can be stored in memory"是什么意思?

sql - 如何在列数据 Spark scala 上检查 isEmpty

scala - 如何在多列上动态执行 udfs

python - 在条件列表上使用逻辑AND的PySpark DataFrame过滤器-Numpy All Equivalent

python - 二维数组 : row-wise operations

python - 在多变量情况下使用 SymPy 求解时限制解集的域

python - 将一行中的每一列传递给 Spark SQL 中的哈希函数