python - 在自定义函数上加入两个 RDD - SPARK

标签 python join apache-spark pyspark cluster-computing

是否可以在自定义函数上加入 Spark 中的两个 RDD? 我有两个以字符串作为键的大 RDD。我想加入他们,而不是使用经典的加入,而是使用自定义函数,例如:

def my_func(a,b):
    return Lev.distance(a,b) < 2

result_rdd = rdd1.join(rdd2, my_func)

如果不可能,是否有任何替代方案可以继续利用 spark 集群的优势? 我写了类似这样的东西,但 pyspark 将无法在我的小集群上分发工作。

def custom_join(rdd1, rdd2, my_func):
    a = rdd1.sortByKey().collect()
    b = rdd2.sortByKey().collect()
    i = 0
    j = 0
    res = []
    while i < len(a) and j < len(b):
        if my_func(a[i][0],b[j][0]):
            res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))]
            i+=1
            j+=1
        elif a[i][0] < b[j][0]:
            i+=1
        else:
            j+=1

    return sc.parallelize(res)

提前致谢(抱歉我的英语不好,因为我是意大利人)

最佳答案

您可以使用笛卡尔,然后根据条件进行过滤。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("b", 3)])

def customFunc(x):
    # You may use any condition here
    return x[0][0] ==x[1][0]

print(x.join(y).collect()) # normal join
# replicating join with cartesian
print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect())

输出:

[('b', (4, 3)), ('a', (1, 2))]
[('a', (1, 2)), ('b', (4, 3))]

关于python - 在自定义函数上加入两个 RDD - SPARK,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43274215/

相关文章:

mysql - 如何检查具有多值的列

MySQL : selecting minimum value from one table and country details from another and grouping them according to country code

scala - 如何在不产生.rdd成本的情况下检查Spark DataFrame的分区数

Python Pandas - 数据透视表输出意外 float

Python:无法使用 SSL 套接字发出 HTTPS 请求

python - 如何在运行时向通过 glade 创建的 gtk.ComboBox 添加项目?

scala - Spark :How to use join method?

python - 升级到 IPython 6.0 后,spyder 与 Anaconda 的 jedi 依赖问题

sql - 如何在 MySQL 中执行 FULL OUTER JOIN?

hadoop - org.apache.hive.jdbc.HiveDriver : HiveBaseResultSet has not implemented absolute()?