python - 将自定义文件格式读取到 Dask 数据框

标签 python pandas dataframe hdfs dask

我有一个巨大的自定义文本文件(无法将整个数据加载到一个 Pandas 数据帧中),我想将其读入 Dask 数据帧。我编写了一个生成器来读取和解析块中的数据并创建 Pandas 数据帧。我想将这些 Pandas 数据帧加载到 dask 数据帧中,并对结果数据帧执行操作(例如创建计算列、提取数据帧的一部分、绘图等)。
我尝试使用 Dask bag 但未能成功。
所以我决定将结果数据帧写入 HDFStore,然后使用 Dask 从 HDFStore 文件中读取。当我从我自己的电脑上做这件事时,这很有效。代码如下。

cc = read_custom("demo.xyz", chunks=1000) # Generator of pandas dataframes
from pandas import HDFStore
s = HDFStore("demo.h5")
for c in cc:
    s.append("data", c, format='t', append=True)
s.close()

import dask.dataframe as dd
ddf = dd.read_hdf("demo.h5", "data", chunksize=100000)
seqv = (
    (
        (ddf.sxx - ddf.syy) ** 2
        + (ddf.syy - ddf.szz) ** 2
        + (ddf.szz - ddf.sxx) ** 2
        + 6 * (ddf.sxy ** 2 + ddf.syz ** 2 + ddf.sxz ** 2)
    )
    / 2
) ** 0.5
seqv.compute()

由于最后一次计算速度很慢,我决定将它分布在我 LAN 上的几个系统上,并在我的机器上启动一个调度程序,并在其他系统中启动几个工作程序。并点燃了 Client如下。

from dask.distributed import Client
client = Client('mysystemip:8786') #Establishing connection with the scheduler all fine.

然后读入 Dask 数据帧。但是,当我执行 seqv.compute() 时出现以下错误.

HDF5ExtError: HDF5 error back trace

  File "H5F.c", line 509, in H5Fopen
    unable to open file
  File "H5Fint.c", line 1400, in H5F__open
    unable to open file
  File "H5Fint.c", line 1615, in H5F_open
    unable to lock the file
  File "H5FD.c", line 1640, in H5FD_lock
    driver lock request failed
  File "H5FDsec2.c", line 941, in H5FD_sec2_lock
    unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'

End of HDF5 error back trace

Unable to open/create file 'demo.h5'

我确保所有 worker 都可以访问 demo.h5文件。我尝试传入 lock=Falseread_hdf .得到了同样的错误。

这是不可能的吗?可以尝试另一种文件格式吗?我想将每个 Pandas 数据帧写入单独的文件可能会起作用,但我试图避免它(我什至不想要一个中间的 HDFS 文件)。但在我到达那条路线之前,我想知道是否有其他更好的方法来解决这个问题。

感谢您的任何建议!

最佳答案

如果您想从文本文件中的自定义格式读取数据,我建议使用 dask.bytes.read_bytes函数,它返回一个延迟对象列表,每个对象指向文件中的一个字节块。默认情况下,这些块将由行分隔符干净地分隔。

像这样的事情可能会奏效:

def parse_bytes(b: bytes) -> pandas.DataFrame:
    ...

blocks = dask.bytes.read_bytes("my-file.txt", delimiter=b"\n")
dataframes = [dask.delayed(parse_bytes)(block) for block in blocks]
df = dask.dataframe.from_delayed(dataframes)
  • https://docs.dask.org/en/latest/remote-data-services.html#dask.bytes.read_bytes
  • https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.from_delayed
  • 关于python - 将自定义文件格式读取到 Dask 数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59898355/

    相关文章:

    python - 使用 sklearn 聚类单变量时间序列

    python - Pandas Pivot 和 Merge 不起作用

    python - Pandas:如何使用 json 数组分解数据框

    R将列表列表转换为数据框

    python - 如何使用 numpy 数组访问列表的条目而不使用 for 循环

    Python/Django AttributeError "Object ' 玩家没有属性 'fields'

    pandas - 使用 Cython 查找数组中所有唯一元素的最快方法

    python - 选择 Pandas 中特定值后面包含 NaN 的行

    r - 如何通过组合当前变量名和第 1 行值来修改变量名?

    python - 在 Tensorflow 中使用 TPU 时,是否有适当的解决方法来保存本地驱动器中的检查点?