java - 如何使用 foreach 迭代 JavaRDD 并使用 spark java 从每一行中查找特定元素

标签 java apache-spark

我的文本文件中有这些行:

Some different lines....

Name : Praveen  
Age : 24  
Contact : 1234567890  
Location : India  

Some different lines....

Name : John  
Contact : 1234567890  
Location : UK  

Some different lines....  

Name : Joe  
Age : 54  
Contact : 1234567890  
Location : US  

一些不同的行表示中间有一些其他信息。

现在我需要读取文件并提取人员信息。如果缺少任何键,则应将其读取为空字符串(第二人称信息中缺少年龄)。

JavaRDD<String> data = jsc.textFile("person.report");

List<String> name = data.filter(f -> f.contains("Name")).collect();
List<String> age = data.filter(f -> f.contains("Age")).collect();
List<String> contact = data.filter(f -> f.contains("Contact")).collect();
List<String> location = data.filter(f -> f.contains("Location")).collect();

当我以上述方式执行并迭代 for 循环时,第 3 个人的年龄将分配给第 2 个人。

最佳答案

首先,您正在收集驱动程序上的所有内容,您确定这是您想要做的吗?它不适用于大数据集...

基本上,您的问题是您认为是记录的内容不在一行中。默认情况下,spark 将每一行视为一条单独的记录。然而在这里,您的记录有几行(姓名、年龄、位置……)。为了克服这个问题,您需要找到另一个分隔符。如果在“一些不同的行”中,有一个特定的字符串,使用它并设置这个属性:

sc.hadoopConfiguration.set("textinputformat.record.delimiter","specific string")

然后你可以这样写:

val cols = Seq("Name","Age", "Contact", "Location")
sc.textFile("...")
  .map( _.split("\n"))
  .map(x => cols
       .map( col => x.find(_.startsWith(col)).getOrElse(col+" :") ) )

一个人对应的所有行都会在同一条记录中,供您随意处理。如果您找不到任何合适的分隔符,您的记录可能都有一个名称,因此您可以使用“名称:”。

在 java8 中,你可以使用流以相同的方式实现它。这有点冗长,但由于问题是针对 java 提出的,所以你去吧:

String[] array = {"Name", "Age", "Contact", "Location"};
List<String> list = Arrays.asList(array);
sc.textFile("...")
    .map(x -> Arrays.asList(x.split("\n")))
    .map(x -> list.stream()
                  .map(col -> x.stream()
                               .filter(line -> line.startsWith(col))
                               .findAny()
                               .orElse(col+" :"))
                  .collect(Collectors.toList()) );

关于java - 如何使用 foreach 迭代 JavaRDD 并使用 spark java 从每一行中查找特定元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47807798/

相关文章:

javascript - Restcall 中的 406 Not Acceptable 错误

java - 双变量加减 12%

Java 同步列表

java - 在 POJO 中使用 EJB 注入(inject)

apache-spark - 将字符串转换为 int null 问题

apache-spark - 根据 pyspark 条件使用其他列值覆盖列值

java - IBM BlueMix | IBM BlueMix | IBM BlueMix同时添加 jsf : The application or context root for this request has not been found: ./?

scala - Spark : How to join two `Dataset` s A and B with the condition that an ID array column of A does NOT contain the ID column of B?

scala - 如何进行时间序列简单预测?

python - 如何查找 Pyspark 中列中值最大的行名称