我正在创建一个到 Hive 的连接字符串,并在该连接上的 Hive 表上运行一些 SELECT 查询。
对检索到的数据执行一些转换后,我正在创建一个数据框df_student_credits
,如下所示
NAME_STUDENT_INITIAL CREDITS_INITIAL NAME_STUDENT_FINAL CREDITS_FINAL LOAD_DATE
John 23 John 25 21/03/2017
Alan 19 Alan 19 17/06/2018
Will 24 Will 26 02/08/2019
Lily 25 Lily 25 22/01/2019
现在,我想将此数据帧插入到我使用以下命令创建的 Hive 外部表中:
CREATE EXTERNAL TABLE IF NOT EXISTS school_db.student_credits
(
NAME_STUDENT_INITIAL STRING,
CREDITS_INITIAL STRING,
NAME_STUDENT_FINAL STRING,
CREDITS_FINAL STRING,
LOAD_DATE STRING
)
LOCATION '/user/gradebook/student_credits/';
我希望每次运行此脚本并生成数据帧时都将插入内容附加到表中,而不是覆盖现有的表数据。
我看到的几乎每一篇文章都展示了使用 PySpark 执行此操作的方法。但我无法使用 PySpark,我必须使用与触发 SELECT 查询相同的 python 脚本来实现此目的。
我是 Hive 的新手,也是 Python 的新手。有人可以帮我解决这个问题吗?
最佳答案
您似乎正在尝试从 Hive 表读取 pandas 数据帧并进行一些转换并将其保存回某个 Hive 外部表。请引用下面的代码作为示例。在这里,我已从 Hive 表读取到 pandas 数据框中,并向其中添加了一些日期列。后来我使用 subprocess 模块来执行我的 shell,它将数据加载到按某些日期列分区的 Hive 表中。
from pyhive import hive
import pandas as pd
import sqlalchemy
from sqlalchemy.engine import create_engine
import datetime
from subprocess import PIPE, Popen
import subprocess
import sys
conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
cursor = conn.cursor()
query="select user_id,country from test_dev_db.test_data"
start_time= datetime.datetime.now()
output_file='/home/vikct001/user/vikrant/python/test_data.csv'
data=pd.read_sql(query,conn)
data['current_date'] = pd.datetime.today().strftime("%Y-%m-%d")
print(data)
data.to_csv(output_file, sep='|', encoding='utf-8',index=None)
hivequery=""" hive --hivevar loaded_date=$(date +"%Y-%m-%d") hive -e 'LOAD DATA LOCAL INPATH "/home/vikct001/user/vikrant/python/test_data.csv" INTO TABLE test_dev_db.test_data_external PARTITION (loaded_date="${hivevar:loaded_date}")';"""
def save_to_hdfs(output_file):
print("I am here")
p=subprocess.Popen(hivequery,shell=True,stderr=subprocess.PIPE)
stdout,stderr = p.communicate()
if p.returncode != 0:
print stderr
sys.exit(1)
save_to_hdfs(output_file)
end_time=datetime.datetime.now()
print 'processing ends', (start_time-end_time).seconds/60.0,' minutes'
表说明:
hive (test_dev_db)> desc test_dev_db.test_data_external;
OK
id int
country string
input_date date
loaded_date string
# Partition Information
# col_name data_type comment
loaded_date string
您可以看到数据已加载并创建了具有当前日期的分区。
hive (test_dev_db)> show partitions test_dev_db.test_data_external;
OK
loaded_date=2019-08-21
hive (test_dev_db)> select * from test_dev_db.test_data_external;
OK
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
关于python - 如何使用 Python(不使用 PySpark)将 pandas 数据框插入现有的 Hive 外部表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57590374/