除非 Localcluster
中未使用 asynchronous
关键字,否则以下示例不起作用。我想控制使用多少个进程/工作人员并并行处理函数并在准备好时打印结果。需要改变什么?
import time
from dask.distributed import Client, LocalCluster, as_completed
def wait(sec):
time.sleep(sec)
return sec
def main():
cluster = LocalCluster(n_workers=2, ncores=2, asynchronous=True)
inputs = [5, 7, 3, 1]
client = Client(cluster)
futures = client.map(wait, inputs)
for future, result in as_completed(futures, with_results=True):
print(result)
client.close()
if __name__ == '__main__':
main()
最佳答案
按照您的建议,您应该从 LocalCluster 调用中删除 asynchronous=
关键字。该关键字用于支持异步函数,如下所示:
async def main():
cluster = await LocalCluster(n_workers=2, ncores=2, asynchronous=True)
inputs = [5, 7, 3, 1]
client = await Client(cluster, asynchronous=True)
futures = client.map(wait, inputs)
async for future, result in as_completed(futures, with_results=True):
print(result)
await client.close()
如果您不想使用 async-await 语法(这种语法相对较少),那么您应该忽略 asynchronous= 关键字。它可能不会做你想象的那样。
关于python - Dask 异步处理,在结果到达时打印结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50975543/