hadoop - Spark Map Task 内存消耗巨大

标签 hadoop apache-spark rdd

我有很多包含大约 60.000.000 行的文件。我所有文件的格式都是 {timestamp}#{producer}#{messageId}#{data_bytes}\n

我一个一个地浏览我的文件,还想为每个输入文件构建一个输出文件。 因为有些台词依赖于以前的台词,所以我将它们按制作人分组。每当一行依赖于一个或多个先前的行时,它们的生产者总是相同的。 对所有行进行分组后,我将它们交给我的 Java 解析器。 然后,解析器会将所有已解析的数据对象包含在内存中,然后将其输出为 JSON。

为了可视化我认为我的作业是如何处理的,我拼凑了以下“流程图”。请注意,我没有可视化 groupByKey-Shuffeling-Process。
flow graph

我的问题:

  • 我希望 Spark 拆分文件,使用单独的任务处理拆分并将每个任务输出保存到“部分”文件。
  • 但是,我的任务耗尽内存并在完成之前被 YARN 终止:容器因超出内存限制而被 YARN 终止。使用了 7.6 GB 的 7.5 GB 物理内存
  • 我的解析器将所有已解析的数据对象放入内存。我无法更改解析器的代码。
  • 请注意,我的代码适用于较小的文件(例如,两个文件各有 600.000 行,作为我的工作的输入)

我的问题:

  1. 如何确保 Spark 会为我的 map task 中的每个文件拆分创建一个结果? (如果我的任务成功,也许他们会,但我现在永远不会看到输出。)

  2. 我认为我的 map 转换 val lineMap = lines.map ...(请参阅下面的 Scala 代码)会生成分区的 rdd。因此,我希望在调用我的第二个 map 任务之前以某种方式拆分 rdd 的值。

    此外,我认为在此 rdd lineMap 上调用 saveAsTextFile 将生成一个输出任务,该任务在我的每个 map task 完成后运行。如果我的假设是正确的,为什么我的执行者仍然内存不足? Spark 是否正在执行多个(太大)大文件拆分并同时处理它们,从而导致解析器填满内存?

  3. 重新分区 lineMap rdd 以便为我的解析器获取更多(更小)输入是个好主意吗?

  4. 是否有我不知道的额外 reducer 步骤?喜欢在写入文件或类似文件之前聚合结果?


Scala 代码(我省略了不相关的代码部分):

def main(args: Array[String]) {
    val inputFilePath = args(0)
    val outputFilePath = args(1)

    val inputFiles = fs.listStatus(new Path(inputFilePath))
    inputFiles.foreach( filename => {
        processData(filename.getPath, ...)
    }) 
}


def processData(filePath: Path, ...) {
    val lines  = sc.textFile(filePath.toString())
    val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey()

    val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
    //each output should be saved separately
    parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)     
}


def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
    val importer = new LogFileImporter(...)
    importer.parseData(values.toIterator.asJava, ...)

    //importer from now contains all parsed data objects in memory that could be parsed 
    //from the given values.  

    val jsonMapper = getJsonMapper(...)
    val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)

    (key, jsonStringData)
}

最佳答案

我通过删除 groupByKey 调用并实现新的 FileInputFormat 和 RecordReader 来解决此问题,以消除行依赖于其他行的限制。现在,我实现了它,以便每个拆分都包含前一个拆分的 50.000 字节开销。这将确保可以正确解析依赖于先前行的所有行。

我现在继续查看前一个拆分的最后 50.000 字节,但只复制实际影响当前拆分解析的行。因此,我最大限度地减少了开销,并且仍然获得了高度并行化的任务。

以下链接将我拖向了正确的方向。因为 FileInputFormat/RecordReader 的主题乍一看很复杂(至少对我来说是这样),所以最好通读这些文章并了解这是否适合您的问题:

ae.be 文章中的相关代码部分,以防网站出现故障。作者 ( @Gurdt ) 使用它来检测聊天消息是否包含转义行返回(通过以“\”结尾的行)并将转义行附加在一起,直到找到未转义的\n。这将允许他检索跨越两行或更多行的消息。用 Scala 编写的代码:

用法

val conf = new Configuration(sparkContext.hadoopConfiguration)
val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat],
classOf[LongWritable], classOf[Text], conf)

文件输入格式

class MyFileInputFormat extends FileInputFormat[LongWritable, Text] {
    override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
    RecordReader[LongWritable, Text] = new MyRecordReader()
}

记录器

class MyRecordReader() extends RecordReader[LongWritable, Text] {
    var start, end, position = 0L
    var reader: LineReader = null
    var key = new LongWritable
    var value = new Text

    override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
        // split position in data (start one byte earlier to detect if
        // the split starts in the middle of a previous record)
        val split = inputSplit.asInstanceOf[FileSplit]
        start = 0.max(split.getStart - 1)
        end = start + split.getLength

        // open a stream to the data, pointing to the start of the split
        val stream = split.getPath.getFileSystem(context.getConfiguration)
        .open(split.getPath)
        stream.seek(start)
        reader = new LineReader(stream, context.getConfiguration)

        // if the split starts at a newline, we want to start yet another byte
        // earlier to check if the newline was escaped or not
        val firstByte = stream.readByte().toInt
        if(firstByte == '\n')
            start = 0.max(start - 1)
        stream.seek(start)

        if(start != 0)
            skipRemainderFromPreviousSplit(reader)
    }

    def skipRemainderFromPreviousSplit(reader: LineReader): Unit = {
        var readAnotherLine = true
        while(readAnotherLine) {
            // read next line
            val buffer = new Text()
            start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
            pos = start

            // detect if delimiter was escaped
            readAnotherLine = buffer.getLength >= 1 && // something was read
            buffer.charAt(buffer.getLength - 1) == '\\' && // newline was escaped
            pos <= end // seek head hasn't passed the split
        }
    }

    override def nextKeyValue(): Boolean = {
        key.set(pos)

        // read newlines until an unescaped newline is read
        var lastNewlineWasEscaped = false
        while (pos < end || lastNewlineWasEscaped) {
            // read next line
            val buffer = new Text
            pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)

            // append newly read data to previous data if necessary
            value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer

            // detect if delimiter was escaped
            lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\\'

            // let Spark know that a key-value pair is ready!
            if(!lastNewlineWasEscaped)
                return true
        }

        // end of split reached?
        return false
    }
}

注意:您可能还需要在 RecordReader 中实现 getCurrentKey、getCurrentValue、close 和 getProgress。

关于hadoop - Spark Map Task 内存消耗巨大,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37569635/

相关文章:

performance - 我在哪里可以找到 Spark 中的操作成本?

hadoop - Hive和Sqoop分区

hadoop - 使用 Apache Oozie 编排 Apache Spark

python - 如何在 NiFi 的 ExecuteStreamCommand 处理器中读取文件

hadoop - java中通过Spark存储orc格式

apache-spark - Spark SQL 安全注意事项

scala - 如何在 Scala 中将 RDD 转换为二维数组?

scala - 在 Apache Spark 中,如何按两个共享值对 RDD 的所有行进行分组?

python - 如何为 Spark、Python 设置特定的 Hadoop 版本

hadoop - 在 Hadoop 中读取 BZip2 文件