r - 使用 R 的非常慢/卡住的 hadoop 流式传输

标签 r hadoop mapreduce hadoop-streaming

我正在尝试使用 R 编写自定义 map-reduce。这是我的映射器函数:

#! /usr/bin/env Rscript

input <- file("stdin", "r")

while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
  # in case of empty lines
  if(nchar(line) == 0) break

  # split line into data
  data = unlist(strsplit(line, ","))

  # output scores with cat()
  cat(data[2],"|",data[3],"|",data[4]
      ,"\t"  # reduce key followed by tab
      ,paste(data[1],paste(unlist(data[5:length(data)]),collapse=","),sep = ",")    # all other fields separated by commas
      ,"\n",sep='') # line break
}

close(input)

所以基本上3列的组合是我在这里的关键;其余列将有值(value)。一旦我在单个 reducer 节点中获得了属于特定键的所有数据,那么这些数据将由下面的 reducer 代码处理:
first_line <- TRUE
first_time <- TRUE
prev_id <- ""
input <- file("stdin", "r")
while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
  if(nchar(line) == 0) break

  if(first_time == TRUE){
    first_time = FALSE
    next
  } 

  id <- unlist(strsplit(line,"\t"))[1]
  data0 <- unlist(strsplit(line,"\t"))[2]

  data1 = data.frame(t(unlist(strsplit(data0, ","))),stringsAsFactors=FALSE)

  colnames(data1) = c('ITEM_I','BOH','EOH','LATTD_I','LNGTD_I')

  data1$DEPT = strsplit(id,"\\|")[[1]][1]
  data1$CLAS = strsplit(id,"\\|")[[1]][2]
  data1$SBCL = strsplit(id,"\\|")[[1]][3]

  if(prev_id==id | first_line==T){
    if(!exists("base_data")){
      base_data <- rbind(data1)
      first_line <- F
    }else{
      base_data <- rbind(base_data,data1)
    }
  }else{
    if(!exists("results")){
      results <- BuildDTnProcess(base_data)
      base_data <- rbind(data1)
    }else{
      results <- rbind(results,BuildDTnProcess(base_data))
      base_data <- data1
    }
  }
  prev_id <- id
}
close(input)

if(!exists("results")){
  results <- BuildDTnProcess(base_data)
}else{
  results <- rbind(results,BuildDTnProcess(base_data))
}
base_data <- NULL

因此,我试图将属于单个键的所有记录堆积到一个数据框中(以及每当出现新键时启动一个新数据框)。然后将此数据传递给函数 BuildDTnProcess ,该函数将执行一些操作以完成由单个关键观察组成的数据帧;结果将存储在结果中。

我观察到这段代码卡住了几天然后被杀死。所以我开始一一添加代码块来识别瓶颈。我已经确定直到 data1$MDSE_SBCL_REF_I = strsplit(id,"\\|")[[1]][3]代码运行良好,但当我添加
if(prev_id==id | first_line==T){
        if(!exists("base_data")){
          base_data <- rbind(data1)
          first_line <- F
        }else{
          base_data <- rbind(base_data,data1)
        }
      }

然后它变得很慢。在日志中(在 20 分钟内完成运行)

2016-05-11 14:57:26,160 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=200000/0/0 in:1169=200000/171 [rec/s] out:0=0/171 [rec/s] 2016-05-11 14:58:47,346 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=300000/0/0 in:1185=300000/253 [rec/s] out:0=0/253 [rec/s] 2016-05-11 15:00:09,503 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=400000/0/0 in:1194=400000/335 [rec/s] out:0=0/335 [rec/s] 2016-05-11 15:01:33,969 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=500000/0/0 in:1193=500000/419 [rec/s] out:0=0/419 [rec/s] 2016-05-11 15:02:54,523 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=600000/0/0 in:1200=600000/500 [rec/s] out:0=0/500 [rec/s]



它变慢并卡住(即使几天后也没有完成)

2016-05-11 13:51:17,543 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10000/0/0 in:87=10000/114 [rec/s] out:0=0/114 [rec/s] 2016-05-11 16:58:16,552 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100000/0/0 in:8=100000/11333 [rec/s] out:0=0/11333 [rec/s]



我在这里遗漏了什么重要的东西吗?

PS:在进行此分析时,我已经删除了下面提到的瓶颈代码块的所有代码部分。

最佳答案

自己写答案,因为我找到了原因,这个问题仍然没有任何回复甚至评论。性能缓慢的根本原因是“rbind”操作。 Rbind 实现是这样一种方式,它需要更多的时间来追加行;到更大的基础 data.frame 比更小的基础 data.frame。更多详细信息请参见此处 Growing a data.frame in a memory-efficient manner

我自己已经实现了 data.table 以及预填充版本的解决方案,效果非常好。

关于r - 使用 R 的非常慢/卡住的 hadoop 流式传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37178088/

相关文章:

r - 使用 'system' 时如何解除R的提示?

r - 在带有刻面的图形上添加 R^2

python - Hadoop:在 Ubuntu 12.04 中通过 NameNode 格式化 HDFS 文件系统

hadoop - map 约简中的共享变量

Hadoop 减少多种输入格式

r - 列名在 read.table 或 read.csv 上向左移动

r - 在R中按组添加观察数

hadoop - HIVE的自定义记录分隔符

hadoop - canopy聚类算法中如何增加reducer的数量

如果我使用 -files 命令行选项,hadoop 如何在映射器中获取分发缓存文件