我有以下 pyspark 应用程序,它从子/父进程 ID 的 csv 生成子/父进程序列。将问题视为一棵树,我使用从叶节点(没有子进程的进程)开始的迭代深度优先搜索,并迭代我的文件以创建这些闭包,其中进程 1 是进程 2 的父进程,进程 2 是进程 2 的父进程。进程 3 的父进程,依此类推。
换句话说,给定如下所示的 csv,是否可以使用 pyspark 数据帧和适当的 pyspark-ism 实现深度优先搜索(迭代或递归)来生成所述闭包,而无需使用 .collect()功能(这是令人难以置信的昂贵)?from pyspark.sql.functions import monotonically_increasing_id
import copy
from pyspark.sql import SQLContext
from pyspark import SparkContext
class Test():
def __init__(self):
self.process_list = []
def main():
test = Test()
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
df = sc.textFile("<path to csv>")
df = df.map(lambda line: line.split(","))
header = df.first()
data = df.filter(lambda row: row != header)
data = data.toDF(header)
data.createOrReplaceTempView("flat")
data = sqlContext.sql("select doc_process_pid, doc_parent_pid from flat
where doc_parent_pid is not null AND
doc_process_pid is not null")
data = data.select(monotonically_increasing_id().alias("rowId"), "*")
data.createOrReplaceTempView("data")
leaf_df = sqlContext.sql("select doc_process_pid, doc_parent_pid from data
where doc_parent_pid != -1 AND
doc_process_pid == -1")
leaf_df = leaf_df.rdd.collect()
data = sqlContext.sql("select doc_process_pid, doc_parent_pid from data
where doc_process_pid != -1")
data.createOrReplaceTempView("data")
for row in leaf_df:
path = []
rowID = row[0]
data = data.filter(data['rowId'] != rowID)
parentID = row[4]
path.append(parentID)
while (True):
next_df = sqlContext.sql(
"select doc_process_pid, doc_parent_pid from data where
doc_process_pid == " + str(parentID))
next_df_rdd = next_df.collect()
print("parent: ", next_df_rdd[0][1])
parentID = next_df_rdd[0][1]
if (int(parentID) != -1):
path.append(next_df_rdd[0][1])
else:
test.process_list.append(copy.deepcopy(path))
break
print("final: ", test.process_list)
main()
这是我的 csv:
doc_process_pid doc_parent_pid
1 -1
2 1
6 -1
7 6
8 7
9 8
21 -1
22 21
24 -1
25 24
26 25
27 26
28 27
29 28
99 6
107 99
108 -1
109 108
222 109
1000 7
1001 1000
-1 9
-1 22
-1 29
-1 107
-1 1001
-1 222
-1 2
它表示子/父进程关系。如果我们将其视为一棵树,则叶节点由 doc_process_id == -1 定义,根节点是 doc_parent_process == -1 的进程。
上面的代码生成两个数据框:
叶节点:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| -1| 9|
| -1| 22|
| -1| 29|
| -1| 107|
| -1| 1001|
| -1| 222|
| -1| 2|
+---------------+--------------+
剩余的子/父进程没有叶节点:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| 1| -1|
| 2| 1|
| 6| -1|
| 7| 6|
| 8| 7|
| 9| 8|
| 21| -1|
| 22| 21|
| 24| -1|
| 25| 24|
| 26| 25|
| 27| 26|
| 28| 27|
| 29| 28|
| 99| 6|
| 107| 99|
| 108| -1|
| 109| 108|
| 222| 109|
| 1000| 7|
+---------------+--------------+
输出将是:
[[1, 2],
[6, 99, 107],
[6, 99, 7, 1000, 1001],
[6, 7, 1000, 8, 9],
[21, 22],
[24, 25, 26, 27, 28, 29],
[108, 109, 222]])
想法?虽然这有点具体,但我想强调执行深度优先搜索以生成以此 DataFrame 格式表示的序列闭包的一般问题。
预先感谢您的帮助!
最佳答案
我认为 pyspark 不是执行此操作的最佳语言。
解决方案是迭代每次将数据帧与其自身连接的树节点级别。
让我们创建我们的数据框,不需要将其拆分为叶子和其他节点,我们只保留原始数据框:
data = spark.createDataFrame(
sc.parallelize(
[[1, -1], [2, 1], [6, -1], [7, 6], [8, 7], [9, 8], [21,-1], [22,21], [24,-1], [25,24], [26,25], [27,26], [28,27],
[29,28], [99, 6], [107,99], [108,-1], [109,108], [222,109], [1000,7], [1001,1000], [ -1,9], [ -1,22], [ -1,29],
[ -1,107], [ -1, 1001], [ -1,222], [ -1,2]]
),
["doc_process_pid", "doc_parent_pid"]
)
我们现在将从这棵树创建两个数据框,一个将是我们的建筑基础,另一个将是我们的建筑砖 block :
df1 = data.filter("doc_parent_pid = -1").select(data.doc_process_pid.alias("node"))
df2 = data.select(data.doc_process_pid.alias("son"), data.doc_parent_pid.alias("node")).filter("node != -1")
让我们为构造的步骤i
定义一个函数:
def add_node(df, i):
return df.filter("node != -1").join(df2, "node", "inner").withColumnRenamed("node", "node" + str(i)).withColumnRenamed("son", "node")
让我们定义初始状态:
from pyspark.sql.types import *
df = df1
i = 0
df_end = spark.createDataFrame(
sc.emptyRDD(),
StructType([StructField("branch", ArrayType(LongType()), True)])
)
当分支完全构建完成后,我们将其从df
中取出并放入df_end
中:
import pyspark.sql.functions as psf
while df.count() > 0:
i = i + 1
df = add_node(df, i)
df_end = df.filter("node = -1").drop('node').select(psf.array(*[c for c in reversed(df.columns) if c != "node"]).alias("branch")).unionAll(
df_end
)
df = df.filter("node != -1")
最后,df 为空,我们有
df_end.show(truncate=False)
+------------------------+
|branch |
+------------------------+
|[24, 25, 26, 27, 28, 29]|
|[6, 7, 8, 9] |
|[6, 7, 1000, 1001] |
|[108, 109, 222] |
|[6, 99, 107] |
|[21, 22] |
|[1, 2] |
+------------------------+
此算法的最坏情况是连接数与最大分支长度一样多。
关于Pyspark - Dataframe 上的深度优先搜索,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45971772/