scala - 断言 RDD 未排序

标签 scala unit-testing apache-spark scalatest

我有一个方法叫 split接受一个 RDD[T] 和一个 splitSize 并返回一个 Array[RDD[T]]。
现在,我为它编写的测试用例之一应该验证该函数是否也会随机打乱 RDD。
于是我创建了一个排序后的RDD,然后查看结果:

  it should "randomize shuffle" in {
    val inputRDD = sc.parallelize((0 until 16))
    val result = RDDUtils.split(inputRDD, 2)

    result.foreach(rdd => {
      rdd.collect.foreach(println)
    })

    // Asset result is not sorted
  }
如果结果是:
0
1
2
3
..
15
然后它没有按预期工作。
一个好的结果可能是这样的:
11
3
9
14
...
1
6
如何断言输出 Array[RDD[T]]] 未排序?

最佳答案

你可以试试这样的

val resultOrder = result.sortBy(....)
assert(!resultOrder.sameElements(result))
或者
val resultOrder = result.sortBy(....)
assert(!resultOrder.toList == result.toList)
需要注意的是,关键是要知道如何对 Array 进行排序。对于整数数据类型,这很容易,但对于复杂数据类型,您可能需要 隐式排序 为您的数据类型。例如:
implicit val ordering: Ordering[T] =
    Ordering.fromLessThan[T]((sa: T, sb: T) => sa < sb)

// OR

implicit val ordering: Ordering[MyClass] =
    Ordering.fromLessThan[MyClass]((sa: MyClass, sb: MyClass) => sa.field1 < sb.field1)
确切的代码将取决于您的数据类型。
作为一个完整的例子
package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SortArrayRDD {

  val spark = SparkSession
    .builder()
    .appName("SortArrayRDD")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","SortArrayRDD") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    try {

      Logger.getRootLogger.setLevel(Level.ERROR)

      val arrRDD: Array[RDD[Int]] = Array(sc.parallelize(List(2,3)),sc.parallelize(List(10,11)),sc.parallelize(List(6,7)),sc.parallelize(List(8,9)),
        sc.parallelize(List(4,5)),sc.parallelize(List(0,1)),sc.parallelize(List(12,13)),sc.parallelize(List(14,15)))
      val aux = arrRDD

      implicit val ordering: Ordering[RDD[Int]] = Ordering.fromLessThan[RDD[Int]]((sa: RDD[Int], sb: RDD[Int]) => sa.sum() < sb.sum())

      aux.sorted.foreach(rdd => println(rdd.collect().mkString(",")))

      val resultOrder = aux.sorted

      assert(!resultOrder.sameElements(arrRDD))
      println("It's unordered")
    } finally {
      sc.stop()
    }
  }
}

关于scala - 断言 RDD 未排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63629961/

相关文章:

scala - 如何编写绑定(bind)集合类型和元素类型的通用 Scala 增强方法?

scala - 在 Scala 中使用 Future 和 Promise 取消

scala - minBy[B](f : ((A, B)) ⇒ B)(隐式cmp : Ordering[B]): (A, B)的类型参数

c# - 将 NUnit 迁移/升级到 Team 系统

java - 在 flink YARN 集群作业中使用 JNI

javascript - 如何在 Vue.js 中测试计算属性?无法模拟 "data"

java - Apache Spark GraphX java.lang.ArrayIndexOutOfBoundsException

apache-spark - 使用 yarn 运行Spark的步骤

scala - 在编译时验证 Scala 案例类

java - 单元测试应该使用绝对值吗?