r - Spark ML_pipelines : managing table reading

标签 r apache-spark sparklyr

我正在使用 Spark ML_pipelines 在使用 SCALA 的生产环境中轻松部署我在 Sparklyr 中开发的操作。它工作得很好,除了一个部分:似乎当我从 Hive 读取一个表然后创建一个将操作应用于该表的管道时,该管道还将保存表读取操作,从而表的名称。但是我希望管道独立于此。

这是一个可重现的例子:

Sparklyr 部分:

sc = spark2_context(memory = "4G")

iris <- copy_to(sc, iris, overwrite=TRUE)

spark_write_table(iris, "base.iris")
spark_write_table(iris, "base.iris2")
df1 <- tbl(sc, "base.iris")

df2 <- df1 %>%
  mutate(foo = 5)

pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(df2) %>%
  ml_fit(df1)

ml_save(pipeline,
        paste0(save_pipeline_path, "test_pipeline_reading_from_table"),
        overwrite = TRUE)

df2 <- pipeline %>% ml_transform(df1)

dbSendQuery(sc, "drop table base.iris")

SCALA部分:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val df1 = spark.sql("select * from base.iris2") 

val pipeline = PipelineModel.load(pipeline_path + "/test_pipeline_reading_from_table")
val df2 = pipeline.transform(df1)

我收到这个错误:

org.apache.spark.sql.AnalysisException: Table or view not found: `base`.`iris`; line 2 pos 5;
'Project ['Sepal_Length, 'Sepal_Width, 'Petal_Length, 'Petal_Width, 'Species, 5.0 AS foo#110]
+- 'UnresolvedRelation `base`.`iris`

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:82)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:637)
  at org.apache.spark.ml.feature.SQLTransformer.transformSchema(SQLTransformer.scala:86)
  at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
  at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
  at org.apache.spark.ml.PipelineModel.transformSchema(Pipeline.scala:310)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:304)
  ... 71 elided

我可以看到 2 个解决方案:

  1. 持久化 dataframe 似乎是一个解决方案,但我需要找到一种方法来避免让我的内存过载,因此我的问题是 unpersisting

  2. 将 Hive 中的表名作为管道参数传递,我试图在 this question 中解决这个问题

现在,说了这么多,我可能会遗漏一些东西,因为我只是一个初学者......

编辑:这与 this question 有很大不同。因为这涉及集成刚刚在管道中读取的数据框的特定问题,如标题中所述。

编辑:对于我的项目,在我阅读表格后保留表格是一个可行的解决方案。不知道有没有更好的解决办法。

最佳答案

Then the pipeline would call my table "base.table", making it impossible to apply it to another table.

事实并非如此。 ft_dplyr_transformer是Spark自带的SQLTransformer的语法糖。内部 dplyr expression is converted to SQL query, and the name of the table is replaced with __THIS__ (Spark 占位符指的是当前表)。

假设您有这样的转换:

copy_to(sc, iris, overwrite=TRUE)

df <- tbl(sc, "iris") %>%
  mutate(foo = 5)

pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(df) %>%
  ml_fit(tbl(sc, "iris"))

ml_stage(pipeline, "dplyr_transformer") %>% spark_jobj() %>% invoke("getStatement")
[1] "SELECT `Sepal_Length`, `Sepal_Width`, `Petal_Length`, `Petal_Width`, `Species`, 5.0 AS `foo`\nFROM `__THIS__`"

然而,这是一种相当困惑的表达方式,直接使用 native SQL 转换器更有意义:

pipeline <- ml_pipeline(sc) %>%
  ft_sql_transformer("SELECT *, 5 as `foo` FROM __THIS__") %>%
  ml_fit(df)

编辑:

您在这里遇到的问题看起来像是一个错误。 get_base_name函数返回不带引号的表名,因此您的情况下的值为

> get_base_name(x$ops)
<IDENT> default.iris

模式将是

> pattern
[1] "\\bdefault.iris\\b"

但是dbplyr::sql_render返回反引号的完全限定名称:

> dbplyr::sql_render(x)
<SQL> SELECT `Sepal_Length`, `Sepal_Width`, `Petal_Length`, `Petal_Width`, `Species`, 5.0 AS `foo`
FROM `default`.`iris`

因此模式与名称不匹配。

关于r - Spark ML_pipelines : managing table reading,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56344675/

相关文章:

reshape 列名中有年份的数据框

根据组平均值重新排序因子水平

java - Spark SASL 无法使用 yarn 在 emr 上工作

r - sparklyr 更改所有列名 spark dataframe

r - term.formula(formula) : '.' in formula and no 'data' argument 中的错误

r - 为每个唯一 ID 创建滞后变量

hadoop - 调用 Spark SaveAsTextFile 方法时如何获取生成的文件名

hadoop - 使用YARN的Spark流应用程序配置

r - 通过 ID 内的条件重新启动来创建增量值

r - 用于大型数据集的 sparklyr 中 copy_to 的替代方案