scala - Spark 嵌套转换 SPARK-5063

标签 scala apache-spark rdd

我正在尝试在使用 Spark 时获取特定获胜拍卖时间前后的拍卖列表的过滤列表。 获胜拍卖 RDD 和完整拍卖 DD 由案例类组成,格式如下:
案例类拍卖(id:String,prodID:String,时间戳:Long)

我想过滤在相同的产品 ID 上,在获胜拍卖后 10 秒内发生拍卖的完整拍卖 RDD,并接收包含这些的 RDD。

我尝试像这样过滤它:

val specificmessages = winningauction.map(it =>
  allauctions.filter( x =>
    x.timestamp > it.timestamp - 10 &&
    x.timestamp < it.timestamp + 10 &&
    x.productID == it.productID
  )
)

有没有办法执行此操作,因为嵌套转换是不可能的?

还有另一个答案,但这主要涉及嵌套映射 SPARK-5603 nested map funcitons

最佳答案

尝试查看 cartesian构建新 RDD 并将过滤器应用于它的方法

val specificmessages = allauctions.cartesian(winningauction)
                                  .filter( (x, y) => x.timestamp > y.timestamp - 10 && 
                                               x.timestamp < y.timestamp + 10 && 
                                               x.productID == y.productID )

关于scala - Spark 嵌套转换 SPARK-5063,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31295235/

相关文章:

json - GSON 与 Scala - 无法调用 scala.collection.immutable.Map 的无参数构造函数

hadoop - 从另一个运行 Spark 的 Docker 容器写入在 Docker 中运行的 HDFS

apache-spark - Pyspark - 如何拆分具有 Datetime 类型结构值的列?

python - 如何使用 Spark 和 Caffe 对图像进行分类

scala - 使用 Spark MLib 与 PredictionIO 进行 Play 框架比较

scala-将自类型注释类传递给子对象

pyspark - 如何计算pyspark中每行的字数

python - pyspark RDD countByKey() 是如何计数的?

scala - 将值作为列表附加到 Map

python - TCP 上的 Spark 流式传输