java - javardd中如何通过header进行过滤?

标签 java apache-spark rdd filterfunction

我正在编写一个 JavaRDD 代码,我必须将 csv 上传到名为 RestaurantDetailRDDJavaRDD 中。 RestaurantDetailRDD 有一个地址列,必须将其过滤到另一个名为 addressRDDRDD 中。我只需要过滤条件,可以通过 csv 中提供的标题拆分地址列。

// provide path to input text file
String path = "/home/lingesh/Downloads/newitems.csv";

// read text file to RDD
JavaRDD<String> restaurantDetailRDD = sc.textFile(path);

// collect RDD for printing
for(String line:restaurantDetailRDD.collect()){
    System.out.println(line);
}

如您所见,我刚刚创建了 RestaurantDetailRDD

我希望地址列放置在不同的RDD

最佳答案

如果您知道address列的位置,则只需执行map函数即可将RDD转换为另一个RDD。

JavaRDD<String> columnRdd = rdd.map(f -> {

    String[] arr = f.split(",");
    return arr[position];
});

System.out.println("new count " + columnRdd.count());

这种方式更好,因为您使用的是 Spark 函数,这意味着您可以处理 Spark 分区并使计算速度更快。在您确实需要打印结果进行测试之前,不要尝试使用基本的 java 函数。

关于java - javardd中如何通过header进行过滤?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57125232/

相关文章:

java - 如何根据Key从PairRDD中获取新的RDD

apache-spark - 在数据帧上使用partitionBy时出现dataproc警告

python - 使用 groupby 或 aggregate 合并 RDD 或 DataFrame 中每个事务中的项目来做 FP-growth

python - PySpark - 使用共享相同值的两个键创建对 RDD

java - 错误 :(13, 28) java : package com. aliasi.tokenizer 不存在

java - Neo4j 自动升级选项

java - 为什么将布局设置为 BorderLayout 意味着永远不会调用 paintComponent

java - 确定所有 Java 发行版中的主要 Java 版本

python-3.x - 从 Spark RDD 中保存的数据中清除无效字符

apache-spark - kmean如何计算不同分区的数据?