apache-spark - pyspark 数据框列 : Hive column

标签 apache-spark dataframe hive

我有一个 Hive 表如下:

hive> describe stock_quote;
OK
tickerid                string                                      
tradeday                string                                      
tradetime               string                                      
openprice               string                                      
highprice               string                                      
lowprice                string                                      
closeprice              string                                      
volume                  string

Spark 的以下代码读取 csv 文件并尝试将记录插入到 Hive 表中:

sc = spark.sparkContext
lines = sc.textFile('file:///<File Location>')
rows = lines.map(lambda line : line.split(','))
rows_map = rows.map(lambda row : Row(TickerId = row[0], TradeDay = row[1], TradeTime = row[2], OpenPrice = row[3], HighPrice = row[4], LowPrice = row[5], ClosePrice = row[6], Volume = row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').insertInto('default.stock_quote')

我面临的问题是,当我在数据帧上调用 show() 函数时,它会按字母顺序打印列,如下所示

|ClosePrice|HighPrice|LowPrice|OpenPrice|TickerId|TradeDay|TradeTime|Volume|

,在表中,它在 TickerId(Hive 表中的第 1 列)列中插入 ClosePrice(DF 中的第 1 列)的值,在 TradeDay 列中插入 HighPrice 的值等等。

尝试调用 dataframe 上的 select() 函数,没有帮助。 试图将列名列表如下:

rows_df = spark.createDataFrame(rows_map, ["TickerId", "TradeDay", "TradeTime", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice", "Volume"])

上面改变了列名的顺序,但值保持在相同的位置,这更不正确。

我们将不胜感激任何帮助。

最佳答案

您还可以使用 saveAsTable 代替 insertInto

来自docs :

Unlike insertInto, saveAsTable will use the column names to find the correct column positions

关于apache-spark - pyspark 数据框列 : Hive column,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50022478/

相关文章:

apache-spark - Spark 写入 postgres 很慢

r - 在 R 中绘制子集 os 子集

apache-spark - Spark 性能从 Dataframe 保存到 hdfs 或 hive 的大型数据集

hadoop - oozie shell脚本在kerberos集群中执行beeline

java - 在 apache Spark 中使用 current_timestamp 获取正确的时区偏移

hadoop - yarn client模式如何在远程master节点提交spark作业?

python - 如何在 Python 中将 JSON 文件目录加载到 Apache Spark

scala - spark RDD折叠方法的解释

python - 如何从数据框列中的某些行中删除字符?

hadoop - 将大量 Spark 数据帧合并为一个