java - How to quickly transform large-scale data on multiple nodes with apache-spark?

Tags: java, apache-spark

I am new to Spark, so I have been training myself with tutorials and small tasks to get familiar with Spark and distributed systems. One of those tasks, described below, led me to ask this question here in search of a better solution.

Below is my sample data.

EEL    A    CAT    LOC1    EEX    13|42|45|67
EEL    A    CAT    LOC2    EEX    24|32
....

My goal is to transform the above data into the form below.

EEL    A    CAT    LOC1    EEX    13
EEL    A    CAT    LOC1    EEX    42
EEL    A    CAT    LOC1    EEX    45
EEL    A    CAT    LOC1    EEX    67
EEL    A    CAT    LOC2    EEX    24
EEL    A    CAT    LOC2    EEX    32

I wrote the code below, and it works well for data sizes under 10 MB. However, with a larger data set (more than 3 GB) it takes about 10 to 15 minutes to finish on the multi-node platform.

Note that I am using one Spark master node and four data nodes, each with 16 cores and 16 GB of memory.

Considering these machine specifications, I don't think this simple transformation task should take that long, and I suspect there is a more efficient way to reduce the running time.

I know that Spark is an in-memory system, and I strongly believed it would handle such an iterative, simple task easily, but the result was a bit disappointing to me.

Below is my full code. I have skipped a detailed explanation because it is very straightforward.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;


public class DataTransformation {
    public static JavaRDD<String> DataTransformation_V01(String input,JavaSparkContext sc){
        JavaRDD<String> lines = sc.textFile(input);

        // Note: textFile already produces one record per line, so this extra
        // split on "\n" is effectively a no-op and could be dropped.
        JavaRDD<String> line = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split("\n")).iterator();
            }
        });


        JavaRDD<String> newLine = line.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                List<String> ret = new ArrayList<String>();
                List<String> ls = Arrays.asList(s.split("\t"));

                // The last tab-separated field holds the pipe-delimited values.
                String values = ls.get(ls.size() - 1);
                List<String> value = Arrays.asList(values.split("\\|"));

                for (int i = 0; i < value.size(); ++i) {
                    // Re-emit every field except the last, followed by one value.
                    StringBuilder ns = new StringBuilder();
                    for (int j = 0; j < ls.size() - 1; ++j) {
                        ns.append(ls.get(j)).append("\t");
                    }
                    ns.append(value.get(i));
                    ret.add(ns.toString());
                }
                return ret.iterator();
            }
        });
        return newLine;
    }



    public static void main(String[] args) throws Exception{
        String inputFile = args[0];
        String outputFile = args[1];
        SparkConf conf = new SparkConf().setAppName("Data Transformation")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> result = DataTransformation_V01(inputFile, sc);

        // Note: count() and saveAsTextFile() each trigger a full job, so without
        // caching the whole transformation is computed twice.
        System.out.println(result.count());
        result.saveAsTextFile(outputFile);

        sc.stop();
        sc.close();
    }
} 
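For reference, a job like this would typically be launched on the described cluster (one master, four 16-core / 16 GB workers) with spark-submit. The jar name, master URL, and resource settings below are illustrative assumptions, not values taken from the question:

spark-submit \
  --class DataTransformation \
  --master spark://<master-host>:7077 \
  --executor-memory 12g \
  --total-executor-cores 64 \
  data-transformation.jar \
  hdfs:///path/to/input hdfs:///path/to/output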

Thanks.

Best answer

Try using the DataFrame API instead of RDDs for this computation. The following snippet (Scala, for brevity) does what you need.

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> val df  = sc.textFile("/user/test/files/spark_test.txt").map(_.split(',')).map(x => (x(0),x(1),x(2),x(3),x(4),x(5))).toDF("c1","c2","c3","c4","c5","c6")
df: org.apache.spark.sql.DataFrame = [c1: string, c2: string, c3: string, c4: string, c5: string, c6: string]

scala> df.explode("c6","c7")((x:String) => x.split('|')).drop("c6").show()
{"level": "INFO ", "timestamp": "2017-01-30 01:13:09,138", "classname": "com.hadoop.compression.lzo.GPLNativeCodeLoader", "body": "Loaded native gpl library"}
{"level": "INFO ", "timestamp": "2017-01-30 01:13:09,141", "classname": "com.hadoop.compression.lzo.LzoCodec", "body": "Successfully loaded & initialized native-lzo library [hadoop-lzo rev 2cedc48fab9e2e10a84b909b4c198053ff379ac7]"}
+---+---+---+----+---+---+
| c1| c2| c3|  c4| c5| c7|
+---+---+---+----+---+---+
|EEL|  A|CAT|LOC1|EEX| 13|
|EEL|  A|CAT|LOC1|EEX| 42|
|EEL|  A|CAT|LOC1|EEX| 45|
|EEL|  A|CAT|LOC1|EEX| 67|
|EEL|  A|CAT|LOC2|EEX| 24|
|EEL|  A|CAT|LOC2|EEX| 32|
+---+---+---+----+---+---+

I assumed that you have a csv file like the one below:

EEL,A,CAT,LOC1,EEX,13|42|45|67
EEL,A,CAT,LOC2,EEX,24|32

DataFrames perform much better than RDDs because they can take advantage of the Catalyst optimizer and avoid JVM object serialization/deserialization and the associated GC overhead. See here for more details.
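Since the question is in Java, a rough Java equivalent of the same DataFrame approach is sketched below. It assumes Spark 2.x, tab-delimited input as in the original sample, and illustrative column names and arguments rather than anything taken from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;

public class DataTransformationDF {
    public static void main(String[] args) {
        String inputFile = args[0];
        String outputFile = args[1];

        SparkSession spark = SparkSession.builder()
                .appName("Data Transformation (DataFrame)")
                .getOrCreate();

        // Read the tab-delimited file; c6 holds the pipe-delimited values.
        Dataset<Row> df = spark.read()
                .option("sep", "\t")
                .csv(inputFile)
                .toDF("c1", "c2", "c3", "c4", "c5", "c6");

        // Split c6 on '|', explode it into one row per value, then drop c6.
        Dataset<Row> result = df
                .withColumn("c7", explode(split(col("c6"), "\\|")))
                .drop("c6");

        result.write().option("sep", "\t").csv(outputFile);

        spark.stop();
    }
}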

Another obvious performance consideration is to use Parquet files instead of plain text. Since Parquet files are columnar and compress better, this leads to significantly less IO. You can also read Parquet files directly into a DataFrame and skip the expensive RDD stage entirely.
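A minimal sketch of that suggestion, reusing the spark session and the df DataFrame from the Java example above (the HDFS paths are made up for illustration):

// One-time conversion: write the parsed data out as Parquet.
df.write().parquet("hdfs:///data/events.parquet");

// Later runs read the columnar file directly into a DataFrame,
// skipping text parsing and the RDD stage entirely.
Dataset<Row> events = spark.read().parquet("hdfs:///data/events.parquet");
events.withColumn("c7", explode(split(col("c6"), "\\|")))
      .drop("c6")
      .write()
      .parquet("hdfs:///data/events_exploded.parquet");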

About java - How to quickly transform large-scale data on multiple nodes with apache-spark?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/41925045/
