在 Spark 中,使用 pyspark,我有一个包含重复项的数据框。我想使用电子邮件和 mobile_phone 等多个规则对它们进行重复数据删除。
这是我在 python 3 中的代码:
from pyspark.sql import Row
from pyspark.sql.functions import collect_list
df = sc.parallelize(
[
Row(raw_id='1001', first_name='adam', mobile_phone='0644556677', email='adam@gmail.fr'),
Row(raw_id='2002', first_name='adam', mobile_phone='0644556688', email='adam@gmail.fr'),
Row(raw_id='3003', first_name='momo', mobile_phone='0644556699', email='momo@gmail.fr'),
Row(raw_id='4004', first_name='momo', mobile_phone='0644556600', email='mouma@gmail.fr'),
Row(raw_id='5005', first_name='adam', mobile_phone='0644556688', email='adama@gmail.fr'),
Row(raw_id='6006', first_name='rida', mobile_phone='0644556688', email='rida@gmail.fr')
]
).toDF()
我的原始数据框是:
+--------------+----------+------------+------+
| email|first_name|mobile_phone|raw_id|
+--------------+----------+------------+------+
| adam@gmail.fr| adam| 0644556677| 1001|
| adam@gmail.fr| adam| 0644556688| 2002|
| momo@gmail.fr| momo| 0644556699| 3003|
|mouma@gmail.fr| momo| 0644556600| 4004|
|adama@gmail.fr| adam| 0644556688| 5005|
| rida@gmail.fr| rida| 0644556688| 6006|
+--------------+----------+------------+------+
然后,我应用重复数据删除规则:
df_mobile = df \
.groupBy('mobile_phone') \
.agg(collect_list('raw_id').alias('raws'))
df_email = df \
.groupBy('email') \
.agg(collect_list('raw_id').alias('raws'))
这是我得到的结果:
df_mobile.select('raws').show(10, False)
+------------------+
|raws |
+------------------+
|[2002, 5005, 6006]|
|[1001] |
|[4004] |
|[3003] |
+------------------+
df_email.select('raws').show(10, False)
+------------+
|raws |
+------------+
|[3003] |
|[4004] |
|[1001, 2002]|
|[5005] |
|[6006] |
+------------+
所以,我想要的最终结果是重新组合 raws 列的公共(public)元素,如下所示:
+------------------------+
|raws |
+------------------------+
|[3003] |
|[4004] |
|[2002, 5005, 6006, 1001]|
+------------------------+
你知道我如何使用 pyspark 做到这一点吗? 非常感谢!
最佳答案
因此,@pault 似乎暗示您可以将其建模为一个图表,其中原始数据框 df
是顶点列表以及 df_email
和 df_mobile
是连接顶点的列表。现在不幸的是 GraphX 不适用于 python,但是 GraphFrames是!
GrameFrames 有一个名为 Connected Components 的函数这将返回连接的 raw_ids
或顶点的列表。要使用它,我们必须做两件事,raw_id
必须称为 id
,边必须是源 (src) 和目标 (dst) 对,而不仅仅是顶点列表。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from graphframes import GraphFrame
spark = SparkSession \
.builder \
.appName("example") \
.getOrCreate()
spark.sparkContext.setCheckpointDir("checkpoints")
# graphframes requires a checkpoint dir since v0.3.0
# https://graphframes.github.io/user-guide.html#connected-components
spark.sparkContext.setLogLevel("WARN") # make it easier to see our output
vertices = spark.createDataFrame([
('1001', 'adam', '0644556677', 'adam@gmail.fr'),
('2002', 'adam', '0644556688', 'adam@gmail.fr'),
('3003', 'momo', '0644556699', 'momo@gmail.fr'),
('4004', 'momo', '0644556600', 'mouma@gmail.fr'),
('5005', 'adam', '0644556688', 'adama@gmail.fr'),
('6006', 'rida', '0644556688', 'rida@gmail.fr')
]).toDF("id", "first_name", "mobile_phone", "email")
mk_edges = udf(
lambda a: [{'src': src, 'dst': dst} for (src, dst) in zip(a, a[-1:] + a[:-1])],
returnType=ArrayType(StructType([
StructField('src', StringType(), nullable=False),
StructField('dst', StringType(), nullable=False)])))
def edges_by_group_key(df, group_key):
return df.groupBy(group_key) \
.agg(collect_list('id').alias('ids')) \
.select(mk_edges('ids').alias('edges')) \
.select(explode('edges').alias('edge')) \
.select("edge.*")
mobileEdges = edges_by_group_key(vertices, 'mobile_phone')
print('mobile edges')
mobileEdges.show(truncate=False)
# mobile edges
# +----+----+
# |src |dst |
# +----+----+
# |2002|6006|
# |5005|2002|
# |6006|5005|
# |1001|1001|
# |4004|4004|
# |3003|3003|
# +----+----+
emailEdges = edges_by_group_key(vertices, 'email')
print('email edges')
emailEdges.show(truncate=False)
# email edges
# +----+----+
# |src |dst |
# +----+----+
# |3003|3003|
# |4004|4004|
# |1001|2002|
# |2002|1001|
# |5005|5005|
# |6006|6006|
# +----+----+
g = GraphFrame(vertices, mobileEdges.union(emailEdges))
result = g.connectedComponents()
print('connectedComponents')
result.select("id", "component") \
.groupBy("component") \
.agg(collect_list('id').alias('ids')) \
.select('ids').show(truncate=False)
# connectedComponents
# +------------------------+
# |ids |
# +------------------------+
# |[1001, 2002, 5005, 6006]|
# |[4004] |
# |[3003] |
# +------------------------+
可能有一种更聪明的方法来实现移动和电子邮件数据帧之间的联合,也许可以使用不同的方法进行重复数据删除,但您明白了。
关于python - 在 Apache Spark (pyspark 2.4) 中同一行的数据帧集合列表中获取重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53934437/