python - 基于 Spark 中的另一个 RDD 进行过滤

标签 python scala apache-spark

我只想保留在第二个表中引用了部门 ID 的员工。

Employee table
LastName    DepartmentID
Rafferty    31
Jones   33
Heisenberg  33
Robinson    34
Smith   34

Department table
DepartmentID
31  
33  

我已经尝试了以下不起作用的代码:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

有什么想法吗?我在 Python 中使用 Spark 1.1.0。但是,我会接受 Scala 或 Python 的答案。

最佳答案

在这种情况下,您想要实现的是使用 department 表中包含的数据在每个分区进行过滤: 这将是基本的解决方案:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}

如果您的部门数据很大,广播变量将通过一次将数据传送到所有节点来提高性能,而不必在每个任务中对其进行序列化

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}

虽然使用 join 可行,但这是一个非常昂贵的解决方案,因为它需要对数据进行分布式洗牌 (byKey) 才能实现连接。鉴于需求是一个简单的过滤器,将数据发送到每个分区(如上所示)将提供更好的性能。

关于python - 基于 Spark 中的另一个 RDD 进行过滤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26214112/

相关文章:

python - 如何在 Azure 或 AWS 中托管私有(private) python 包管理器

list - Scala 获取列表中小于特定值的所有元素

java - 从 Java 转到 Scala,我应该使用 List 还是 Buffer 来替代 ArrayList

scala - Spark 、斯卡拉 : How to Subtract the values in the RDD pairs based on their key?

scala - 从自定义数据格式创建 spark 数据框

python - 根据索引值和条件语句将行值相加

python - Facebook 问题上的本地参数

python - 如何手动将包安装到 anaconda 的 python 发行版中?

Scala 2.8 突破

apache-spark - 如何通过增加 spark 的内存来解决 pyspark `org.apache.arrow.vector.util.OversizedAllocationException` 错误?