hadoop - 将大量 Spark 数据帧合并为一个

标签 hadoop apache-spark hive pyspark hdfs

我在 for 循环中使用满足不同条件的不同查询超过 1500 次来查询缓存的配置单元临时表。我需要在循环内使用 unionAll 将它们全部合并。但是由于 spark 跟不上 RDD 血统,我得到了 stackoverflow 错误。

伪代码:

df=[from a hive table]
tableA=[from a hive table]
tableA.registerTempTable("tableA")
HiveContext.sql('CACHE TABLE tableA')

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    else:
        df1=query something from tableA
        df=df.unionAll(df1)

由于 RDD 沿袭变得困难,这会引发 StackOverFlow 错误。所以我尝试检查点如下:

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    else:
        df1=query something from tableA
        df=df.unionAll(df1)
    df.rdd.checkpoint
    df = sqlContext.createDataFrame(df.rdd, df.schema)

我遇到了同样的错误。所以我尝试了 SaveAsTable,我一直想避免它,因为每个 hql 查询和循环内的 hive io 之间的作业提交滞后。但这种方法效果很好。

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df=query something from tableA
        df.write.saveAsTable('output', mode='append')
    elif ():
        df=query something from tableA
        df.write.saveAsTable('output', mode='append') 

我需要帮助避免将数据帧保存到循环内的配置单元中。我想以某种内存中高效的方式合并 dfs。我尝试过的其他选项之一是将查询结果直接插入到临时表中,但出现错误:无法插入到基于 RDD 的表中。

最佳答案

也许,结果的临时表会起作用。

df1="query something from tableA".registerTempTable("result")
sqlContext.sql("Insert into result query something from tableA")

关于hadoop - 将大量 Spark 数据帧合并为一个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43269350/

相关文章:

hadoop - Hadoop CapacitySchueduler不使用过多容量

hadoop - 如何在一个群集中运行hadoop流作业并将输出写入其他群集?

json - 删除/映射 Hive 表上的重复键?

hadoop - 在HiveQL中喜欢任何功能

hadoop - 在 Toad 数据点中设置属性失败

apache - Apache Tajo 和 Apache hive 之间的实际区别是什么

java - hadoop 连接在端口 9000 上被拒绝

scala - Spark UDAF动态输入架构处理

java - Spark 应用程序在运行 flatMap 函数后仅使用 1 个执行器

scala - 连接后如何避免重复列?