python - 如何使用 Python(不使用 PySpark)将 pandas 数据框插入现有的 Hive 外部表?

标签 python pandas dataframe hive

我正在创建一个到 Hive 的连接字符串,并在该连接上的 Hive 表上运行一些 SELECT 查询。

对检索到的数据执行一些转换后,我正在创建一个数据框df_student_credits,如下所示

NAME_STUDENT_INITIAL    CREDITS_INITIAL     NAME_STUDENT_FINAL  CREDITS_FINAL   LOAD_DATE
John                    23                  John                25              21/03/2017
Alan                    19                  Alan                19              17/06/2018
Will                    24                  Will                26              02/08/2019
Lily                    25                  Lily                25              22/01/2019

现在,我想将此数据帧插入到我使用以下命令创建的 Hive 外部表中:

CREATE EXTERNAL TABLE IF NOT EXISTS school_db.student_credits
(
NAME_STUDENT_INITIAL STRING,
CREDITS_INITIAL STRING,
NAME_STUDENT_FINAL STRING,
CREDITS_FINAL STRING,
LOAD_DATE STRING
)
LOCATION '/user/gradebook/student_credits/';

我希望每次运行此脚本并生成数据帧时都将插入内容附加到表中,而不是覆盖现有的表数据。

我看到的几乎每一篇文章都展示了使用 PySpark 执行此操作的方法。但我无法使用 PySpark,我必须使用与触发 SELECT 查询相同的 python 脚本来实现此目的。

我是 Hive 的新手,也是 Python 的新手。有人可以帮我解决这个问题吗?

最佳答案

您似乎正在尝试从 Hive 表读取 pandas 数据帧并进行一些转换并将其保存回某个 Hive 外部表。请引用下面的代码作为示例。在这里,我已从 Hive 表读取到 pandas 数据框中,并向其中添加了一些日期列。后来我使用 subprocess 模块来执行我的 shell,它将数据加载到按某些日期列分区的 Hive 表中。

from pyhive import hive
import pandas as pd
import sqlalchemy
from sqlalchemy.engine import create_engine
import datetime
from subprocess import PIPE, Popen
import subprocess
import sys

conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
cursor = conn.cursor()

query="select user_id,country from test_dev_db.test_data"

start_time= datetime.datetime.now()

output_file='/home/vikct001/user/vikrant/python/test_data.csv'

data=pd.read_sql(query,conn)
data['current_date'] = pd.datetime.today().strftime("%Y-%m-%d")
print(data)

data.to_csv(output_file, sep='|', encoding='utf-8',index=None)

hivequery=""" hive --hivevar loaded_date=$(date +"%Y-%m-%d") hive -e 'LOAD DATA LOCAL INPATH "/home/vikct001/user/vikrant/python/test_data.csv" INTO TABLE test_dev_db.test_data_external PARTITION (loaded_date="${hivevar:loaded_date}")';"""

def save_to_hdfs(output_file):
        print("I am here")
        p=subprocess.Popen(hivequery,shell=True,stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        if p.returncode != 0:
            print stderr
            sys.exit(1)


save_to_hdfs(output_file)
end_time=datetime.datetime.now()

print 'processing ends', (start_time-end_time).seconds/60.0,' minutes'

表说明:

hive (test_dev_db)> desc test_dev_db.test_data_external;
OK
id                      int
country                 string
input_date              date
loaded_date             string

# Partition Information
# col_name              data_type               comment

loaded_date             string

您可以看到数据已加载并创建了具有当前日期的分区。

hive (test_dev_db)> show partitions test_dev_db.test_data_external;
OK
loaded_date=2019-08-21


hive (test_dev_db)> select * from test_dev_db.test_data_external;
OK
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21

关于python - 如何使用 Python(不使用 PySpark)将 pandas 数据框插入现有的 Hive 外部表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57590374/

相关文章:

python - 将 Python 和依赖项部署到 Elastic Beanstalk

python - 当我在 python 中使用 f 字符串时得到不同的输出

python - 无法将 .dll 文件加载到 python 脚本中

python - 行元素之间变化的滚动计数

Python:按组计算数据框中的特定事件

python - 打印 pandas dataframe 的内容而不带索引

python - 只能将元组(不是 "unicode")连接到元组

python - 使用 python pandas 计算每日总和

python - 如何使用 pandas 为数据框的一列中单独包含的每个时间添加 timedelta?

python - Pandas - 在列中附加行值的总和(如果总和为偶数)或 NaN(如果为奇数)