我正在尝试按照以下步骤将数据框加载到 Hive 表中:
读取源表并将数据帧保存为 HDFS 上的 CSV 文件
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 199199).option("upperBound", 284058).option("numPartitions",10).load()
按照我的 Hive 表列对列进行排序 我的配置单元表列以以下格式的字符串存在:
val hiveCols = col1:coldatatype|col2:coldatatype|col3:coldatatype|col4:coldatatype...col200:datatype val schemaList = hiveCols.split("\\|") val hiveColumnOrder = schemaList.map(e => e.split("\\:")).map(e => e(0)).toSeq val finalDF = yearDF.selectExpr(hiveColumnOrder:_*)
我在“execQuery”中读取的列顺序与“hiveColumnOrder”相同,为了确保顺序,我再次使用 selectExpr 选择 yearDF 中的列
将数据帧保存为 HDFS 上的 CSV 文件:
newDF.write.format("CSV").save("hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/")
保存数据框后,我从“hiveCols”中获取相同的列, 准备一个 DDL 以在相同位置创建一个配置单元表,其值按给定的逗号分隔 下面:
create table if not exists schema.tablename(col1 coldatatype,col2 coldatatype,col3 coldatatype,col4 coldatatype...col200 datatype)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILELOCATION 'hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/';
在我将数据框加载到创建的表中之后,我在这里面临的问题是当我查询表时,我在查询中得到不正确的输出。 例如:如果我在将数据框保存为文件之前对数据框应用以下查询:
finalDF.createOrReplaceTempView("tmpTable")
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from tmpTable where header_id=19924598 and line_num=2
我得到了正确的输出。所有值都与列正确对齐:
[19924598,2,null,null,381761.40000000000000000000,381761.4,-381761.40000000000000000000,-381761.4,0.01489610000000000000,0.014896100000000,5686.76000000000000000000,5686.76]
但是在将数据框保存到 CSV 文件中之后,在其之上创建一个表(第 4 步)并对创建的表应用相同的查询,我发现数据是困惑的并且与列的映射不正确:
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from schema.tablename where header_id=19924598 and line_num=2
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| header_id | line_num | debit_rate | debit_rate_text | credit_rate | credit_rate_text | activity_amount | activity_amount_text | exchange_rate | exchange_rate_text | amount_cr | amount_cr_text |
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| 19924598 | 2 | NULL | | 381761.4 | | 5686.76 | 5686.76 | NULL | -5686.76 | NULL | |
所以我尝试使用一种不同的方法,即预先创建配置单元表并将数据从数据帧插入其中:
- 运行上面第 4 步中的 DDL
- finalDF.createOrReplaceTempView("tmpTable")
- spark.sql("insert into schema.table select * from tmpTable")
如果我在作业完成后运行上述选择查询,即使这种方式也会失败。
我尝试使用 refresh table schema.table
和 msckrepair table schema.table
刷新表,只是为了查看元数据是否有任何问题,但似乎没有任何效果。
谁能告诉我是什么原因造成的,我这里操作数据的方式有什么问题吗?
最佳答案
代码使用 Spark 2.3.2 测试
无需从 CSV 文件创建 Spark 数据框,然后将其注册为 Hive 表,您可以轻松地运行 SQL 命令并从 CSV 文件创建 Hive 表
val conf = new SparkConf
conf
.set("hive.server2.thrift.port", "10000")
.set("spark.sql.hive.thriftServer.singleSession", "true")
.set("spark.sql.warehouse.dir", "hdfs://PATH_FOR_HIVE_METADATA")
.set("spark.sql.catalogImplementation","hive")
.setMaster("local[*]")
.setAppName("ThriftServer")
val spark = SparkSession.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
现在使用 spark
对象,您可以作为 Hive 用户运行 SQL 命令:
spark.sql("DROP DATABASE IF EXISTS my_db CASCADE")
spark.sql("create database if not exists my_db")
spark.sql("use my_db")
使用以下代码,您可以加载 HDFS 目录中的所有 csv_files(或者您可以只给出一个 CSV 文件的路径):
spark.sql(
"CREATE TABLE test_table(" +
"id int," +
"time_stamp bigint," +
"user_name string) " +
"ROW FORMAT DELIMITED " +
"FIELDS TERMINATED BY ',' " +
"STORED AS TEXTFILE " +
"LOCATION 'hdfs://PATH_TO_CSV_Directory_OR_CSV_FILE' "
)
最后将 Spark sqlContext 对象注册为 Hive ThriftServer:
HiveThriftServer2.startWithContext(spark.sqlContext)
这将在端口 10000 上创建一个 ThriftServer 端点。
INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads
现在您可以运行 beeline 并连接到 ThriftServer:
beeline> !connect jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: enter optional_username
Enter password for jdbc:hive2://localhost:10000: leave blank
Connected to: Spark SQL (version 2.3.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000>
并测试表test_table
是否在my_db
数据库下创建:
0: jdbc:hive2://localhost:10000> use my_db;
0: jdbc:hive2://localhost:10000> show tables ;
+-----------+-----------------------+--------------+--+
| database | tableName | isTemporary |
+-----------+-----------------------+--------------+--+
| my_db | test_table | false |
+-----------+-----------------------+--------------+--+
此外,您可以使用 ThrifServer JDBC 端点创建任何其他 Hive 表(或任何 HiveQL 命令)。
这里是需要的依赖:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion,
"org.apache.hadoop" % "hadoop-hdfs" % "2.8.3",
"org.apache.hadoop" % "hadoop-common" % "2.8.3",
)
关于apache-spark - 将数据从 CSV 文件映射到 HDFS 上的 Hive 表时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54691697/