python - Dask futures with SSHCluster client not parallelizing

Tags: python dask dask-distributed

I'm new to Dask and am trying to put together a simple example using futures, but I can't get it to work. My goal is to use futures to collect the hostnames of all the nodes in my cluster (3 nodes: 2 workers and one scheduler). To do this, I wrote a function that sleeps for 2 seconds and then returns the hostname. However, when I launch the function many times, I only ever get back the hostname of one node instead of two. Here is my code and the output:

#!/usr/bin/python3
from dask.distributed import Client, SSHCluster
from time import sleep
import dask
from dask import delayed
import socket
import time


cluster = SSHCluster(
    ["10.20.3.1", "c002-interconnect-1", "c003-interconnect-1"],
    connect_options={"known_hosts": None},
    worker_options={},
    scheduler_options={"port": 0, "dashboard_address": ":8788"},
)
client = Client(cluster)
client.cluster.scale(2)

def gethost():
    sleep(2)
    return socket.gethostname()



futures = []
workers = client.scheduler_info()['workers']
print("workers details")

start = time.time()

for i in workers.keys():
    l = workers[i]['nthreads']
    l2 = workers[i]['metrics']['cpu']
    print("worker :" + i)
    print('number of cpu: ')
    print(l2)
    print('number of threads:')
    print(l)
    print("##########")
    for j in range(int(l * (l2 + 10))):
        future = client.submit(gethost)
        futures.append(future)

results = [future.result() for future in futures]

end = time.time()
print("results:")
print(results)
print("time: ")
print(end - start)
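One detail worth knowing about the loop above (not stated in the post): `client.submit` defaults to `pure=True`, so every submission of the same zero-argument function hashes to the same task key; Dask runs the task once and hands the cached result to every future, which is exactly the "one hostname repeated" behavior seen below. A minimal illustration, using an in-process local client rather than the SSH cluster:

```python
from dask.distributed import Client
import socket


def gethost():
    return socket.gethostname()


# In-process client: no separate worker processes needed for this illustration.
client = Client(processes=False)

f1 = client.submit(gethost)
f2 = client.submit(gethost)              # same function, same args -> same key
f3 = client.submit(gethost, pure=False)  # pure=False forces a distinct task

same = f1.key == f2.key       # True: f1 and f2 are the same task
distinct = f1.key != f3.key   # True: f3 gets its own key
print(same, distinct)

client.close()
```

Since the futures share a key, submitting 240 of them schedules only one real task, which also explains why the run finishes in about 3 seconds instead of minutes.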

Here is the output I get:


distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - /usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8788 is already in use.
distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
distributed.deploy.ssh - INFO - Hosting the HTTP server on port 39850 instead
distributed.deploy.ssh - INFO - http_address["port"], self.http_server.port
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO -   Scheduler at:     tcp://10.20.3.1:41283
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.20.3.3:46321'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.20.3.2:32907'
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:      tcp://10.20.3.3:35165
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:      tcp://10.20.3.2:33232
workers details
worker :tcp://10.20.3.2:33232
number of cpu:
0.0
number of threads:
12
##########
worker :tcp://10.20.3.3:35165
number of cpu:
0.0
number of threads:
12
##########
results:
['c002', 'c002', 'c002', ..., 'c002']   (truncated: every element is 'c002')
time:
3.2259795665740967

Let me know if you'd like more details. This is also my first Stack Overflow question, so please tell me if I'm doing something wrong!

Best answer

You probably need to import socket on the workers:

def gethost():
    sleep(2)
    import socket
    return socket.gethostname()

Also, the client.submit call should specify which worker runs the function:

for i in workers.keys():
    # code skipped
    future = client.submit(gethost, pure=False, workers=[i])

I can't try it right now, but hopefully this helps.

A similar question about "python - Dask futures with SSHCluster client not parallelizing" can be found on Stack Overflow: https://stackoverflow.com/questions/68102238/
