java - 如何根据给定分区过滤 RDD?

标签 java apache-spark partitioning rdd

考虑以下示例:

JavaPairRDD<String, Row> R = input.textFile("test").mapToPair(new PairFunction<String, String, Row>() {
        public Tuple2<String, Row> call(String arg0) throws Exception {
            String[] parts = arg0.split(" ");
            Row r = RowFactory.create(parts[0],parts[1]);
            return new Tuple2<String, Row>(r.get(0).toString(), r);
        }}).partitionBy(new HashPartitioner(20));

上面的代码创建了一个名为 R 的 RDD,它通过对名为“test”的 txt 文件的第一列进行散列而被分成 20 block 。

考虑 test.txt 文件具有以下形式:

...
valueA1 valueB1
valueA1 valueB2
valueA1 valueB3
valueA1 valueB4
... 

在我的上下文中,我有一个已知值,例如 valueA1,我想检索所有其他值。通过使用具有指定值的现有过滤器操作来完成它是微不足道的。但是,我想避免这种情况,因为基本上过滤操作将在整个 RDD 上执行。

假设 hash(valueA1)=3,我只想在分区 3 上执行给定的操作。更一般地说,我有兴趣从 RDD 中删除/选择特定分区并对其执行操作。

从 SPARK API 看来,它似乎是不可能直接实现同样的事情的解决方法吗?

最佳答案

对于单个键,您可以使用lookup 方法:

rdd.lookup("a")

// Seq[Int] = ArrayBuffer(1, 4)

为了高效查找,您需要一个分区的 RDD,例如使用如下所示的 HashPartitioner

如果您只想过滤包含特定键的分区,可以使用 mapPartitionsWithIndex 来完成:

import org.apache.spark.HashPartitioner

val rdd = sc.parallelize(
  Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)
// A particular number is used only to get a reproducible output
)).partitionBy(new HashPartitioner(8))  

val keys = Set("a", "c")
val parts = keys.map(_.## % rdd.partitions.size)

rdd.mapPartitionsWithIndex((i, iter) =>
  if (parts.contains(i)) iter.filter{ case (k, _) => keys.contains(k) }
  else Iterator()
).collect

// Array[(String, Int)] = Array((a,1), (a,4), (c,3))

关于java - 如何根据给定分区过滤 RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33944721/

相关文章:

java - ProcessBuilder导致无法运行程序

hadoop - Hive 中的 RANK OVER 函数

scala - 如何根据 Spark-scala 中的过滤器将数据集分为两部分

python - 如何在分发给 worker 的 Spark 集群上执行任意python代码

indexing - cassandra中分区摘要的内部结构

amazon-web-services - 雅典娜 : Minimize data scanned by query including JOIN operation

java - 如何使用 JAVA 将 XML 字符串添加到现有 XML 文件中?

Java 安卓工作室 : change an attribute of a View inside its own listener

java - 解析 JSON 数据时出现 ArrayIndexOutOfBoundsException 错误

scala - Spark 2.0数据集groupByKey并对操作和类型安全进行划分