我有一个很大的 RDD,称之为 RDD1,经过初始过滤后大约有 3 亿行。我想做的是从 RDD1 中获取 id,并在另一个大型数据集中找到它的所有其他实例,称之为 RDD2,大约有 30 亿行。 RDD2 是通过查询存储在 Hive 中的 parquet 表以及 RDD1 创建的。 RDD1 中唯一 ID 的数量约为 1000 万个元素。
我的方法是当前收集 ids 并广播它们,然后过滤 RDD2。
我的问题是 - 有没有更有效的方法来做到这一点?或者这是最佳实践?
我有以下代码-
hiveContext = HiveContext(sc)
RDD1 = hiveContext("select * from table_1")
RDD2 = hiveContext.sql("select * from table_2")
ids = RDD1.map(lambda x: x[0]).distinct() # This is approximately 10 million ids
ids = sc.broadcast(set(ids.collect()))
RDD2_filter = RDD2.rdd.filter(lambda x: x[0] in ids.value))
最佳答案
我认为最好只使用单个 SQL 语句来进行连接:
RDD2_filter = hiveContext.sql("""select distinct t2.*
from table_1 t1
join table_2 t2 on t1.id = t2.id""")
关于apache-spark - 通过迭代另一个大 RDD 来过滤大 RDD - pySpark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34939296/