python - 如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?

标签 python apache-spark pyspark rdd

我有一个名为 myRDDpyspark.rdd.PipelinedRDD。这是其示例内容:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),
 ((333, u'BB', u'B'), (999, u'BB', u'A')),...]

我需要删除第三列值不一致的所有条目。预期结果是这样的:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),...]

我该怎么做?

最佳答案

您可以使用带有 lambda 表达式的过滤器来检查每个元组对的第三个元素是否相同,例如:

l = [((111, u'BB', u'A'), (444, u'BB', u'A')),
     ((222, u'BB', u'A'), (888, u'BB', u'A')),
     ((333, u'BB', u'B'), (999, u'BB', u'A'))]

rdd = sc.parallelize(l)
rdd = rdd.filter(lambda x: x[0][2] == x[1][2])
result = rdd.collect()
print result

>>> [((111, u'BB', u'A'), (444, u'BB', u'A')), ((222, u'BB', u'A'), (888, u'BB', u'A'))]

为了回答您的后续评论,请记住,lambda 只是一个函数,如果您有更复杂的逻辑,您可以将其写为函数。你可以这样做:

def do_stuff(x):
    if (x[0][2] == 'C') or (x[1][2] == 'C'):
        return x     
    else:
        if x[0][2] == x[1][2]: return x
    return None

rdd = rdd.map(do_stuff).filter(lambda x: x is not None)

res = rdd.collect()

关于python - 如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46949953/

相关文章:

python - Python 中首选的最大缩进?

Python - 比较两个存在电子邮件但位于不同行的电子邮件列表

Python为单个对象/类创建多个实例

python - 当 pyspark 中两列的值是相同组合时删除行

python - PySpark 将数据帧中数组中的元素映射到另一个数据帧

apache-spark - PySpark withColumn & withField 类型错误 : 'Column' object is not callable

python - Pyspark 中时间戳的滚动平均值和总和

Java 原始音频输出

scala - 如何在 Spark 上执行大型计算

python - 在 PyCharm IDE 中添加 Spark 包