java - Spark combineByKey with Java lambda expressions

Tags: java lambda apache-spark

I want to use lambda functions to compute the per-key average of a JavaPairRDD&lt;Integer, Double&gt; pairs. To that end, I wrote the following code:

java.util.function.Function<Double, Tuple2<Double, Integer>> createAcc = x -> new Tuple2<Double, Integer>(x, 1);

BiFunction<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>  addAndCount = (Tuple2<Double, Integer> x, Double y) -> {  return new Tuple2(x._1()+y, x._2()+1 );   };

BiFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>  combine = (Tuple2<Double, Integer> x, Tuple2<Double, Integer> y) -> {  return new Tuple2(x._1()+y._1(), x._2()+y._2() );   };

JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts = pairs.combineByKey(createAcc, addAndCount, combine);

However, Eclipse shows this error:

The method combineByKey(Function<Double,C>, Function2<C,Double,C>, Function2<C,C,C>) in the type JavaPairRDD<Integer,Double> is not applicable for the arguments (Function<Double,Tuple2<Double,Integer>>,
 BiFunction<Tuple2<Double,Integer>,Double,Tuple2<Double,Integer>>, BiFunction<Tuple2<Double,Integer>,Tuple2<Double,Integer>,Tuple2<Double,Integer>>) 

Best Answer

The combineByKey method expects org.apache.spark.api.java.function.Function2, not java.util.function.BiFunction (and, for the first argument, org.apache.spark.api.java.function.Function rather than java.util.function.Function — that is exactly what the error message's Function&lt;Double,C&gt; refers to). So write:

org.apache.spark.api.java.function.Function&lt;Double, Tuple2&lt;Double, Integer&gt;&gt; createAcc =
    x -> new Tuple2&lt;&gt;(x, 1);  // first value seen for a key: (sum, count)

Function2&lt;Tuple2&lt;Double, Integer&gt;, Double, Tuple2&lt;Double, Integer&gt;&gt; addAndCount =
    (x, y) -> new Tuple2&lt;&gt;(x._1() + y, x._2() + 1);  // fold another value into the accumulator

Function2&lt;Tuple2&lt;Double, Integer&gt;, Tuple2&lt;Double, Integer&gt;, Tuple2&lt;Double, Integer&gt;&gt; combine =
    (x, y) -> new Tuple2&lt;&gt;(x._1() + y._1(), x._2() + y._2());  // merge two partial accumulators

JavaPairRDD&lt;Integer, Tuple2&lt;Double, Integer&gt;&gt; avgCounts =
    pairs.combineByKey(createAcc, addAndCount, combine);
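To see what the three functions actually do, here is a Spark-free sketch that applies the same createAcc / addAndCount / combine logic to a plain Java list and then divides sum by count per key — the step you would still run (e.g. with mapValues) on avgCounts to get the averages. The Acc record and the class/method names are illustrative stand-ins for Tuple2&lt;Double, Integer&gt;, not Spark API:

```java
import java.util.*;
import java.util.function.*;

public class AverageByKeyDemo {
    // Local stand-in for Tuple2<Double, Integer>: (running sum, running count).
    record Acc(double sum, int count) {}

    static Map<Integer, Double> averageByKey(List<Map.Entry<Integer, Double>> pairs) {
        // The same three functions combineByKey takes, on plain Java types:
        Function<Double, Acc> createAcc = x -> new Acc(x, 1);
        BiFunction<Acc, Double, Acc> addAndCount =
            (a, y) -> new Acc(a.sum() + y, a.count() + 1);
        BiFunction<Acc, Acc, Acc> combine =
            (a, b) -> new Acc(a.sum() + b.sum(), a.count() + b.count());

        // Fold each value into its key's accumulator (createAcc on first sight,
        // addAndCount afterwards) -- what Spark does within one partition.
        Map<Integer, Acc> accs = new HashMap<>();
        for (Map.Entry<Integer, Double> p : pairs) {
            accs.compute(p.getKey(), (k, a) ->
                a == null ? createAcc.apply(p.getValue())
                          : addAndCount.apply(a, p.getValue()));
        }

        // combine merges accumulators across partitions; with a single local
        // "partition" it is shown here merging into an empty map for completeness.
        Map<Integer, Acc> merged = new HashMap<>();
        accs.forEach((k, a) -> merged.merge(k, a, combine::apply));

        // Final step (mapValues on the RDD): sum / count per key.
        Map<Integer, Double> avg = new HashMap<>();
        merged.forEach((k, a) -> avg.put(k, a.sum() / a.count()));
        return avg;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Double>> pairs = List.of(
            Map.entry(1, 2.0), Map.entry(1, 4.0), Map.entry(2, 10.0));
        System.out.println(averageByKey(pairs)); // {1=3.0, 2=10.0}
    }
}
```

The split into three functions matters because combineByKey runs createAcc/addAndCount independently on each partition and only uses combine when merging partitions, so the accumulator type (here Acc, in the RDD Tuple2&lt;Double, Integer&gt;) can differ from the value type Double.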

Regarding java - Spark combineByKey with Java lambda expressions, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/28806792/
