distributed-computing - 使用 Dask 从文件系统/S3 并行读取文件 block ?

标签 distributed-computing dask

我正在整理一个概念证明,我想在其中使用 PyCuda 在分布式环境(具体来说是 AWS)中处理大型字符数据文件(每个任务一个文件约 8GB)。我知道 HDFS 会将数据文件分段并将其分发给工作人员,但我正在努力使我的环境尽可能简单,如果没有必要,我宁愿不必安装 Hadoop。

我最近看了几个来自 Continuum Analytics 的关于他们的 Dask 框架的网络研讨会,看起来它完全可以满足我的需要。鉴于以上段落和 Dask 框架,当前对文件系统的建议是什么?我是坚持使用 HDFS 还是有更好/更简单的解决方案?

最佳答案

大多数文件系统都提供只读取文件的一部分的能力,包括 HDFS(您的本地文件系统)和 S3(AWS 实例的标准批量数据存储)。这允许并行计算框架(如 Dask)将大文件分成许多较小的部分,供工作人员并行处理。

dask.bytes.read_bytes

对于大多数用例,这会在幕后自动发生(read_textread_csv 的用户不必为此担心。)听起来您有一个自定义文件格式,所以我将引导您使用 read_bytes 函数。对于 S3,其工作方式如下:

from dask.bytes import read_bytes
sample, partitions = read_bytes('s3://bucket/keys.*.foo', 
                                blocksize=100000000)

样本将是一个 10kB 的简短数据样本,partitions 将是 dask.delayed 的列表。可与一般 for 循环一起使用以构建计算的对象。

如果您的数据有某种您希望 dask 遵守的分隔符,您可以使用 delimiter= 关键字参数提供它。

同样的功能也适用于其他系统,例如您的本地文件系统或 HDFS(如果您已经安装并导入了 hdfs3distributed)。

sample, partitions = read_bytes('local://bucket/keys.*.foo', blocksize=100000000)
sample, partitions = read_bytes('hdfs://bucket/keys.*.foo')

例子

例如,这是我们如何实现 dask.dataframe.read_csv

的一个不正确但说明性的版本
from dask import delayed
import pandas as pd
import dask.dataframe as dd

def read_csv(path, **kwargs):
    sample, partitions = read_bytes(path, blocksize=100000000, delimiter=b'\n')
    dataframes = [delayed(pd.read_csv)(part, **kwargs) for part in partitions]
    return dd.from_delayed(dataframes)

这是不正确的,因为 pd.read_csv 实际上需要一个 BytesIO 对象,我们没有稳健地处理关键字参数,并且我们没有很好地管理示例中的数据帧元数据(列、数据类型等) ..)这些细节妨碍了一般观点,并且可能超出了这个问题的兴趣。

编辑:在更常见的情况下使用其他功能

人们一直将此问题作为对“如何从 S3 读取数据?”这一更普遍问题的回答。大多数人不使用read_bytes 接口(interface),它有点低级。相反,大多数用户可能希望使用如下所示的高级功能之一:

import dask.bag as db
records = db.read_text('s3://bucket/keys.*.json').map(json.loads)

import dask.dataframe as dd
df = dd.read_csv('s3://bucket/keys.*.csv')

关于distributed-computing - 使用 Dask 从文件系统/S3 并行读取文件 block ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37245163/

相关文章:

c# - 恶霸算法

algorithm - 协调员首先做什么?

distributed-computing - 为什么这个输出是错误的? - 顺序一致性

java - 将两个 SOAP 服务合二为一的首选设计 "transaction"

synchronization - COMPAS 中的不同步轨迹

python - 在 dask 生成的进程中调用 dask

python - Python 3.5 支持的 dask 版本是什么?

python - 使用 AutoSklearn 时得到 "UserWarning: Port 8787 is already in use",为什么 AutoSklearn 会使用端口?

python - Dask 数据框中的多个聚合用户定义函数

python - 使用非唯一索引列日期在 Dask 数据框中提取最新值