我正在尝试在具有签名的 cogroupedRDD 上使用平面 map 函数:
JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>>
我的平面图功能如下:
static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row>() {
@Override
public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
}};
但是我收到编译错误。我确信这一定是一个我无法理解的语法问题。
完整代码:
JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>> coGroupedRDD = rdd1.cogroup(rdd2);
JavaRDD<Row> jd = coGroupedRDD.flatmap(setupF);
static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>,Row>() {
@Override
public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
//logic
}};
错误:
The method flatmap(FlatMapFunction<Tuple2<String,Tuple2<Iterable<Row>,Iterable<Row>>>,Row>) is undefined for the type JavaPairRDD<String,Tuple2<Iterable<Row>,Iterable<Row>>>
最佳答案
这里有一个大胆的猜测,也许原因是您针对 Spark 1.6 API 编写代码,但实际上使用 Spark 2.0 依赖项?这两个版本之间的 API 有所不同。
Spark 1.6 API FlatMapFunction方法签名:
Iterable<R> call(T t)
Spark 2.0 API FlatMapFunction方法签名:
Iterator<R> call(T t)
所以尝试将代码更改为:
new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
@Override
public Iterator<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
//...
}
};
或使用 Java 8 lambda 版本:
coGroupedRDD
.flatMap(t -> {
List<Row> result = new ArrayList<>();
//...use t._1, t._2._1, t._2._2 to construct the result list
return result.iterator();
});
关于java - CoGrouped RDD 上的 FlatMap 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40038297/