我有很多包含大约 60.000.000 行的文件。我所有文件的格式都是 {timestamp}#{producer}#{messageId}#{data_bytes}\n
我一个一个地浏览我的文件,还想为每个输入文件构建一个输出文件。 因为有些台词依赖于以前的台词,所以我将它们按制作人分组。每当一行依赖于一个或多个先前的行时,它们的生产者总是相同的。 对所有行进行分组后,我将它们交给我的 Java 解析器。 然后,解析器会将所有已解析的数据对象包含在内存中,然后将其输出为 JSON。
为了可视化我认为我的作业是如何处理的,我拼凑了以下“流程图”。请注意,我没有可视化 groupByKey
-Shuffeling-Process。
我的问题:
- 我希望 Spark 拆分文件,使用单独的任务处理拆分并将每个任务输出保存到“部分”文件。
- 但是,我的任务耗尽内存并在完成之前被 YARN 终止:
容器因超出内存限制而被 YARN 终止。使用了 7.6 GB 的 7.5 GB 物理内存
- 我的解析器将所有已解析的数据对象放入内存。我无法更改解析器的代码。
- 请注意,我的代码适用于较小的文件(例如,两个文件各有 600.000 行,作为我的工作的输入)
我的问题:
如何确保 Spark 会为我的 map task 中的每个文件拆分创建一个结果? (如果我的任务成功,也许他们会,但我现在永远不会看到输出。)
我认为我的 map 转换
val lineMap = lines.map ...
(请参阅下面的 Scala 代码)会生成分区的 rdd。因此,我希望在调用我的第二个 map 任务之前以某种方式拆分 rdd 的值。此外,我认为在此 rdd
lineMap
上调用 saveAsTextFile 将生成一个输出任务,该任务在我的每个 map task 完成后运行。如果我的假设是正确的,为什么我的执行者仍然内存不足? Spark 是否正在执行多个(太大)大文件拆分并同时处理它们,从而导致解析器填满内存?重新分区
lineMap
rdd 以便为我的解析器获取更多(更小)输入是个好主意吗?是否有我不知道的额外 reducer 步骤?喜欢在写入文件或类似文件之前聚合结果?
Scala 代码(我省略了不相关的代码部分):
def main(args: Array[String]) {
val inputFilePath = args(0)
val outputFilePath = args(1)
val inputFiles = fs.listStatus(new Path(inputFilePath))
inputFiles.foreach( filename => {
processData(filename.getPath, ...)
})
}
def processData(filePath: Path, ...) {
val lines = sc.textFile(filePath.toString())
val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey()
val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
//each output should be saved separately
parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)
}
def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
val importer = new LogFileImporter(...)
importer.parseData(values.toIterator.asJava, ...)
//importer from now contains all parsed data objects in memory that could be parsed
//from the given values.
val jsonMapper = getJsonMapper(...)
val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)
(key, jsonStringData)
}
最佳答案
我通过删除 groupByKey 调用并实现新的 FileInputFormat 和 RecordReader 来解决此问题,以消除行依赖于其他行的限制。现在,我实现了它,以便每个拆分都包含前一个拆分的 50.000 字节开销。这将确保可以正确解析依赖于先前行的所有行。
我现在继续查看前一个拆分的最后 50.000 字节,但只复制实际影响当前拆分解析的行。因此,我最大限度地减少了开销,并且仍然获得了高度并行化的任务。
以下链接将我拖向了正确的方向。因为 FileInputFormat/RecordReader 的主题乍一看很复杂(至少对我来说是这样),所以最好通读这些文章并了解这是否适合您的问题:
- https://hadoopi.wordpress.com/2013/05/27/understand-recordreader-inputsplit/
- http://www.ae.be/blog-en/ingesting-data-spark-using-custom-hadoop-fileinputformat/
ae.be 文章中的相关代码部分,以防网站出现故障。作者 ( @Gurdt ) 使用它来检测聊天消息是否包含转义行返回(通过以“\”结尾的行)并将转义行附加在一起,直到找到未转义的\n。这将允许他检索跨越两行或更多行的消息。用 Scala 编写的代码:
用法
val conf = new Configuration(sparkContext.hadoopConfiguration)
val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat],
classOf[LongWritable], classOf[Text], conf)
文件输入格式
class MyFileInputFormat extends FileInputFormat[LongWritable, Text] {
override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
RecordReader[LongWritable, Text] = new MyRecordReader()
}
记录器
class MyRecordReader() extends RecordReader[LongWritable, Text] {
var start, end, position = 0L
var reader: LineReader = null
var key = new LongWritable
var value = new Text
override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
// split position in data (start one byte earlier to detect if
// the split starts in the middle of a previous record)
val split = inputSplit.asInstanceOf[FileSplit]
start = 0.max(split.getStart - 1)
end = start + split.getLength
// open a stream to the data, pointing to the start of the split
val stream = split.getPath.getFileSystem(context.getConfiguration)
.open(split.getPath)
stream.seek(start)
reader = new LineReader(stream, context.getConfiguration)
// if the split starts at a newline, we want to start yet another byte
// earlier to check if the newline was escaped or not
val firstByte = stream.readByte().toInt
if(firstByte == '\n')
start = 0.max(start - 1)
stream.seek(start)
if(start != 0)
skipRemainderFromPreviousSplit(reader)
}
def skipRemainderFromPreviousSplit(reader: LineReader): Unit = {
var readAnotherLine = true
while(readAnotherLine) {
// read next line
val buffer = new Text()
start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
pos = start
// detect if delimiter was escaped
readAnotherLine = buffer.getLength >= 1 && // something was read
buffer.charAt(buffer.getLength - 1) == '\\' && // newline was escaped
pos <= end // seek head hasn't passed the split
}
}
override def nextKeyValue(): Boolean = {
key.set(pos)
// read newlines until an unescaped newline is read
var lastNewlineWasEscaped = false
while (pos < end || lastNewlineWasEscaped) {
// read next line
val buffer = new Text
pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
// append newly read data to previous data if necessary
value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer
// detect if delimiter was escaped
lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\\'
// let Spark know that a key-value pair is ready!
if(!lastNewlineWasEscaped)
return true
}
// end of split reached?
return false
}
}
注意:您可能还需要在 RecordReader 中实现 getCurrentKey、getCurrentValue、close 和 getProgress。
关于hadoop - Spark Map Task 内存消耗巨大,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37569635/