java - CoGrouped RDD 上的 FlatMap 函数

标签 java apache-spark

我正在尝试在具有签名的 cogroupedRDD 上使用平面 map 函数:

JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>>

我的平面图功能如下:

static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row>() {
    @Override
    public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        }};

但是我收到编译错误。我确信这一定是一个我无法理解的语法问题。

完整代码:

JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>> coGroupedRDD = rdd1.cogroup(rdd2);
JavaRDD<Row> jd = coGroupedRDD.flatmap(setupF);
 static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row>() {
        @Override
        public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        //logic
}};

错误:

The method flatmap(FlatMapFunction<Tuple2<String,Tuple2<Iterable<Row>,Iterable<Row>>>,Row>) is undefined for the type JavaPairRDD<String,Tuple2<Iterable<Row>,Iterable<Row>>>

最佳答案

这里有一个大胆的猜测,也许原因是您针对 Spark 1.6 API 编写代码,但实际上使用 Spark 2.0 依赖项?这两个版本之间的 API 有所不同。

Spark 1.6 API FlatMapFunction方法签名:

Iterable<R> call(T t)

Spark 2.0 API FlatMapFunction方法签名:

Iterator<R> call(T t) 

所以尝试将代码更改为:

new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
        @Override
        public Iterator<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        //...
        }
};

或使用 Java 8 lambda 版本:

   coGroupedRDD
   .flatMap(t -> {
        List<Row> result = new ArrayList<>();
        //...use t._1, t._2._1, t._2._2 to construct the result list
        return result.iterator();
    });

关于java - CoGrouped RDD 上的 FlatMap 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40038297/

相关文章:

java - JSON - 对象数组中的对象数组

java.lang.NullPointerException : uri - Turning on Location through Alert Dialog

java - 如何将 JFileChooser 限制为目录?

scala - 连接 DataFrame 列的数组元素

java - XML - 如何使用 java 获取子节点数

scala - Spark:如何告诉Spark使用本地hadoop而不是其嵌入式hadoop?

python - 是否可以在 Pyspark 中继承 DataFrame?

java - Spark-Shell:org.apache.spark.SparkException:任务无法序列化

python - 如何在 PySpark 中分别对多个列进行透视

java - 如何使Java程序成为平台无关的exe?