scala - 示例Spark程序

标签 scala hadoop apache-spark bigdata

嗨,我正在学习Spark和Scala,我有一种情况需要提出Sparkscala代码

输入文件

Name  attr1 attr2 attr3  
John    Y     N    N  
Smith   N     Y    N

预期产量
John  attr1 Y  
John  attr2 N  
John  attr3 N  
Smith attr1 N  
...  
...

我知道如何在Map-Reduce中做到这一点

对于每一行,分别获得Name并遍历attr值,并将输出表示为(Name, attrX Y/N),但是在scala和Spark中,这有点令人困惑,有人可以帮我吗?

最佳答案

假设您已经知道输入属性的数量,并且输入属性由\t分隔,那么您可以这样做:

在Java中

// load data file
JavaRDD<String> file = jsc.textFile(path);

// build header rdd
JavaRDD<String> header = jsc.parallelize(Arrays.asList(file.first()));

// subtract header to have real data
JavaRDD<String> data = file.subtract(header);

// create row rdd
JavaRDD<Row> rowRDD = data.flatMap(new FlatMapFunction<String,Row>(){
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<Row> call(String line) throws Exception {
        String[] strs = line.split("\t");
        Row r1 = RowFactory.create(strs[0], "Attr1", strs[1]);
        Row r2 = RowFactory.create(strs[0], "Attr2", strs[2]);
        Row r3 = RowFactory.create(strs[0], "Attr3", strs[3]);
        return Arrays.asList(r1,r2,r3);
    }
});

// schema for df
StructType schema = new StructType().add("Name", DataTypes.StringType)
                                    .add("Attr", DataTypes.StringType)
                                    .add("Value", DataTypes.StringType);

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();

这是输出:
+-----+-----+-----+
| Name| Attr|Value|
+-----+-----+-----+
|Smith|Attr1|    N|
|Smith|Attr2|    Y|
|Smith|Attr3|    N|
| John|Attr1|    Y|
| John|Attr2|    N|
| John|Attr3|    N|
+-----+-----+-----+

Scala和Java类似,您可以轻松地将它们翻译成Scala。

关于scala - 示例Spark程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37724361/

相关文章:

scala - scala 中的 "return"和 "try-catch-finally" block 评估

java - 如何以编程方式停止 Spark 作业执行

scala - 在 spark-shell 中拆分多行

compression - 如何让 Hive 与 bz2 一起工作?

hadoop - 提取HDFS文件夹或文件详细信息

apache-spark - Spark : how to get all configuration parameters

java - Spark结构化流: Current batch is falling behind

dataframe - Spark 中的 Dataframe 合并是否保持顺序?

scala - 如何获取余积的所有可能成员

scala - 在 Scala 中检查 foreach 中的条件