r - 写入从 SparkR:::map 返回的 R 数据帧

标签 r apache-spark sparkr

我正在使用 SparkR:::map,我的函数为每个输入行返回一个较大的 R 数据帧,每个数据帧的形状相同。我想将这些数据帧编写为 Parquet 文件,而不“收集”它们。我可以将 write.df 映射到我的输出列表吗?我可以让工作人员任务来编写 Parquet 吗?

我现在有工作了。。例子。我对此很满意,除了我没想到归约会隐式“收集”,因为我想将生成的 DF 写为 Parquet。

此外,我不相信:::map 实际上可以并行执行任何操作。我是否也需要始终调用“并行”?

#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")

source("jdbc-utils.R")

options(stringsAsFactors = FALSE)

# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
                         sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
                         appName = "Peter Spark test",
                         list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)

#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime     <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
  return(xys)
}

# HERE BE THE SCRIPT BIT
main <- function() {

  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs,ys,stringsAsFactors = FALSE)

  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)

  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)

  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)

  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)

  return(NULL)
}

最佳答案

假设您的数据或多或少像这样:

rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])

并且所有列都有受支持的类型,您可以将其转换为逐行格式:

rows <- SparkR:::flatMap(dfs, function(x) {
  data <- as.list(x)
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  do.call(mapply, append(args, data))
})

调用createDataFrame:

sdf <- createDataFrame(sqlContext, rows)
head(sdf)

##    mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4 24.4   4 146.7  62 3.69 3.19 20.00  1  0    4    2
## 5 22.8   4 140.8  95 3.92 3.15 22.90  1  0    4    2
## 6 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4

printSchema(sdf)

## root
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)

只需使用write.df/saveDF即可。

问题是您一开始就不应该使用内部 API。它在初始版本中被删除的原因之一是不够强大,无法直接使用。更不用说目前还不清楚它是否会被支持,甚至将来是否可用。只是说...

关于r - 写入从 SparkR:::map 返回的 R 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33961103/

相关文章:

r - 将具有N个类别的分类因子重新编码为N个二进制列

amazon-web-services - 如何创建代理以在AWS Glue的Spark UI上查看作业?

scala - 如何在 IntelliJ IDEA 中创建 Spark/Scala 项目(无法解析 build.sbt 中的依赖项)?

scala - 在Apache Spark中传递参数

r - 无法运行程序 "Rscript"

r - 如何从 lme4 获得随机效应(BLUPs/条件模式)的协方差矩阵

html - [ Shiny ] : Add link to another tabPanel in another tabPanel

r - 在 R 中,将两个数据帧按元素相乘并乘以一个向量

r - Zeppelin R解释器无法执行任何操作

r - SparkR:levenshtein 来自 2 个 Spark 数据帧的 2 个变量之间的模糊字符串匹配