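I am trying to add the max and min to every tuple of each RDD in a Spark DStream. I wrote the code below, but I can't figure out how to pass the min and max parameters. Can anyone suggest a way to do this transformation? This is what I tried: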
JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer,Long,Long>> sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());
class MinMax implements Function<JavaPairRDD<Tuple2<Long, Integer>, Integer>, JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> {
    Long max;
    Long min;

    @Override
    public JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> call(JavaPairRDD<Tuple2<Long, Integer>, Integer> input) throws Exception {
        JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> output;
        max = input.max(new CMP1())._1._1;
        min = input.min(new CMP1())._1._1;
        output = input.mapToPair(new maptoMinMax());
        return output;
    }

    class maptoMinMax implements PairFunction<Tuple2<Tuple2<Long, Integer>, Integer>, Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> {
        @Override
        public Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> call(Tuple2<Tuple2<Long, Integer>, Integer> tuple2IntegerTuple2) throws Exception {
            return new Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>(
                    new Tuple2<Long, Integer>(tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2),
                    new Tuple3<Integer, Long, Long>(tuple2IntegerTuple2._2, max, min));
        }
    }
}
I get the following error. Essentially, it seems the min and max functions of JavaPairRDD cannot be found:
15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)
15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000
Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStrea
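Best answer
We can use DStream.transform to apply several operations to the same RDD and obtain a result for each batch interval. We then add that result to every tuple, as the question specifies.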
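data.transform { rdd =>
  // Pair each element with itself, then reduce to (min, max) for this batch
  val mx = rdd.map(x => (x, x)).reduce { case ((x1, x2), (y1, y2)) => (x1 min y1, x2 max y2) }
  // Attach the batch-wide (min, max) to every element
  rdd.map(elem => (elem, mx))
}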
This produces one RDD per block interval that looks like this (the data are random numbers between 1 and 999 inclusive):
(258,(0,998)) (591,(0,998)) ...
The Java version is semantically identical, but considerably more verbose because of all the Tuple<...> objects.
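To make the shape of that Java version more concrete, here is a minimal sketch of the same two steps (reduce to a (min, max) pair, then attach it to every element) written against plain java.util collections so that it runs without a Spark cluster. The names MinMaxSketch, combine, minMax, and withMinMax are hypothetical and do not come from the question or the answer, and long[] arrays stand in for the Tuple types purely for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MinMaxSketch {

    /** Merge two {min, max} pairs; this mirrors the body of the Scala reduce. */
    public static long[] combine(long[] a, long[] b) {
        return new long[] { Math.min(a[0], b[0]), Math.max(a[1], b[1]) };
    }

    /** Returns {min, max} for one batch, or null when the batch is empty. */
    public static long[] minMax(List<Long> batch) {
        return batch.stream()
                .map(x -> new long[] { x, x })   // same idea as rdd.map(x => (x, x))
                .reduce(MinMaxSketch::combine)   // pairwise (min, max) merge
                .orElse(null);                   // empty batch: nothing to reduce
    }

    /** Attaches the batch-wide min and max to every element as {elem, min, max}. */
    public static List<long[]> withMinMax(List<Long> batch) {
        List<long[]> out = new ArrayList<>();
        long[] mm = minMax(batch);
        if (mm == null) {
            return out; // keep empty batches empty instead of failing
        }
        for (long x : batch) {
            out.add(new long[] { x, mm[0], mm[1] });
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in data for one batch interval
        for (long[] row : withMinMax(Arrays.asList(258L, 591L, 0L, 998L))) {
            System.out.println(Arrays.toString(row)); // first line: [258, 0, 998]
        }
    }
}
```

In an actual streaming job, a combiner like this would presumably be passed to the RDD's reduce inside the function given to transformToPair, with the result captured in a final local variable before the mapToPair call; that wiring is an assumption and is not shown or tested here. Since batches in a stream can be empty and Spark's reduce fails on an empty RDD, a guard similar to the null check above is likely needed there as well.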
Regarding "java - Adding max and min in Spark Streaming in Java?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/30902090/