python - 在 Dask map 功能中设置最大工作人员数

标签 python dask dask-distributed dask-dataframe dask-delayed

我有一个 Dask 进程可以触发 100 个具有映射功能的工作人员:

worker_args = .... # array with 100 elements with worker parameters 

futures = client.map(function_in_worker, worker_args) 
worker_responses = client.gather(futures)

我使用 docker,其中每个 worker 都是一个容器。我已将 docker 配置为生成 20 个 worker /容器,如下所示:

docker-compose up -d --scale worker=20

问题是我的机器崩溃了,因为 map 函数并行触发了 20 个 worker,这使得内存和 CPU 超过了最大值。

我想保留 20 个 worker 的配置,因为我将这些 worker 用于不需要大量内存的其他功能。

如何将 map 函数限制为 5 个 worker 并行?

最佳答案

dask 不会根据空闲的 worker 数量动态调整 worker 资源。在您提供的示例中,一旦启动了20个worker,如果只使用了5个worker,则不会分配剩余15个空闲worker的资源。

如果这是可以接受的(例如,因为空闲资源正在被外部程序使用),那么将工作限制为 5 个 worker 的一种方法是通过 workers kwarg 明确指定它们到 。映射调用:

# instantiate workers
from distributed import Client
c = Client(n_workers=20)

# select at most 5 workers from the available list
selected_workers = list(c.scheduler_info()['workers'])[:5]

dummy_function = lambda x: x**2
futs = c.map(dummy_function, range(10), workers=selected_workers)

控制工作负载分配的另一种方法是使用resources kwarg,请参阅以下相关答案:0 , 1 , 2 , 3 .

关于python - 在 Dask map 功能中设置最大工作人员数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74304427/

相关文章:

python - Dask dataframe - 根据分隔符将列拆分为多行

python - 在dask数据帧上使用groupby

dask - Streamz 与 Dask 分布式

python - 如何编写加密单元测试?

python - DEBUG 为 False 时错误处理程序不触发

python - Dask Dataframe 形状属性给出了错误的形状

python - 计算具有共同依赖性的两个值时 Dask 高内存使用率

azure - 在 Azure 中部署容器集群

python - 返回 Python 中最低有效位的索引

python - 如何对不规则时间戳列表进行重采样/下采样?