java - Spark 采样 - 比使用完整的 RDD/DataFrame 快多少

标签 java apache-spark apache-spark-sql

我想知道对 RDD/DF 进行采样时 Spark 的运行时间与完整 RDD/DF 的运行时间相比是多少。我不知道这是否有什么不同,但我目前使用的是 Java + Spark 1.5.1 + Hadoop 2.6。

JavaRDD<Row> rdd = sc.textFile(HdfsDirectoryPath()).map(new Function<String, Row>() {
        @Override
        public Row call(String line) throws Exception {
            String[] fields = line.split(usedSeparator);
            GenericRowWithSchema row = new GenericRowWithSchema(fields, schema);//Assum that the schema has 4 integer columns
            return row;
            }
        });

DataFrame df   = sqlContext.createDataFrame(rdd, schema);
df.registerTempTable("df");
DataFrame selectdf   =  sqlContext.sql("Select * from df");
Row[] res = selectdf.collect();

DataFrame sampleddf  = sqlContext.createDataFrame(rdd, schema).sample(false, 0.1);// 10% of the original DS
sampleddf.registerTempTable("sampledf");
DataFrame selecteSampledf = sqlContext.sql("Select * from sampledf");
res = selecteSampledf.collect();

我预计采样速度最好接近 90%。但对我来说,spark 似乎会遍历整个 DF 或进行计数,这基本上与完整 DF 选择花费的时间几乎相同。生成样本后,执行select。

我的这个假设是否正确,或者是否以错误的方式使用了采样,导致我最终得到两个选择所需的相同运行时间?

最佳答案

I would expect that the sampling is optimally close to ~90% faster.

嗯,有几个原因导致这些期望不切实际:

  • 如果之前没有任何关于数据分布的假设,为了获得统一的样本,您必须执行完整的数据集扫描。当您在 Spark 中使用 sampletakeSample 方法时,或多或少会发生这种情况
  • SELECT * 是一个相对轻量级的操作。根据资源量,您处理单个分区的时间可以忽略不计
  • 采样不会减少分区数量。如果您不合并重新分区,您最终可能会得到大量几乎为空的分区。这意味着资源使用不理想。
  • 虽然 RNG 通常非常高效,但生成随机数并不是免费的

采样至少有两个重要的好处:

  • 降低内存使用量,包括减少垃圾收集器的工作量
  • 在洗牌或收集时需要序列化/反序列化和传输的数据更少

如果您想从采样中获得最大 yield ,那么采样、合并和缓存就有意义。

关于java - Spark 采样 - 比使用完整的 RDD/DataFrame 快多少,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33562263/

相关文章:

Java 复数,3 类

scala - 将 RDD 中的元组分解为两个元组

apache-spark - 从AWS S3读取pyspark文件不起作用

scala - 应用程序中的 Spark 调度 : performance issue

java - 如何在 Java 中使用 SQL(可能使用 LATERAL VIEW)在 JSON 中使用 "expand"多值字段?

java - 如何创建一个新的 SequenceGenerator 来生成唯一值?

java - 如何将特定代码段从 Spark 1.6.2 转换为 Spark 2.2.0?

java - Hadoop + Spark : There are 1 datanode(s) running and 1 node(s) are excluded in this operation

apache-spark - PySpark - 拆分字符串列并将它们的一部分连接起来以形成新列

Java 8 foreach 将子对象添加到新列表