apache-spark - Spark : filter out all rows based on key/value

标签 apache-spark rdd

我有一个 RDD,x,其中有两个字段:id、value。如果一行有一个特定的值,我想获取 id 并过滤掉具有该 id 的所有行。

例如,如果我有:

id1,value1
id1,value2

如果具有该 id 的任何行的值为 value1,我想过滤掉所有 id,那么我希望所有行都被过滤掉。但目前只有第一行被过滤掉,因为它的值为 value1。

我试过类似的东西

val filter = x.filter(row => (set contains row.value))

这将过滤掉具有特定值的所有行,但将具有相同 ID 的其他行保留在 RDD 中。

最佳答案

您必须为每个 rdd 行应用一个过滤函数,并且 => 之后的函数应该将该行包含为数组,无论它是否包含该标记 idx 或其他内容。您可能需要调整 token 的数量,但它应该看起来像这样(您是否应该使用 contains 或不包含取决于您是要过滤还是过滤掉:

val filteredRDD = rawRDD
  .filter(rowItem => !(rowItem.map(_.toString).toSeq
  .contains(rowItem.(0).toString)))

或者甚至是这样的:

 val filteredRDD = rdd1.rawRDD(rowItem => !(rowItem._2 contains rowItem._1))

关于apache-spark - Spark : filter out all rows based on key/value,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42452959/

相关文章:

scala - RDD 到 LabeledPoint 的转换

scala - 如何在 Spark 中显示 KeyValueGroupedDataset?

scala - 如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark

python - 在 PySpark 中读取文本文件时有没有办法控制分区数

apache-spark - 我应该使用哪个版本的 hadoop-aws

apache-spark - PySpark 计数在 RDD 中按组区分

python - 如何根据 Pyspark 中的正则表达式条件验证(和删除)列,而无需多次扫描和洗牌?

hadoop - Hive是否可以定期将增量数据追加或插入到hdfs的同一表文件中?

scala - 生产中的 Spark Structured Streaming 检查点使用

scala - 为什么 Writable 的隐式转换不起作用