r - SparkR foreach 循环

标签 r lapply sparkr

在 Spark 的 Java/Scala/Python 实现中,只需调用 RDDDataFrame 类型的 foreach 方法即可并行化对数据集的迭代。

在 SparkR 中我找不到这样的指令。迭代 DataFrame 的行的正确方法是什么?

我只能找到 gapplydapply 函数,但我不想计算新的列值,我只想通过从中获取一个元素来执行某些操作并行的列表。

我之前的尝试是使用lapply

inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
createOrReplaceTempView(inputDF,'inputData')

distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData')

collected <- collect(distinctM)[[1]]

problemSolver <- function(idM) {
  filteredDF <- filter(inputDF, inputDF$ID_M == idM)
}

spark.lapply(c(collected), problemSolver)

但我收到此错误:

Error in handleErrors(returnStatus, conn) : 
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
 Error in callJMethod(x@sdf, "col", c) : 
  Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed.
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod

R 会提供什么解决方案来解决此类问题?

最佳答案

我也遇到了类似的问题。收集 DataFrame 将其作为数据帧放入 R 中。从那里,您可以像在常规旧 R 中一样获取每一行。在我看来,这是处理数据的一个可怕的主题,因为您失去了 Spark 提供的并行处理功能。使用内置的 SparkR 函数 select,而不是收集数据然后进行过滤。 , filter ,ETC。如果您希望执行按行运算符,内置的 SparkR 函数通常会为您执行此操作,否则,我发现 selectExprexpr当原始 Spark 函数设计为处理单个值时非常有用(例如:from_unix_timestamp)

所以,为了得到你想要的,我会尝试这样的事情(我使用的是 SparkR 2.0+):

首先按照您所做的那样读入数据:

inputDF<- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")

然后将其设为 RDD:inputSparkDF<- SparkR::createDataFrame(inputDF)

接下来,仅隔离不同/唯一的值(我使用 magrittr 进行管道(在 SparkR 中工作)):

distinctSparkDF<- SparkR::select(inputSparkDF) %>% SparkR::distinct()

从这里,您可以在仍然生活在 Spark 的世界中时应用过滤:

filteredSparkDF<- SparkR::filter(distinctSparkDF, distinctSparkDF$variable == "value")

Spark 为您过滤数据后,将子集收集到基础 R 中作为工作流程的最后步骤是有意义的:

myRegularRDataframe<- SparkR::collect(filteredSparkDF)

我希望这有帮助。祝你好运。 --内特

关于r - SparkR foreach 循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41816328/

相关文章:

r - 如何抑制 ggplot2 图中的垂直网格线?

r - 有没有办法从 RMarkdown 文件中引用 yaml frontmatter 部分?

r - 在变异中使用 case_when 时是否需要数据框名称?

list - 列表中的数据框;添加一个名为数据框的新变量

R包和执行时间

r - 在 R 中,从列表中的文件夹中读取文件,并通过文件名(不带文件格式 (.fa))分配列表元素名称

R:如何将结果合并为唯一的输出?

r - 使用SparkR,如何将字符串列拆分为 'n'多列?

r - 从 R 笔记本访问 Azure Blob 存储

join - SparkR:在多个连接条件下连接两个数据帧