java - Adding max and min in a Spark stream in Java?

Tags: java apache-spark spark-streaming

I am trying to attach the max and min to every tuple in each RDD of a Spark DStream. I wrote the code below, but I cannot figure out how to pass the min and max along. Can anyone suggest a way to do this transformation? I tried the following:

JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer,Long,Long>> sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());

class MinMax implements Function<JavaPairRDD<Tuple2<Long, Integer>, Integer>, JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>>{
    Long max;
    Long min;
    @Override
    public JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> call(JavaPairRDD<Tuple2<Long, Integer>, Integer> input) throws Exception {
        JavaPairRDD<Tuple2<Long,Integer>,Tuple3<Integer,Long,Long>> output;
        max = input.max(new CMP1())._1._1;
        min = input.min(new CMP1())._1._1;
        output = input.mapToPair(new maptoMinMax());
        return output;
    }
    class maptoMinMax implements PairFunction<Tuple2<Tuple2<Long, Integer>, Integer>, Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> {

        @Override
        public Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> call(Tuple2<Tuple2<Long, Integer>, Integer> tuple2IntegerTuple2) throws Exception {
            return new Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>(new Tuple2<Long, Integer>(tuple2IntegerTuple2._1._1,tuple2IntegerTuple2._1._2), new Tuple3<Integer, Long, Long>(tuple2IntegerTuple2._2, max,min));
        }
    }
}

I get the following error; essentially it cannot find the max and min functions on JavaPairRDD:

15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)
15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000
Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
        at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
        at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
        at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStrea

Best Answer

We can use transform to apply several operations to the RDD of each batch interval and obtain the min/max for that interval, which we then attach to every tuple (as the question specifies):

data.transform{rdd => 
     val mx = rdd.map(x=> (x,x)).reduce{case ((x1,x2),(y1,y2)) => ((x1 min y1), (x2 max y2))}
     rdd.map(elem => (elem,mx))                              
}

This produces an RDD like the following for each batch interval (here using random numbers between 0 and 999 as sample data):

(258,(0,998)) (591,(0,998)) ...

The Java version is semantically identical, but more verbose because of all the Tuple<...> objects.
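As a rough illustration, here is a minimal Java sketch of the same transform. It assumes the simplified setup of the Scala snippet above, namely that data is a JavaDStream<Integer> of raw values (the question's Tuple2<Long, Integer> keys would work the same way, only with more nesting); the names data and withMinMax are placeholders:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// data: JavaDStream<Integer> (assumed input stream of raw values)
JavaPairDStream<Integer, Tuple2<Integer, Integer>> withMinMax =
    data.transformToPair(new Function<JavaRDD<Integer>, JavaPairRDD<Integer, Tuple2<Integer, Integer>>>() {
        @Override
        public JavaPairRDD<Integer, Tuple2<Integer, Integer>> call(JavaRDD<Integer> rdd) {
            // Compute (min, max) for this batch in a single reduce pass,
            // avoiding separate rdd.min()/rdd.max() actions.
            final Tuple2<Integer, Integer> mx = rdd
                .map(new Function<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Integer x) {
                        return new Tuple2<Integer, Integer>(x, x);
                    }
                })
                .reduce(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>,
                                      Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> a,
                                                         Tuple2<Integer, Integer> b) {
                        return new Tuple2<Integer, Integer>(
                            Math.min(a._1(), b._1()), Math.max(a._2(), b._2()));
                    }
                });
            // Attach the per-batch (min, max) pair to every element.
            return rdd.mapToPair(new PairFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Tuple2<Integer, Integer>> call(Integer elem) {
                    return new Tuple2<Integer, Tuple2<Integer, Integer>>(elem, mx);
                }
            });
        }
    });

Computing both extremes in one reduce also sidesteps the JavaPairRDD.max(Comparator)/min(Comparator) calls that triggered the NoSuchMethodError above; that error usually indicates that the Spark version on the classpath at run time differs from the one the code was compiled against.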

A similar question can be found on Stack Overflow: https://stackoverflow.com/questions/30902090/
