我正在使用 SparkR:::map,我的函数为每个输入行返回一个较大的 R 数据帧,每个数据帧的形状相同。我想将这些数据帧编写为 Parquet 文件,而不“收集”它们。我可以将 write.df 映射到我的输出列表吗?我可以让工作人员任务来编写 Parquet 吗?
我现在有工作了。。例子。我对此很满意,除了我没想到归约会隐式“收集”,因为我想将生成的 DF 写为 Parquet。
此外,我不相信:::map 实际上可以并行执行任何操作。我是否也需要始终调用“并行”?
#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)
# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
appName = "Peter Spark test",
list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)
#### MAP function ####
run.model <- function(v) {
x <- v$xs[1]
y <- v$ys[1]
startTime <- format(Sys.time(), "%F %T")
xs <- c(1:x)
endTime <- format(Sys.time(), "%F %T")
hostname <- system("hostname", intern = TRUE)
xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
return(xys)
}
# HERE BE THE SCRIPT BIT
main <- function() {
# Make unique identifiers for each run
xs <- c(1:365)
ys <- c(1:1)
xys <- data.frame(xs,ys,stringsAsFactors = FALSE)
# Convert to Spark dataframe for mapping
sqlContext <- get("sqlContext", envir = .GlobalEnv)
xys.sdf <- createDataFrame(sqlContext, xys)
# Let Spark do what Spark does
output.list <- SparkR:::map(xys.sdf, run.model)
# Reduce gives us a single R dataframe, which may not be what we want.
output.redux <- SparkR:::reduce(output.list, rbind)
# Or you can have it as a list of data frames.
output.col <- collect(output.list)
return(NULL)
}
最佳答案
假设您的数据或多或少像这样:
rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])
并且所有列都有受支持的类型,您可以将其转换为逐行格式:
rows <- SparkR:::flatMap(dfs, function(x) {
data <- as.list(x)
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
do.call(mapply, append(args, data))
})
调用createDataFrame
:
sdf <- createDataFrame(sqlContext, rows)
head(sdf)
## mpg cyl disp hp drat wt qsec vs am gear carb
## 1 18.7 8 360.0 175 3.15 3.44 17.02 0 0 3 2
## 2 18.1 6 225.0 105 2.76 3.46 20.22 1 0 3 1
## 3 14.3 8 360.0 245 3.21 3.57 15.84 0 0 3 4
## 4 24.4 4 146.7 62 3.69 3.19 20.00 1 0 4 2
## 5 22.8 4 140.8 95 3.92 3.15 22.90 1 0 4 2
## 6 19.2 6 167.6 123 3.92 3.44 18.30 1 0 4 4
printSchema(sdf)
## root
## |-- mpg: double (nullable = true)
## |-- cyl: double (nullable = true)
## |-- disp: double (nullable = true)
## |-- hp: double (nullable = true)
## |-- drat: double (nullable = true)
## |-- wt: double (nullable = true)
## |-- qsec: double (nullable = true)
## |-- vs: double (nullable = true)
## |-- am: double (nullable = true)
## |-- gear: double (nullable = true)
## |-- carb: double (nullable = true)
只需使用write.df
/saveDF
即可。
问题是您一开始就不应该使用内部 API。它在初始版本中被删除的原因之一是不够强大,无法直接使用。更不用说目前还不清楚它是否会被支持,甚至将来是否可用。只是说...
关于r - 写入从 SparkR:::map 返回的 R 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33961103/