PythonSpark : need to execute hive queries from file columns

标签 python pandas apache-spark hive pyspark

我有一个包含如下行的文件(文件名:sample.csv)

Id,Query
T1012,"Select * from employee_dim limit 100"
T1212,"Select * from department_dim limit 100"
T1231,"Select dept_number,location,dept_name from locations"

我需要迭代此文件(sample.csv)并获取第二列(“查询”),在配置单元数据库中运行它并获取结果,然后将其保存到名为 T1012_result.csv 的新文件,并对所有行执行类似的操作。

你能帮忙吗?

我尝试通过spark读取文件并将其转换为列表,然后使用sparksession执行SQL查询,但它不起作用。

from pyspark.sql import SparkSession,HiveContext

spark=SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("use sample")
input=spark.read.csv("sample.csv")
#input.select('_c1').show()

import pandas as pd

a=input.toPandas().values.tolist()
for i in a :
   print i[1]
   spark.sql('pd.DataFrame(i)')

最佳答案

更新:spark

file_path="file:///user/vikrant/inputfiles/multiquery.csv"
df=spark.read.format("com.databricks.spark.csv").option("header", "true").load(file_path)

+---+-------------------------------+
|id |query                          |
+---+-------------------------------+
|1  |select * from exampledate      |
|2  |select * from test             |
|3  |select * from newpartitiontable|
+---+-------------------------------+

def customFunction(row):
    for row in df.rdd.collect():
        item=(row[1])
        filename=(row[0])
        query=""
        query+=str(item)
        newdf=spark.sql(query)
        savedataframe(newdf,filename)

def savedataframe(newdf,filename):
    newdf.coalesce(1).write.csv("/user/dev/hadoop/external/files/file_" + filename + ".csv")

customFunction(df)

drwxr-xr-x   - vikct001 hdfs          0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_1.csv
drwxr-xr-x   - vikct001 hdfs          0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_2.csv
drwxr-xr-x   - vikct001 hdfs          0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_3.csv

更新:使用 pandas 我在 sql server 上有几个测试表,正如您在问题中提到的那样,我正在将它们读取到 pandas 数据帧,并将查询结果保存到每个不同的文件中,并重命名为数据帧的第一列:

import pandas as pd
import pyodbc
from pandas import DataFrame


connection = pyodbc.connect('Driver={ODBC Driver 13 for SQL Server};SERVER=yourservername;DATABASE=some_db;UID=username;PWD=password')
cursor = connection.cursor()

data=[['1','select * from User_Stage_Table'],['2','select * from User_temp_Table']]
df=pd.DataFrame(data,columns=['id','query'])


def get_query(df):
    a=df.values.tolist()
    for i in a:
        query=i[1]    #reading second column value as query
        filename=i[0] #reading first column value as filename
        write_query(query,filename) #calling write_query function 

def write_query(query,filename):
    df=pd.read_sql_query(query,connection)
    df.to_csv(outfile_location+filename+".txt",sep=',',encoding='utf-8',index=None,mode='a')

get_query(df)  #calling get_query function to build the query
out_file_location='G:\Testing\OutputFile\outfile'

您的输出文件名称将为:

outfile1.txt #这将包含表User_Stage_Table

的数据

outfile2.txt #这将包含表User_temp_Table'

的数据

如果这可以解决您的问题或进一步面临任何问题,请告诉我。

关于PythonSpark : need to execute hive queries from file columns,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57315590/

相关文章:

apache-spark - Spark 2.0中使用SparkSession时的parallelize()方法

python - Spark 支持使用 Windows 函数

apache-spark - Spark数据框添加新列问题 - 结构化流

Python 2 do_POST http.server multipart/formdata 到 Python 3

python - 使用条件语句返回数据帧的子集

python - 如何在 Python3 中使用 StringIO?

python - Pandas read_fwf 没有加载文件的全部内容

python - 将 dtype ('int64' ) 转换为 pandas 数据帧

python - DataFrame 中的时间计算和操作分组

python - 模拟掷骰子游戏