scala - 从配置单元表中读取并使用 spark sql 写回它

标签 scala apache-spark hadoop apache-spark-sql

我正在使用 Spark SQL 读取 Hive 表并将其分配给 scala val

val x = sqlContext.sql("select * from some_table")

然后我对数据框 x 进行一些处理,最后得到一个数据框 y ,它具有与表 some_table 完全相同的模式。

最后,我试图将 y 数据框插入到同一个配置单元表 some_table 中

y.write.mode(SaveMode.Overwrite).saveAsTable().insertInto("some_table")

然后我得到错误

org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from

我尝试创建一个插入 sql 语句并使用 sqlContext.sql() 触发它,但它也给了我同样的错误。

有什么方法可以绕过这个错误吗?我需要将记录插入回同一张表。


您好,我尝试按照建议操作,但仍然出现相同的错误。

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
             org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;

最佳答案

其实你也可以使用检查点来实现这一点。由于它破坏了数据沿袭,Spark 无法检测到您正在同一个表中读取和覆盖:

 sqlContext.sparkContext.setCheckpointDir(checkpointDir)
 val ds = sqlContext.sql("select * from some_table").checkpoint()
 ds.write.mode("overwrite").saveAsTable("some_table")

关于scala - 从配置单元表中读取并使用 spark sql 写回它,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38746773/

相关文章:

scala - 玩 2 Scala : Adding a description attribute to this model

java - 从 Maven 迁移到 SBT

python - PySpark Dataframe 根据函数返回值创建新列

hadoop - 为 Hadoop 名称节点备份添加 QJM 或 NFS

java - 为什么“类型绑定(bind)不匹配 : The type ? 扩展 T 不是 Enum<E> 类型的有界参数 <E extends Enum<E>> 的有效替代”?

scala - 在 Scala 中匹配任意列表大小

scala - 将 HList 转换为另一个 HList

scala.ScalaReflectionException : <none> is not a term

apache-spark - 每个直接流创建了多少消费者来读取记录?

hadoop - Ended Job = job_local644049657_0014 with errors Error during job, 获取调试信息