我的文本文件中有这些行:
Some different lines....
Name : Praveen
Age : 24
Contact : 1234567890
Location : India
Some different lines....
Name : John
Contact : 1234567890
Location : UK
Some different lines....
Name : Joe
Age : 54
Contact : 1234567890
Location : US
一些不同的行表示中间有一些其他信息。
现在我需要读取文件并提取人员信息。如果缺少任何键,则应将其读取为空字符串(第二人称信息中缺少年龄)。
JavaRDD<String> data = jsc.textFile("person.report");
List<String> name = data.filter(f -> f.contains("Name")).collect();
List<String> age = data.filter(f -> f.contains("Age")).collect();
List<String> contact = data.filter(f -> f.contains("Contact")).collect();
List<String> location = data.filter(f -> f.contains("Location")).collect();
当我以上述方式执行并迭代 for 循环时,第 3 个人的年龄将分配给第 2 个人。
最佳答案
首先,您正在收集驱动程序上的所有内容,您确定这是您想要做的吗?它不适用于大数据集...
基本上,您的问题是您认为是记录的内容不在一行中。默认情况下,spark 将每一行视为一条单独的记录。然而在这里,您的记录有几行(姓名、年龄、位置……)。为了克服这个问题,您需要找到另一个分隔符。如果在“一些不同的行”中,有一个特定的字符串,使用它并设置这个属性:
sc.hadoopConfiguration.set("textinputformat.record.delimiter","specific string")
然后你可以这样写:
val cols = Seq("Name","Age", "Contact", "Location")
sc.textFile("...")
.map( _.split("\n"))
.map(x => cols
.map( col => x.find(_.startsWith(col)).getOrElse(col+" :") ) )
一个人对应的所有行都会在同一条记录中,供您随意处理。如果您找不到任何合适的分隔符,您的记录可能都有一个名称,因此您可以使用“名称:”。
在 java8 中,你可以使用流以相同的方式实现它。这有点冗长,但由于问题是针对 java 提出的,所以你去吧:
String[] array = {"Name", "Age", "Contact", "Location"};
List<String> list = Arrays.asList(array);
sc.textFile("...")
.map(x -> Arrays.asList(x.split("\n")))
.map(x -> list.stream()
.map(col -> x.stream()
.filter(line -> line.startsWith(col))
.findAny()
.orElse(col+" :"))
.collect(Collectors.toList()) );
关于java - 如何使用 foreach 迭代 JavaRDD 并使用 spark java 从每一行中查找特定元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47807798/