apache-spark - 如何处理Apache Spark中集群节点之间要独立处理的不同图形文件?

标签 apache-spark dataframe apache-spark-sql spark-graphx graphframes

假设我有大量图形文件,每个图形大约有 50 万条边。我一直在 Apache Spark 上处理这些图形文件,我想知道如何有效地并行化整个图形处理作业。由于现在每个图形文件都是独立的,我正在寻找与文件的并行性。所以,如果我有 100 个图形文件,我有 20 个节点集群,我可以处理每个节点上的每个文件,所以每个节点将处理 5 个文件。现在,正在发生的事情就像是在多个阶段中处理单个图,这导致了很多改组。

graphFile = "/mnt/bucket/edges" #This directory has 100 graph files each file with around 500K edges

nodeFile = "/mnt/bucket/nodes" #This directory has node files

graphData = sc.textFile(graphFile).map(lambda line: line.split(" ")).flatMap(lambda edge: [(int(edge[0]),int(edge[1]))])

graphDataFrame = sqlContext.createDataFrame(graphData, ['src', 'dst']).withColumn("relationship", lit('edges')) # Dataframe created so as to work with Graphframes

nodeData = sc.textFile(nodeFile).map(lambda line: line.split("\s")).flatMap(lambda edge: [(int(edge[0]),)])

nodeDataFrame = sqlContext.createDataFrame(nodeData, ['id'])

graphGraphFrame = GraphFrame(nodeDataFrame, graphDataFrame)

connectedComponent = graphGraphFrame.connectedComponents()

问题是它需要花费大量时间来处理甚至几个文件。我必须处理像 20K 的文件。每个文件有 80 万条边。可能是如果可以找出数据分区策略,确保每个从属边都在单个节点上处理,那么混洗会更少。

或者有效解决这个问题的最佳方法是什么?

最佳答案

TL;博士 Apache Spark 不是这项工作的正确工具。

Spark 的主要范围是数据并行性,但您正在寻找的是任务并行性。理论上,核心 Spark 引擎足够通用,也可以用于实现有限的任务并行性,但在实践中,有更好的工具可以完成这样的工作,这绝对不是 GraphX 和 GraphFrames 等库的目标。

由于数据分布是这些库背后的核心假设,因此它们的算法是使用消息传递或连接等技术实现的,这些技术反射(reflect)在多阶段作业结构和洗牌中。如果数据适合主内存(您可以使用优化的图形处理库轻松处理单个节点上具有数百万条边的图形),这些技术在实践中完全没有用。

鉴于您展示的代码片段,核心图形处理库,如 igraphNetworkX (更好的文档和更全面的但不幸的是内存消耗和稍微慢)结合 GNU Parallel在实践中应该绰绰有余,效率更高。对于更复杂的工作,您可以考虑使用功能齐全的工作流管理工具,如 Airflow 或 Luigi。

关于apache-spark - 如何处理Apache Spark中集群节点之间要独立处理的不同图形文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39000349/

相关文章:

python - 如何使用 Pandas 获取条件行每组每 n 天的斜率?

scala - 使用FlatMap使用Spark和Scala将列名称附加到元素

apache-spark - 如何保留 Spark HashingTF() 函数的输入键或索引?

hadoop - Hive动态分区问题

apache-spark - 为Spark数据框中的每个组创建索引

hadoop - Spark 1.2 : Write single record into multiple files (blacklisted)

apache-spark - Spark 是否在内部跨节点分发数据帧?

scala - spark-core 依赖项中的冲突。它是如何工作的?

python - 在 python 中,如何创建相互迭代的两列?

r - 如何在 R 的 igraph 图中为重复值添加一行?