我正在尝试使用 Amazon Elastic Map Reduce运行数百万个案例的一系列模拟。这是一个没有 reducer 的 Rscript 流作业。我在我的 EMR 调用 --reducer org.apache.hadoop.mapred.lib.IdentityReducer
中使用 Identity Reducer。
脚本文件在手动传递一行字符串时从 Linux 机器上的命令行本地测试和运行时工作正常 echo "1,2443,2442,1,5"| ./mapper.R
然后我得到了我期望的一行结果。但是,当我使用 EMR 上的输入文件中的大约 10,000 个案例(行)测试我的模拟时,在 10,000 个输入行中,我只得到了十几行左右的输出。我已经尝试了几次,但我不明白为什么。 Hadoop 作业运行良好,没有任何错误。似乎输入行被跳过,或者 Identity reducer 可能发生了一些事情。对于有输出的情况,结果是正确的。
我的输入文件是一个具有以下数据格式的 csv,由逗号分隔的一系列五个整数:
1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...
这是我的 mapper.R
R 脚本#! /usr/bin/env Rscript
# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
list <- lapply(casename, function(x) {
read.csv(file = paste("./inputdata/",x, ".csv", sep = ""),
header = TRUE,
stringsAsFactors = FALSE)})
return(data.frame(list))
}
con <- file("stdin")
line <- readLines(con, n = 1, warn = FALSE)
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases) # for indexing
## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")
## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))
## Calculations that use df d and produce a string called 'output'
## in the form of "id: value1 value2 value3 ..." to be used at a
## later time for agregation.
cat(output, "\n")
close(con)
此模拟的(广义)EMR 调用是:
ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2
如果有人对我可能遇到这些问题的原因有任何见解,我愿意接受建议,以及对 R 脚本的任何更改/优化。
我的另一个选择是将脚本转换为函数并使用 R 多核包运行并行应用,但我还没有尝试过。我想让它在 EMR 上运行。我用了JD Long's和 Pete Skomoroch's R/EMR 示例作为创建脚本的基础。
最佳答案
没有明显的跳出。但是,您能否使用仅 10 行的简单输入文件来运行该作业?确保这 10 行是未在您的大型测试用例中运行的场景。试试这个来消除您的输入导致 R 脚本不产生答案的可能性。
调试 EMR 作业本身就是一项技能。
编辑:
这是一次全面的钓鱼探险,但使用 AWS GUI 启动了 EMR 交互式 pig session 。 “Interactive pig” session 保持运行,因此您可以通过 ssh 访问它们。您也可以通过命令行工具执行此操作,但通过 GUI 更容易一些,因为希望您只需要执行一次。然后 ssh 进入集群,将你的测试用例传输到你的缓存文件和你的映射器中,然后运行这个:
cat infile.txt | yourMapper.R > 输出文件.txt
这只是为了测试您的映射器是否可以在没有 Hadoop 位的情况下在 EMR 环境中解析 infile。
编辑 2:
我将上面的文字留给后代使用,但真正的问题是您的脚本永远不会返回到标准输入以获取更多数据。因此,您为每个映射器运行一次,然后结束。如果你运行上面的一行,你只会得到一个结果,而不是 infile.txt 中每一行的结果。如果您甚至在本地机器上运行了 cat
测试,错误应该会弹出!
让我们看看皮特的 word count in R example :
#! /usr/bin/env Rscript
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
## **** could wo with a single readLines or in blocks
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
line <- trimWhiteSpace(line)
words <- splitIntoWords(line)
## **** can be done as cat(paste(words, "\t1\n", sep=""), sep="")
for (w in words)
cat(w, "\t1\n", sep="")
}
close(con)
您的脚本缺少的部分是这一点:
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
#do your dance
#do your dance quick
#come on everybody tell me what's the word
#word up
}
当然,您应该替换 Cameo's Word Up 的歌词!用你的实际逻辑。
请记住,适当的调试音乐可以减轻这个过程的痛苦:
关于r - Amazon Elastic MapReduce 上的 R 映射器脚本故障排除 - 结果不符合预期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4443751/