java - 根据另一个 RDD 的值聚合一个 RDD Spark (Java)

标签 java apache-spark transformation aggregation rdd

我有两个包含时间信息的 RDD。 RDD 被分成不同的分区。 一种是以下形式

16:00:00
16:00:18
16:00:25
16:01:01
16:01:34
16:02:12
16:02:42
...

另一个包含 tuple2 形式的时间跨度

<16:00:00, 16:00:59>
<16:01:00, 16:01:59>
<16:02:00, 16:02:59>
...

我需要聚合第一个和第二个 RDD,通过根据第二个 RDD 中的值聚合第一个 RDD 的值,以获得类似的结果

<<16:00:00, 16:00:59>, [16:00:00,16:00:18,16:00:25]>
<<16:01:00, 16:01:59>, [16:01:01,16:01:34]>
<<16:02:00, 16:02:59>, [16:02:12,16:02:42]>
...

或者,类似的东西

<<16:00:00, 16:00:59>, 16:00:00>
<<16:00:00, 16:00:59>, 16:00:18>
<<16:00:00, 16:00:59>, 16:00:25>
<<16:01:00, 16:01:59>, 16:01:01>
<<16:01:00, 16:01:59>, 16:01:34>
<<16:02:00, 16:02:59>, 16:02:12>
<<16:02:00, 16:02:59>, 16:02:42>
...

我正在尝试使用整个 Spark 转换函数,但我很难找到一个适用于如此不同性质的 RDD 的函数。我知道我可能会选择 cartesian产品,然后过滤,但我想要一个“更好”的解决方案。我试过zipPartition ,这可能有效,但我的分区可能不一致,例如16:00:00可能最终会出现在不存在相应聚合值(元组 <16:00:00, 16:00:59> )的分区中。 处理这个问题的最佳方法是什么?

PS:我使用的是 Java,但也欢迎使用 Scala 解决方案。 谢谢

最佳答案

我已将下面的内容简化为使用整数,但我相信同样的操作可以多次完成。虽然示例是用 Scala 编写的,但我怀疑这一切也可以用 Java 完成。

如果范围是规则的,我会将“值”RDD 转换为范围,值,然后进行简单的连接。

val values = Seq(1, 5, 10, 14, 20)
val valuesRdd = sc.parallelize(values, 2)
valuesRdd.map(x => (((x/10)*10, ((x/10)*10)+9), x)).collect

但是,如果范围不规则,则:

如果您不介意使用 DataFrame,那么可以选择使用用户定义函数根据 V 是否在给定范围内创建列并加入该列。

case class Range(low : Int, high :Int)
val ranges = Seq( Range(0,9), Range(10,19), Range(20,29));
val rangesDf = sc.parallelize(ranges, 2).toDF

case class Value(value : Int)
val values = Seq(Value(1), Value(5), Value(10), Value(14), Value(20))
val valuesDf = sc.parallelize(values, 2).toDF

val inRange = udf{(v: Int, low: Int, high : Int) => v >= low && v<= high}

rangesDf.join(valuesDf, inRange(valuesDf("value"), rangesDf("low"), rangesDf("high"))).show

下一个选项是分解范围并加入分解版本:

val explodedRange = rangesRdd.map(x => (x, List.range(x._1, x._2 + 1))).flatMap( { case (range, lst) => lst.map { x => (x, range)} })
val valuesRdd = sc.parallelize(values, 2).map(x => (x,true))
valuesRdd.join(explodedRange).map(x => (x._2._2, x._1)).collect

关于java - 根据另一个 RDD 的值聚合一个 RDD Spark (Java),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35577940/

相关文章:

java - driver.quit后如何打开chromedriver的新 session ?

java - jpql SUM 一列

java - 使用 Spring Security 3.2、Spring Ldap 2.0 和 JavaConfig 的 Active Directory 身份验证

scala - Spark-Shell 和 scala CLI 客户端有什么区别吗?

scala - 在spark scala 中将行合并为单个struct 列存在效率问题,我们如何做得更好?

scala - EMR 上的 Spark 日志在哪里?

javascript - js游戏中的相机,跟随玩家

java - 使新选项卡的 Java session 无效

r - 将社区数据转换为纯素包的宽格式

java - 链接多个 Transformations.switchMap 关闭一个 LiveData 源