Pyspark - Dataframe 上的深度优先搜索

标签 pyspark apache-spark-sql depth-first-search

我有以下 pyspark 应用程序,它从子/父进程 ID 的 csv 生成子/父进程序列。将问题视为一棵树,我使用从叶节点(没有子进程的进程)开始的迭代深度优先搜索,并迭代我的文件以创建这些闭包,其中进程 1 是进程 2 的父进程,进程 2 是进程 2 的父进程。进程 3 的父进程,依此类推。

换句话说,给定如下所示的 csv,是否可以使用 pyspark 数据帧和适当的 pyspark-ism 实现深度优先搜索(迭代或递归)来生成所述闭包,而无需使用 .collect()功能(这是令人难以置信的昂贵)?

from pyspark.sql.functions import monotonically_increasing_id
import copy
from pyspark.sql import SQLContext
from pyspark import SparkContext

class Test():
    def __init__(self):
        self.process_list = []

def main():

    test = Test()
    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc) 
    df = sc.textFile("<path to csv>") 
    df = df.map(lambda line: line.split(","))
    header = df.first()
    data = df.filter(lambda row: row != header)
    data = data.toDF(header)

    data.createOrReplaceTempView("flat")
    data = sqlContext.sql("select doc_process_pid, doc_parent_pid from flat 
                           where doc_parent_pid is not null AND 
                           doc_process_pid is not null")
    data = data.select(monotonically_increasing_id().alias("rowId"), "*")

    data.createOrReplaceTempView("data")
    leaf_df = sqlContext.sql("select doc_process_pid, doc_parent_pid from data 
                                 where doc_parent_pid != -1 AND 
                                 doc_process_pid == -1")
    leaf_df = leaf_df.rdd.collect()
    data = sqlContext.sql("select doc_process_pid, doc_parent_pid from data 
                             where doc_process_pid != -1")
    data.createOrReplaceTempView("data")

    for row in leaf_df:
        path = []
        rowID = row[0]  
        data = data.filter(data['rowId'] != rowID)
        parentID = row[4]
        path.append(parentID)
        while (True):
            next_df = sqlContext.sql(
                "select doc_process_pid, doc_parent_pid from data where 
                 doc_process_pid == " + str(parentID))

            next_df_rdd = next_df.collect()
            print("parent: ", next_df_rdd[0][1])
            parentID = next_df_rdd[0][1]

            if (int(parentID) != -1):
                path.append(next_df_rdd[0][1])
            else:
                test.process_list.append(copy.deepcopy(path))
                break

        print("final: ", test.process_list)


main()

这是我的 csv:

doc_process_pid doc_parent_pid
   1             -1
   2              1
   6             -1
   7              6
   8              7
   9              8
   21            -1
   22            21
   24            -1
   25            24
   26            25
   27            26
   28            27
   29            28
   99             6
   107           99
   108           -1
   109          108
   222          109
   1000           7
   1001        1000
    -1            9
    -1           22
    -1           29
    -1          107
    -1         1001
    -1          222
    -1            2

它表示子/父进程关系。如果我们将其视为一棵树,则叶节点由 doc_process_id == -1 定义,根节点是 doc_parent_process == -1 的进程。

上面的代码生成两个数据框:

叶节点:

+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
|             -1|             9|
|             -1|            22|
|             -1|            29|
|             -1|           107|
|             -1|          1001|
|             -1|           222|
|             -1|             2|
+---------------+--------------+

剩余的子/父进程没有叶节点:

+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
|              1|            -1|
|              2|             1|
|              6|            -1|
|              7|             6|
|              8|             7|
|              9|             8|
|             21|            -1|
|             22|            21|
|             24|            -1|
|             25|            24|
|             26|            25|
|             27|            26|
|             28|            27|
|             29|            28|
|             99|             6|
|            107|            99|
|            108|            -1|
|            109|           108|
|            222|           109|
|           1000|             7|
+---------------+--------------+

输出将是:

[[1, 2], 
 [6, 99, 107], 
 [6, 99, 7, 1000, 1001], 
 [6, 7, 1000, 8, 9], 
 [21, 22], 
 [24, 25, 26, 27, 28, 29], 
 [108, 109, 222]])

想法?虽然这有点具体,但我想强调执行深度优先搜索以生成以此 DataFrame 格式表示的序列闭包的一般问题。

预先感谢您的帮助!

最佳答案

我认为 pyspark 不是执行此操作的最佳语言。

解决方案是迭代每次将数据帧与其自身连接的树节点级别。

让我们创建我们的数据框,不需要将其拆分为叶子和其他节点,我们只保留原始数据框:

data = spark.createDataFrame(
    sc.parallelize(
        [[1, -1], [2,  1], [6, -1], [7,  6], [8,  7], [9,  8], [21,-1], [22,21], [24,-1], [25,24], [26,25], [27,26], [28,27], 
         [29,28], [99, 6], [107,99], [108,-1], [109,108], [222,109], [1000,7], [1001,1000], [ -1,9], [ -1,22], [ -1,29], 
         [ -1,107], [ -1, 1001], [ -1,222], [ -1,2]]
    ), 
    ["doc_process_pid", "doc_parent_pid"]
)

我们现在将从这棵树创建两个数据框,一个将是我们的建筑基础,另一个将是我们的建筑砖 block :

df1 = data.filter("doc_parent_pid = -1").select(data.doc_process_pid.alias("node"))
df2 = data.select(data.doc_process_pid.alias("son"), data.doc_parent_pid.alias("node")).filter("node != -1")

让我们为构造的步骤i定义一个函数:

def add_node(df, i):
    return df.filter("node != -1").join(df2, "node", "inner").withColumnRenamed("node", "node" + str(i)).withColumnRenamed("son", "node")

让我们定义初始状态:

from pyspark.sql.types import *
df = df1
i = 0
df_end = spark.createDataFrame(
    sc.emptyRDD(), 
    StructType([StructField("branch", ArrayType(LongType()), True)])
)

当分支完全构建完成后,我们将其从df中取出并放入df_end中:

import pyspark.sql.functions as psf
while df.count() > 0:
    i = i + 1
    df = add_node(df, i)
    df_end = df.filter("node = -1").drop('node').select(psf.array(*[c for c in reversed(df.columns) if c != "node"]).alias("branch")).unionAll(
        df_end
    )
    df = df.filter("node != -1")

最后,df 为空,我们有

df_end.show(truncate=False)
    +------------------------+
    |branch                  |
    +------------------------+
    |[24, 25, 26, 27, 28, 29]|
    |[6, 7, 8, 9]            |
    |[6, 7, 1000, 1001]      |
    |[108, 109, 222]         |
    |[6, 99, 107]            |
    |[21, 22]                |
    |[1, 2]                  |
    +------------------------+

此算法的最坏情况是连接数与最大分支长度一样多。

关于Pyspark - Dataframe 上的深度优先搜索,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45971772/

相关文章:

python - Qubole 中的宽数据 PySpark 机器学习

java - 未找到 Spark SQL 包

apache-spark - 如何更新Apache Spark DataFrame中的行/列值?

python-3.x - 如何实现广度优先和深度优先搜索网络爬虫?

algorithm - 深度优先图算法的时间复杂度

python - com.amazonaws.AmazonClientException : Unable to execute HTTP request: No such host is known (spark-tunes. s3a.ap-south-1.amazonaws.com)

python - 将 pyspark 数据框与另一个数据框进行比较

python - 如何检查我的一列值是否存在于另一列中

apache-spark - 在 Spark Thrift 服务器中缓存 DataFrame

java - 给定一个整数 n,按字典顺序返回 1 - n