pandas - 使用dask从Hive读取数据

标签 pandas hive dask

我正在使用 impala.util 中的 as_pandas 实用程序来读取从配置单元获取的 dataframe 形式的数据。但是,使用 pandas,我认为我将无法处理大量数据,而且速度也会变慢。我一直在阅读有关 dask 的内容,它提供了读取大型数据文件的出色功能。我如何使用它来有效地从配置单元获取数据。

def as_dask(cursor):
"""Return a DataFrame out of an impyla cursor.
This will pull the entire result set into memory.  For richer pandas- 
like functionality on distributed data sets, see the Ibis project.

Parameters
----------
cursor : `HiveServer2Cursor`
    The cursor object that has a result set waiting to be fetched.
Returns
-------
DataFrame
"""
    import pandas as pd
    import dask
    import dask.dataframe as dd

    names = [metadata[0] for metadata in cursor.description]
    dfs = dask.delayed(pd.DataFrame.from_records)(cursor.fetchall(), 
    columns=names)
    return dd.from_delayed(dfs).compute()

最佳答案

目前没有直接的方法可以做到这一点。您最好看看 dask.dataframe.read_sql_table 的实现和 intake-sql 中的类似代码- 您可能需要一种方法来对数据进行分区,并让每个工作人员通过调用delayed()来获取一个分区。然后可以使用 dd.from_delayed 和 dd.concat 将各个部分拼接在一起。

-编辑-

你的函数将延迟的想法从后到前。您正在延迟并立即在单个游标上操作的函数中具体化数据 - 它无法并行化,并且如果数据很大的话会破坏您的内存(这就是您尝试这样做的原因)。

假设您可以形成一组 10 个查询,其中每个查询获取数据的不同部分;不要不要使用 OFFSET,而是在 Hive 索引的某些列上使用条件。 您想做类似的事情:

queries = [SQL_STATEMENT.format(i) for i in range(10)]
def query_to_df(query):
    cursor = impyla.execute(query)
    return pd.DataFrame.from_records(cursor.fetchall())

现在您有了一个返回分区且不依赖全局对象的函数 - 它只接受字符串作为输入。

parts = [dask.delayed(query_to_df)(q) for q in queries]
df = dd.from_delayed(parts)

关于pandas - 使用dask从Hive读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52875508/

相关文章:

python - Pandas :从分层数据创建字典

mysql - 如何从mysql连接下载数据

python - 同时删除一行和一列 Pandas Dataframe

hadoop - 如何使用HIVE HQL从表A中选择已删除的记录与表B进行比较

python - 同时运行两个 dask-ml 输入器而不是依次运行

python - 从包含节点的 pandas Dataframe 创建邻接列表

hadoop - 有Spark、hadoop、hive的兼容映射吗

mysql - Hive 查询给出与 SQL 不同的结果

python - 触发一系列并行任务

dask - 什么导致 dask 作业失败并出现 CancelledError 异常