python - Pandas + HDF5 panel data storage for large data

Tags: python, database, pandas, hdf5

As part of my research I am looking for a good storage design for my panel data. I use pandas for all in-memory operations. I have looked at the following two questions/contributions, Large Data Workflows using Pandas and Query HDF5 Pandas, as they come closest to my setup. However, I still have a couple of questions. First, let me define my data and some requirements:
Size: I have around 800 dates, 9000 ids and up to 200 variables. Hence, the flattened panel (along dates and ids) corresponds to 7.2 million rows and 200 columns. This might or might not all fit in memory; let's assume it does not. Disk space is not an issue.
Variables are typically calculated once, but updates/changes might occur from time to time. Once an update occurs, the old version does not matter anymore.
New variables are added from time to time, mostly one at a time only.
No new rows are added.
Querying takes place. For example, often I only need to select a certain date range like date > start_date & date < end_date. But some queries need to consider rank conditions on dates. For example, get all data (i.e. columns) where rank(var1) > 500 & rank(var1) < 1000, where the rank is as of a given date.
The goal is fast reading/querying of the data; data writing is not as critical. A small sketch of this layout and of both query types follows below.
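
To make the setup concrete, here is a minimal sketch of the layout and of both query types, with sizes shrunk for illustration (var1 and the 500/1000 bounds are the hypothetical names used above):

import numpy as np
import pandas as pd

# shrunk dimensions; the real data has 800 dates x 9000 ids
dates = pd.date_range('2000-01-01', periods=80, freq='B')
ids = range(900)
idx = pd.MultiIndex.from_product([dates, ids], names=['date', 'id'])
df = pd.DataFrame({'var1': np.random.standard_normal(len(idx))}, index=idx)

# query type 1: a date range (partial string slicing on the date level)
sub1 = df.loc['2000-02-01':'2000-03-31']

# query type 2: a rank condition, where ranks are computed per date
rank_var1 = df['var1'].groupby(level='date').rank()
sub2 = df[(rank_var1 > 500) & (rank_var1 < 1000)]
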
I thought of the following HDF5 design:

1. Store the variables in different tables following the groups_map approach (from the first question linked above). Limit the number of columns per group to 10, to avoid a large memory load when updating a single variable (see the update function in point 3).
2. Each group represents one table, and I use a MultiIndex based on date and id for every table stored.
3. Create an update function to update variables. The function loads the table with all (10) columns of the group into memory as a df, deletes the table on disk, replaces the updated variable in the df, and saves the table from memory back to disk.
4. Create an add function that adds var1 to a group with fewer than 10 columns, or creates a new group if required. Saving works as in point 3: load the current group into memory, delete the table on disk, add the new column, and save it back to disk.
5. Calculate as-of-date ranks for the relevant variables and add them to the disk storage as rank_var1, which reduces the query to rank_var1 > 500 & rank_var1 < 1000. A possible groups_map layout is sketched below.
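
A possible groups_map layout for this design might look as follows (a sketch; group and column names are illustrative, and the dedicated 'query' group holds the rank columns that get indexed for where= clauses):

groups_map = {
    'group_1': {'columns': ['var1', 'var2', 'var3']},  # at most 10 columns each
    'group_2': {'columns': ['var11', 'var12']},
    'query':   {'columns': ['rank_var1']},             # indexed filter columns
}
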
I have the following questions:

1. When updating an HDFTable, I suppose I have to delete the entire table in order to update a single column?
2. When to use 'data_columns', or should I simply assign True in HDFStore.append()?
3. If I want to query based on the condition rank_var1 > 500 & rank_var1 < 1000, but I need columns from other groups: can I feed the index received from the rank_var1 condition into a query to get other columns based on this index (the index is a MultiIndex with date and id)? Or would I need to loop this index by date, chunk the ids as suggested in the second question linked above, and repeat the procedure for every group where I need columns? Alternatively, (a) I could add rank columns to each group's table, but that seems extremely inefficient in terms of disk storage; note that the number of variables relevant for rank filtering is limited (say 5). Or (b) I could simply use the df_rank received from the rank_var1 query and apply in-memory operations via df_rank.merge(df_tmp, left_index=True, right_index=True, how='left'), looping through the groups (df_tmp) and selecting the required columns.
4. Suppose I have data of different frequencies. Is having different groups_maps (or different storages) for the different frequencies the way to go?
5. Copies of the storage might be used on Windows/Unix systems. I assume it is perfectly compatible; is there anything to consider here?
6. I plan to use pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc'). Any concerns regarding complevel or complib?
I have started writing some code; once I have something to show I will edit it in, if desired. Please let me know if you need any further information.

EDIT: Here is a first version of my storage class; please adjust the path at the bottom accordingly. Sorry for the length of the code; comments are welcome.

import pandas as pd
import numpy as np
import string

class LargeDFStorage():

    # TODO add index features to ensure correct indexes
    # index_names = ('date', 'id')

    def __init__(self, h5_path, groups_map):
        """

        Parameters
        ----------
        h5_path: str
            hdf5 storage path
        groups_map: dict
            where keys are group_names and values are dict, with at least key
            'columns' where the value is list of column names.
            A special group_name/key "query" is reserved; its table can be
            used for querying and conditioning when getting data,
            see :meth:`.get`.
        """

        self.path = str(h5_path)
        self.groups_map = groups_map
        self.column_map = self._get_column_map()
        # if desired make part of arguments
        self.complib = 'blosc'
        self.complevel = 9

    def _get_column_map(self):
        """ Calc the inverse of the groups_map/ensures uniqueness of cols

        Returns
        -------
        dict: with cols as keys and group_names as values
        """
        column_map = dict()
        for g, value in self.groups_map.items():
            if len(set(column_map.keys()) & set(value['columns'])) > 0:
                raise ValueError('Columns have to be unique')
            for col in value['columns']:
                column_map[col] = g

        return column_map

    @staticmethod
    def group_col_names(store, group_name):
        """ Returns all column names of specific group

        Parameters
        ----------
        store: pd.HDFStore
        group_name: str

        Returns
        -------
        list:
            of all column names in the group
        """
        if group_name not in store:
            return []

        # hack to get the column names; is there a more straightforward way?
        return store.select(group_name, start=0, stop=0).columns.tolist()

    @staticmethod
    def stored_cols(store):
        """ Collects all columns stored in HDF5 store

        Parameters
        ----------
        store: pd.HDFStore

        Returns
        -------
        list:
            a list of all columns currently in the store
        """
        stored_cols = list()
        for x in store.items():
            group_name = x[0][1:]
            stored_cols += LargeDFStorage.group_col_names(store, group_name)

        return stored_cols

    def _find_groups(self, columns):
        """ Searches all groups required for covering columns

        Parameters
        ----------
        columns: list
            list of valid columns

        Returns
        -------
        list:
            of unique groups
        """
        groups = list()
        for column in columns:
            groups.append(self.column_map[column])

        return list(set(groups))

    def add_columns(self, df):
        """ Adds columns to storage for the first time. If columns should
        be updated use(use :meth:`.update` instead)

        Parameters
        ----------
        df: pandas.DataFrame
            with new columns (not yet stored in any of the tables)

        Returns
        -------

        """
        store = pd.HDFStore(self.path, mode='a', complevel=self.complevel,
                            complib=self.complib)

        # check if any column has been stored already
        if df.columns.isin(self.stored_cols(store)).any():
            store.close()
            raise ValueError('Some cols are already in the store')

        # find all groups needed to store the data
        groups = self._find_groups(df.columns)

        for group in groups:
            v = self.groups_map[group]

            # select columns of current group in df
            select_cols = df.columns[df.columns.isin(v['columns'])].tolist()
            tmp = df.reindex(columns=select_cols, copy=False)

            # enable data_columns only for the query group, whose columns
            # are meant to be filtered on via where=
            dc = None
            if group == 'query':
                dc = True

            stored_cols = self.group_col_names(store,group)
            # no columns in group (group does not exist yet)
            if len(stored_cols)==0:
                store.append(group, tmp, data_columns=dc)
            else:
                # load current disk data to memory
                df_grp = store.get(group)
                # remove data from disk
                store.remove(group)
                # add new column(s) to df_disk
                df_grp = df_grp.merge(tmp, left_index=True, right_index=True,
                                      how='left')
                # save old data with new, additional columns
                store.append(group, df_grp, data_columns=dc)

        store.close()

    def _query_table(self, store, columns, where):
        """ Selects data from table 'query' and uses where expression

        Parameters
        ----------
        store: pd.HDFStore
        columns: list
            desired data columns
        where: str
            a valid select expression

        Returns
        -------
        tuple:
            (df_query, add_query), where df_query holds the selected rows and
            add_query indicates whether df_query's columns are part of the
            requested output
        """

        query_cols = self.group_col_names(store, 'query')
        if len(query_cols) == 0:
            store.close()
            raise ValueError('No columns in query table to filter on')
        get_cols = list(set(query_cols) & set(columns))
        if len(get_cols) == 0:
            # load only one column to minimize memory usage
            df_query = store.select('query', columns=[query_cols[0]],
                                    where=where)
            add_query = False
        else:
            # load columns which are anyways needed already
            df_query = store.select('query', columns=get_cols, where=where)
            add_query = True

        return df_query, add_query

    def get(self, columns, where=None):
        """ Retrieve data from storage

        Parameters
        ----------
        columns: list/str
            list of columns to use, or use 'all' if all columns should be
            retrieved
        where: str
            a valid select statement

        Returns
        -------
        pandas.DataFrame
            with all requested columns and considering where
        """
        store = pd.HDFStore(str(self.path), mode='r')

        # get all columns stored in the HDFStore
        stored_cols = self.stored_cols(store)

        if columns == 'all':
            columns = stored_cols

        # check if all desired columns can be found in storage
        if len(set(columns) - set(stored_cols)) > 0:
            store.close()
            raise ValueError('Column(s): {}. not in storage'.format(
                set(columns)- set(stored_cols)))

        # get all relevant groups (where columns are taken from)
        groups = self._find_groups(columns)

        # if a where query is defined, retrieve data from storage; possibly
        # only the index of df_query will be used
        if where is not None:
            df_query, add_df_query = self._query_table(store, columns, where)
        else:
            df_query, add_df_query = None, False

        # df collector (one frame per group, concatenated at the end)
        df = list()
        for group in groups:
            # skip the query group: its requested columns were already
            # fetched in _query_table via where
            if where is not None and group=='query':
                continue
            # all columns which are in group but also requested
            get_cols = list(
                set(self.group_col_names(store, group)) & set(columns))

            tmp_df = store.select(group, columns=get_cols)
            if df_query is None:
                df.append(tmp_df)
            else:
                # align query index with df index from storage
                df_query, tmp_df = df_query.align(tmp_df, join='left', axis=0)
                df.append(tmp_df)

        store.close()

        # if any data of query should be added
        if add_df_query:
            df.append(df_query)

        # combine all columns
        df = pd.concat(df, axis=1)

        return df

    def update(self, df):
        """ Updates data in storage, all columns have to be stored already in
        order to be accepted for updating (use :meth:`.add_columns` instead)

        Parameters
        ----------
        df: pd.DataFrame
            with index as in storage, and column as desired


        Returns
        -------

        """
        store = pd.HDFStore(self.path, mode='a', complevel=self.complevel,
                            complib=self.complib)

        # check if all columns have been stored already
        if not df.columns.isin(self.stored_cols(store)).all():
            store.close()
            raise ValueError('Some cols have not been stored yet')

        # find all groups needed to store the data
        groups = self._find_groups(df.columns)
        for group in groups:
            dc = None
            if group=='query':
                dc = True
            # load current disk data to memory
            group_df = store.get(group)
            # remove data from disk
            store.remove(group)
            # update with new data
            group_df.update(df)
            # save updated df back to disk
            store.append(group, group_df, data_columns=dc)

        store.close()


class DataGenerator():
    np.random.seed(1282)

    @staticmethod
    def get_df(rows=100, cols=10, freq='M'):
        """ Simulate data frame
        """
        if cols < 26:
            col_name = list(string.ascii_lowercase[:cols])
        else:
            col_name = range(cols)
        if rows > 2000:
            freq = 'Min'
        index = pd.date_range('19870825', periods=rows, freq=freq)
        df = pd.DataFrame(np.random.standard_normal((rows, cols)),
                          columns=col_name, index=index)
        df.index.name = 'date'
        df.columns.name = 'ID'
        return df

    @staticmethod
    def get_panel(rows=1000, cols=500, items=10):
        """ simulate panel data
        """

        if items < 26:
            item_names = list(string.ascii_lowercase[:items])
        else:
            item_names = range(items)
        panel_ = dict()

        for item in item_names:
            panel_[item] = DataGenerator.get_df(rows=rows, cols=cols)

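        # NOTE: pd.Panel was deprecated and removed in pandas >= 1.0; this
        # example targets the older pandas API available when it was written.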
        return pd.Panel(panel_)


def main():
    # Example of with DataFrame
    path = 'D:\\fc_storage.h5'
    groups_map = dict(
        a=dict(columns=['a', 'b', 'c', 'd', 'k']),
        query=dict(columns=['e', 'f', 'g', 'rank_a']),
    )
    storage = LargeDFStorage(path, groups_map=groups_map)
    df = DataGenerator.get_df(rows=200000, cols=15)
    storage.add_columns(df[['a', 'b', 'c', 'e', 'f']])
    storage.update(df[['a']]*3)
    storage.add_columns(df[['d', 'g']])

    print(storage.get(columns=['a','b', 'f'], where='f<0 & e<0'))

    # Example with panel and rank condition
    path2 = 'D:\\panel_storage.h5'
    storage_pnl = LargeDFStorage(path2, groups_map=groups_map)
    panel = DataGenerator.get_panel(rows=800, cols=2000, items=24)
    df = panel.to_frame()
    df['rank_a'] = df[['a']].groupby(level='date').rank()
    storage_pnl.add_columns(df[['a', 'b', 'c', 'e', 'f']])
    storage_pnl.update(df[['a']]*3)
    storage_pnl.add_columns(df[['d', 'g', 'rank_a']])
    print(storage_pnl.get(columns=['a','b','e', 'f', 'rank_a'],
                          where='f>0 & e>0 & rank_a <100'))


if __name__ == '__main__':
    main()

Best Answer

It is a bit difficult to answer these questions without concrete examples...
When updating an HDFTable, I suppose I have to delete the entire table in order to update a single column?

AFAIK yes, unless you store single columns separately, but then it will happen automatically; you just have to write your df/panel back to the HDF store.
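
A minimal sketch of this rewrite cycle (store path and key name are illustrative):

import pandas as pd

with pd.HDFStore('store.h5', mode='a') as store:
    grp = store.get('group_1')        # load the whole group into memory
    grp['var1'] = grp['var1'] * 2     # modify a single column
    store.remove('group_1')           # drop the old table on disk
    store.append('group_1', grp)      # write the updated table back
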
When to use 'data_columns', or should I simply assign True in HDFStore.append()?
data_columns=True will index all columns, which is a waste of resources if you are not going to use all of them in the where parameter.
I would specify in data_columns only those columns that will often be used for searching in where= clauses; consider those columns as the indexed columns of a database table.
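
For example (a sketch with illustrative names), the query group would index only the columns actually used for filtering; the (date, id) index itself is always queryable:

with pd.HDFStore('store.h5', mode='a') as store:
    # only rank_var1 becomes a searchable data column
    store.append('query', df_query, data_columns=['rank_var1'])
    res = store.select('query', where='rank_var1 > 500 & rank_var1 < 1000')
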
If I want to query based on the condition rank_var1 > 500 & rank_var1 < 1000, but I need columns from other groups: can I feed the index received from the rank_var1 condition into a query to get other columns based on this index (the index is a MultiIndex with date and id)?
I think we would need some reproducible sample data and examples of your queries in order to give a reasonable answer...
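
That said, one approach worth testing (a sketch, assuming all group tables hold exactly the same rows in the same (date, id) order, as in the design above) is HDFStore's built-in multi-table selection, or reusing the row coordinates returned by the filter:

with pd.HDFStore('store.h5', mode='r') as store:
    # filter on the 'query' table and return matching rows from both tables
    df = store.select_as_multiple(
        ['group_1', 'query'],
        where='rank_var1 > 500 & rank_var1 < 1000',
        selector='query')

    # alternatively: fetch the matching row coordinates once, reuse per group
    coords = store.select_as_coordinates(
        'query', where='rank_var1 > 500 & rank_var1 < 1000')
    df_grp = store.select('group_1', where=coords)
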
Copies of the storage might be used on Windows/Unix systems. I assume it is perfectly compatible; is there anything to consider here?
Yes, it should be fully compatible.
I plan to use pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc'). Any concerns regarding complevel or complib?
Test it with your data; the results might depend on the dtypes, the number of unique values, etc. You may also want to consider the lzo complib, as it might be faster in some use cases. Check this. Sometimes a high complevel does not give you a better compression ratio but will be slower (see the results of my old comparison).
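
A quick way to test this on your own data might look like the following sketch (file name and settings are illustrative; 'lzo' requires the python-lzo bindings; df stands for a representative sample of your data):

import os
import time
import pandas as pd

def bench(df, complib, complevel, path='bench.h5'):
    """Write df once, return write time and file size."""
    t0 = time.time()
    df.to_hdf(path, key='df', mode='w', format='table',
              complib=complib, complevel=complevel)
    return time.time() - t0, os.path.getsize(path)

for complib in ('zlib', 'blosc', 'lzo'):
    for complevel in (1, 5, 9):
        secs, size = bench(df, complib, complevel)
        print('{} level {}: {:.2f}s, {} bytes'.format(
            complib, complevel, secs, size))
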

Regarding "python - Pandas + HDF5 panel data storage for large data", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40039776/
