scala - 如何生成具有随机内容和 N 行的 DataFrame?

标签 scala apache-spark spark-dataframe

如何在 Scala 中创建一个具有 100 行和 3 列且随机整数值在 (1, 100) 范围内的 Spark DataFrame?

我知道如何手动创建 DataFrame,但我无法自动化它:

val df = sc.parallelize(Seq((1,20, 40), (60, 10, 80), (30, 15, 30))).toDF("col1", "col2", "col3") 

最佳答案

在本地生成数据然后将其并行化是完全没问题的,尤其是在您不必生成大量数据的情况下。

然而,如果你需要生成一个巨大的数据集,你总是可以实现一个 RDD 为你并行执行此操作,如下例所示。

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Each random partition will hold `numValues` items
final class RandomPartition[A: ClassTag](val index: Int, numValues: Int, random: => A) extends Partition {
  def values: Iterator[A] = Iterator.fill(numValues)(random)
}

// The RDD will parallelize the workload across `numSlices`
final class RandomRDD[A: ClassTag](@transient private val sc: SparkContext, numSlices: Int, numValues: Int, random: => A) extends RDD[A](sc, deps = Seq.empty) {

  // Based on the item and executor count, determine how many values are
  // computed in each executor. Distribute the rest evenly (if any).
  private val valuesPerSlice = numValues / numSlices
  private val slicesWithExtraItem = numValues % numSlices

  // Just ask the partition for the data
  override def compute(split: Partition, context: TaskContext): Iterator[A] =
    split.asInstanceOf[RandomPartition[A]].values

  // Generate the partitions so that the load is as evenly spread as possible
  // e.g. 10 partition and 22 items -> 2 slices with 3 items and 8 slices with 2
  override protected def getPartitions: Array[Partition] =
    ((0 until slicesWithExtraItem).view.map(new RandomPartition[A](_, valuesPerSlice + 1, random)) ++
      (slicesWithExtraItem until numSlices).view.map(new RandomPartition[A](_, valuesPerSlice, random))).toArray

}

一旦你有了它,你就可以使用它传递你自己的随机数据生成器来获得 RDD[Int]
val rdd = new RandomRDD(spark.sparkContext, 10, 22, scala.util.Random.nextInt(100) + 1)
rdd.foreach(println)
/*
 * outputs:
 * 30
 * 86
 * 75
 * 20
 * ...
 */

RDD[(Int, Int, Int)]
def rand = scala.util.Random.nextInt(100) + 1
val rdd = new RandomRDD(spark.sparkContext, 10, 22, (rand, rand, rand))
rdd.foreach(println)
/*
 * outputs:
 * (33,22,15)
 * (65,24,64)
 * (41,81,44)
 * (58,7,18)
 * ...
 */

当然,您可以将其包装在 DataFrame 中也很容易:
spark.createDataFrame(rdd).show()
/*
 * outputs:
 * +---+---+---+
 * | _1| _2| _3|
 * +---+---+---+
 * |100| 48| 92|
 * | 34| 40| 30|
 * | 98| 63| 61|
 * | 95| 17| 63|
 * | 68| 31| 34|
 * .............
 */

注意在这种情况下,每次 RDD 生成的数据是如何不同的。/DataFrame被执行。通过改变 RandomPartition 的实现要实际存储值而不是即时生成它们,您可以拥有一组稳定的随机项,同时仍保留这种方法的灵活性和可扩展性。

无状态方法的一个很好的特性是您甚至可以在本地生成巨大的数据集。以下几秒钟在我的笔记本电脑上运行:
new RandomRDD(spark.sparkContext, 10, Int.MaxValue, 42).count
// returns: 2147483647

关于scala - 如何生成具有随机内容和 N 行的 DataFrame?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48659259/

相关文章:

pyspark - 将列表列拆分为同一 PySpark 数据框中的多列

scala - Scala 中的多行包声明

scala - 如何为 SBT 设置 -Dsbt.override.build.repos=true 全局?

scala - 使用 Flink 计算流中有状态实体的最新状态

hadoop - 使用 s3 的 spark 加载 json 时 FS 错误

apache-spark - 在 Pyspark 中将时间戳更改为 UTC 格式

apache-spark - 如何 CROSS JOIN 2 数据框?

scala - 以 Codec 表示形式有效打包 Long 列表

apache-spark - 使用 utf-8 字符编码从 hive 中选择数据

scala - Spark : how to zip an RDD with each partition of the other RDD