使用 dask hdf/parquet 的 Python 大数据集特征工程工作流

标签 python pandas dask hdf feature-engineering

SO 中已经有一个很好的问题了但最好的答案现在已经有 5 年了,所以我认为 2018 年应该会有更好的选择。

我目前正在寻找大于内存数据集的特征工程管道(使用合适的数据类型)。

初始文件是一个 csv,内存放不下。这是我的需求:

  1. 创建特征(主要是对多列使用groupby操作。)
  2. 将新特征合并到以前的数据(在磁盘上,因为它不适合内存)
  3. 为某些 ML 应用程序使用子集(或所有)列/索引
  4. 重复 1/2/3(这是一个迭代过程,如第 1 天:创建 4 功能,第 2 天:再创建 4 个 ...)

尝试使用 parquet 和 dask:

首先,我将大的 csv 文件拆分为多个小的“parquet”文件。有了这个,dask 对于​​新特征的计算非常有效,但是我需要将它们合并到初始数据集和 atm,我们不能将新列添加到 parquet 文件。逐 block 读取 csv,合并并重新保存到多个 parquet 文件太耗时,因为特征工程在这个项目中是一个迭代过程。

尝试使用 HDF 和 dask:

然后我转向了 HDF,因为我们可以添加列并使用特殊查询,而且它仍然是二进制文件存储。我再次将大 csv 文件拆分为多个 HDF,使用相同的 key='base' 作为基本功能,以便使用 DASK 的并发写入(HDF 不允许)。

data = data.repartition(npartitions=10) # otherwise it was saving 8Mo files using to_hdf
data.to_hdf('./hdf/data-*.hdf', key='base', format='table', data_columns=['day'], get=dask.threaded.get)

(附件问题:指定 data_columns 似乎对 dask 没有用,因为 dask.read_hdf 中没有“where”?)

与我的预期不同,我无法使用如下代码将新功能合并到多个小文件中:

data = dd.read_hdf('./hdf/data-*.hdf', key='base')
data['day_pow2'] = data['day']**2
data['day_pow2'].to_hdf('./hdf/data-*.hdf', key='added', get=dask.threaded.get) 

使用 dask.threaded,我在 2% 后得到“python 停止工作”。 使用 dask.multiprocessing.get 它需要永远并创建新文件

最适合此工作流程的工具(存储和处理)是什么?

最佳答案

我只会复制一份来自 related issue 的评论在 fastparquet 上:在技术上可以向现有的 parquet 数据集添加列,但这在 fastparquet 中没有实现,并且可能也没有在任何其他 parquet 实现中实现。

编写代码来执行此操作可能不会太繁重(但目前未计划):调用 write columns顺序发生,因此用于写入的新列需要向下渗透到此函数,以及与页脚中元数据的当前第一个字节相对应的文件位置。此外,架构需要单独更新(这很简单)。需要对数据集的每个文件重复该过程。这不是问题的“答案”,但也许有人愿意承担这项任务。

关于使用 dask hdf/parquet 的 Python 大数据集特征工程工作流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49554270/

相关文章:

python - 分布式Dask如何高效提交大参数任务?

python - 无法在 PyDev 中创建 Django 项目

python - 更改马赛克图的默认颜色

python - Telebot + Celery + pytransitions : response to task

python - 使用列表匹配包含整个单词的正则表达式

python - 在多级 DataFrame 上使用 pandas apply 函数

python - Dask 数据帧如何处理大于内存的数据集?

python - 在 Dask apply 中返回结构化行

python - 删除矩阵 subview 中的第一个元素

Python 2 无法获取键和值(字典和元组)