在 SO 中已经有一个很好的问题了但最好的答案现在已经有 5 年了,所以我认为 2018 年应该会有更好的选择。
我目前正在寻找大于内存数据集的特征工程管道(使用合适的数据类型)。
初始文件是一个 csv,内存放不下。这是我的需求:
- 创建特征(主要是对多列使用groupby操作。)
- 将新特征合并到以前的数据(在磁盘上,因为它不适合内存)
- 为某些 ML 应用程序使用子集(或所有)列/索引
- 重复 1/2/3(这是一个迭代过程,如第 1 天:创建 4 功能,第 2 天:再创建 4 个 ...)
尝试使用 parquet 和 dask:
首先,我将大的 csv 文件拆分为多个小的“parquet”文件。有了这个,dask 对于新特征的计算非常有效,但是我需要将它们合并到初始数据集和 atm,我们不能将新列添加到 parquet 文件。逐 block 读取 csv,合并并重新保存到多个 parquet 文件太耗时,因为特征工程在这个项目中是一个迭代过程。
尝试使用 HDF 和 dask:
然后我转向了 HDF,因为我们可以添加列并使用特殊查询,而且它仍然是二进制文件存储。我再次将大 csv 文件拆分为多个 HDF,使用相同的 key='base' 作为基本功能,以便使用 DASK 的并发写入(HDF 不允许)。
data = data.repartition(npartitions=10) # otherwise it was saving 8Mo files using to_hdf
data.to_hdf('./hdf/data-*.hdf', key='base', format='table', data_columns=['day'], get=dask.threaded.get)
(附件问题:指定 data_columns 似乎对 dask 没有用,因为 dask.read_hdf 中没有“where”?)
与我的预期不同,我无法使用如下代码将新功能合并到多个小文件中:
data = dd.read_hdf('./hdf/data-*.hdf', key='base')
data['day_pow2'] = data['day']**2
data['day_pow2'].to_hdf('./hdf/data-*.hdf', key='added', get=dask.threaded.get)
使用 dask.threaded,我在 2% 后得到“python 停止工作”。 使用 dask.multiprocessing.get 它需要永远并创建新文件
最适合此工作流程的工具(存储和处理)是什么?
最佳答案
我只会复制一份来自 related issue 的评论在 fastparquet 上:在技术上可以向现有的 parquet 数据集添加列,但这在 fastparquet 中没有实现,并且可能也没有在任何其他 parquet 实现中实现。
编写代码来执行此操作可能不会太繁重(但目前未计划):调用 write columns顺序发生,因此用于写入的新列需要向下渗透到此函数,以及与页脚中元数据的当前第一个字节相对应的文件位置。此外,架构需要单独更新(这很简单)。需要对数据集的每个文件重复该过程。这不是问题的“答案”,但也许有人愿意承担这项任务。
关于使用 dask hdf/parquet 的 Python 大数据集特征工程工作流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49554270/