python - 如何洗牌 Dask 包中的元素

标签 python dask

我有一个数据集,其中一些元素彼此靠近并且通常最终位于同一分区中,导致比其他元素更多的计算,因为它们具有二次复杂性。我想随机重新排列它们,以便工作负载最终或多或少均匀地分布在各个分区中,并且我避免必须在单个分区中进行所有计算。

现在我正在使用如下代码下载协调器中的所有内容:

import dask.bag as db
import random

bag = ...
l = bag.compute()
random.shuffle(l)
bag = db.from_sequence(l)

有没有办法以更分布式的方式做到这一点?例如,我尝试过根据随机键重新分区,但我最终发现大多数分区都是空的。

最佳答案

一种解决方案是对袋子的每个分区进行洗牌。问题是您只能为每个分区独立洗牌。

import random

import dask.bag as db
import matplotlib.pyplot as plt

# we can't directly use shuffle because it does inplace
def shuffle(x):
    """shuffle and return x"""
    random.shuffle(x)
    return x

bag = db.from_sequence(list(range(2000)), npartitions=4)
# we apply the shuffle to each partition
result = bag.map_partitions(shuffle).compute()
print(result[:10], result[-10:])
# [233, 204, 181, 18, 50, 114, 424, 6, 195, 348] [1910, 1623, 1730, 1552, 1754, 1899, 1659, 1946, 1834, 1551]

plt.scatter(result, range(len(result)))

enter image description here

如您所见,它只是在每个分区上出现问题。但是由于 bag 使用多处理并且我们不在分区之间共享内存,所以这应该非常快。

另一种方法,如果您可以使用 dask.array 而不是 dask.bag,则使用 shuffle_slice。这确实给出了更统一的结果。

import random

import numpy as np
import dask.array as da
from dask.array.slicing import shuffle_slice
import matplotlib.pyplot as plt

array = da.from_array(np.arange(2000))
shuffle_index = np.arange(2000)
np.random.shuffle(shuffle_index)

array = shuffle_slice(array, shuffle_index)
result = array.compute()
print(result[:10], result[-10:])

plt.scatter(result, range(len(result)))

enter image description here

如果您可以使用 dask.dataframe,使用 random_split 可能会更容易.

关于python - 如何洗牌 Dask 包中的元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67489774/

相关文章:

python - 在 python 中过滤/迭代非常大的列表

python - pyyaml 加载数为十进制

c++ - python 执行现有的 (&big) c++ 代码

python - 如何跨多个调用将 dask-DAG 持久保存在分布式集群上并保留中间结果?

python - 如何超时提交给 Dask 的作业?

python - 将多个 Parquet 文件中的数据检索到一个数据帧中 (Python)

python - 从具有特定模式的 txt 文件创建 Pandas DataFrame

python - 在字符串列表 (Python) 中查找特定模式(正则表达式)

dask - 使用 dask.delayed 和 pandas.DataFrame 将 dask.bag 字典转换为 dask.dataframe

arrays - 多个图像意味着 dask.delayed 与 dask.array