python - 聚合后使用 spark 从 hive 表读取和写入

标签 python hadoop hive apache-spark

我们有一个hive仓库,想用spark做各种任务(主要是分类)。有时将结果写回配置单元表。例如,我们编写了以下 python 函数来查找 original_table 第二列的总和,按 original_table 第一列分组。该函数有效,但我们担心它效率低下,尤其是转换为键值对的映射和字典版本。 combiner、mergeValue、mergeCombiner 函数在别处定义,但工作正常。

from pyspark import HiveContext

rdd = HiveContext(sc).sql('from original_table select *')

#convert to key-value pairs
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))

#create rdd where rows are (key, (sum, count)
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)

# creates rdd with dictionary values in order to create schemardd
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})

# infer the schema
schema_rdd = HiveContext(sc).inferSchema(dict_rdd)

# save
schema_rdd.saveAsTable('new_table_name')

有没有更有效的方法来做同样的事情?

最佳答案

...也许在编写问题时这是不可能的,但现在(1.3 后)使用 createDataFrame() 调用是否有意义?

在获得第一个 RDD 之后,看起来您可以进行调用,然后针对该结构运行一个简单的 SQL 语句,一次完成整个工作。 (求和和分组)另外,如果我正确阅读 API 文档,DataFrame 结构可以在创建时直接推断模式。

( http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext )

关于python - 聚合后使用 spark 从 hive 表读取和写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28412954/

相关文章:

python - 如何修复 Selenium WebDriverException : The browser appears to have exited before we could connect?

python - 在sqlalchemy中,什么对应于mysql函数 'load_file'?

hadoop - Hadoop集群可以处理多少数据?

java - MapReduce:增加并发映射器任务的数量

hadoop - 为什么在 hive 中使用 “insert into”时总会有一些空值?

c++ - 从 C++ 运行一些命令行命令

python - 使用 pyparsing 解析多行的单词转义拆分

hadoop - MapReduce到 yarn 配置转换

hadoop - 如何将 Oozie 配置传递给工作流操作,例如 Hive?

hadoop - 失败 : Error in semantic analysis: Column Found in more than One Tables/Subqueries