r - 无法从 SparkR 创建的 DataFrame 中检索数据

标签 r hadoop apache-spark hive sparkr

我有以下简单的 SparkR 程序,它创建一个 SparkR DataFrame 并从中检索/收集数据。

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="yarn-client",sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)

我能够成功创建它并查看信息,但是任何与获取数据相关的操作都会抛出以下错误。

16/07/25 16:33:59 WARN TaskSetManager: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out at java.net.PlainSocketImpl.socketAccept(Native Method) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) at java.net.ServerSocket.implAccept(ServerSocket.java:530) at java.net.ServerSocket.accept(ServerSocket.java:498) at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)

16/07/25 16:33:59 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job 16/07/25 16:33:59 ERROR RBackendHandler: dfToCols on org.apache.spark.sql.api.r.SQLUtils failed Error in invokeJava(isStatic = TRUE, className, methodName, ...) : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out at java.net.PlainSocketImpl.socketAccept(Native Method) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) at java.net.ServerSocket.implAccept(ServerSocket.java:530) at java.net.ServerSocket.accept(ServerSocket.java:498) at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPar

如果我像下面这样通过 sparkR 命令行执行它,它就会被执行。

~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client

但是当我通过 R 和 sparkR.init((master="yarn-client") 执行它时,它会抛出错误。

有人可以帮助解决这些错误吗?

最佳答案

添加这一行产生了不同:

Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

完整代码如下:

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")
sc <- sparkR.init(sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)

关于r - 无法从 SparkR 创建的 DataFrame 中检索数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38577939/

相关文章:

r - 从过去 21 天的窗口中提取事件类型

hadoop - 使用 Hue Hadoop 在现有表中导入新数据时遇到问题

hadoop - 将数据从 HBase 迁移到文件系统。 (将 Reducer 输出写入本地或 Hadoop 文件系统)

windows - Windows 中 python 文件的 FileNotFoundException

apache-spark - Spark 客户端模式 - YARN 为驱动程序分配一个容器?

R dplyr动态列选择

在 R 中重新格式化 Excel 工作表

apache-spark - Spark dataframe groupby 均值和中值未完成

r - 在 r 的列中用字符分隔字符串

scala - 在 spark scala 的数据框中为每个组采样不同数量的随机行