hadoop - 如何使用 hadoop 自定义输入格式调整 Spark 应用程序

标签 hadoop mapreduce apache-spark

我的 spark 应用程序使用自定义 hadoop 输入格式处理文件(平均大小为 20 MB),并将结果存储在 HDFS 中。

以下是代码片段。

Configuration conf = new Configuration();


JavaPairRDD<Text, Text> baseRDD = ctx
    .newAPIHadoopFile(input, CustomInputFormat.class,Text.class, Text.class, conf);

JavaRDD<myClass> mapPartitionsRDD = baseRDD
    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<Text, Text>>, myClass>() {
        //my logic goes here
    }

//few more translformations
result.saveAsTextFile(path);

此应用程序为每个文件创建 1 个任务/分区,并在 HDFS 中处理和存储相应的部分文件。

即,对于 10,000 个输入文件,将创建 10,000 个任务,并将 10,000 个零件文件存储在 HDFS 中。

baseRDD 上的 mapPartitions 和 map 操作都为每个文件创建 1 个任务。

所以问题 How to set the number of partitions for newAPIHadoopFile? 建议设置 conf.setInt("mapred.max.split.size", 4); 用于配置分区数。

但是当这个参数被设置时,CPU 被最大限度地利用,并且即使在很长一段时间后也没有一个阶段不启动。

如果我不设置这个参数,那么应用程序将像上面提到的那样成功完成。

如何通过newAPIHadoopFile设置分区数,提高效率?

ma​​pred.max.split.size 选项会发生什么?

============

更新: ma​​pred.max.split.size 选项会发生什么情况?

在我的用例中,文件很小,更改拆分大小选项在这里无关紧要。

有关此 SO 的更多信息:Behavior of the parameter "mapred.min.split.size" in HDFS

最佳答案

只需使用 baseRDD.repartition(<a sane amount>).mapPartitions(...) .这会将生成的操作移动到更少的分区,尤其是当您的文件很小时。

关于hadoop - 如何使用 hadoop 自定义输入格式调整 Spark 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30032248/

相关文章:

database - 如何处理非常大的数据?

hadoop - 使用 hbase.hregion.max.filesize 自动分割 HBase 区域

hadoop - 映射减少作业 : Protobuf related error

java - hive 错误:失败:执行错误,从org.apache.hadoop.hive.ql.exec.mr.MapRedTask返回代码2

hadoop - 如何从容器内部获取 YARN ContainerId?

python - Spark中groupBy的使用

apache-spark - 如何在 Yarn 上配置应用程序驱动程序的自动重启

scala - org.apache.spark.sql.AnalysisException : Reference 'dattim' is ambiguous, 可能是 : dattim#6, event_dattim#55.;

python - 在Hive数据库中匹配两个字段的最有效方法

hadoop - 如何从另一个Java程序以编程方式执行MapReduce jar?