dask - 使用 dask-distributed 如何从队列提供的长时间运行的任务生成 future

标签 dask dask-distributed

我正在按照此示例 http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow 的方式使用磁盘分布式长时间运行任务其中,长时间运行的工作任务从队列中获取输入(如 tensorflow 示例中所示),并将其结果传递到输出队列。 (我没有看到最新版本的 dask 示例中使用的 channel )。

我可以看到如何分散列表并应用映射来生成将输入数据推送到工作人员输入队列中的 future 列表。

def transfer_dask_to_worker(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

data = [1,2,3,4] 

future_data = e.scatter(data)

tasks = e.map(transfer_dask_to_worker, future_data ,
     workers=dask_spec['worker'], pure=False)

现在,如果我们等待工作线程消耗任务,所有结果都将位于工作线程的输出队列中。我们可以使用

将其全部拉回来
def transfer_worker_to_dask(arg):
    worker = get_worker()
    return worker.output_queue.get()

results = e.map(transfer_worker_to_dask,range(len(tasks)))

只要我们通过等待所有工作任务完成然后再回调它们来手动处理排序,就可以正常工作。

我们如何将输出 future 链接到输入的下游?有没有办法让长时间运行的任务为工作人员创建可以收集回调度程序任务的 future ?

我尝试让transfer_dask_to_worker(batch)也查询输出队列并返回结果:

def transfer_dask_to_worker_and_return(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)
    return worker.output_queue.get()

这适用于短列表,但开始失败,取消了大约 1000 项的 future 。

提前致谢。

最佳答案

注意:该博文是实验性的。这里有几种方法,我不会局限于那种模式

让我们从这个具体问题开始:

我们如何将输出 future 链接到输入的下游?有没有办法让长时间运行的任务在工作人员上创建可以收集回调度程序任务的 future ?

这里最简单的解决方案可能是分散本地数据,然后将其放入 Dask distributed Queue 。因此,如果您的 TensorFlow 代码在生成某些结果时调用一个函数,那么该函数可能会将本地数据分散到 future (这实际上并不移动数据,它只是让 Dask 工作线程开始跟踪它),然后将其放入 future 进入分布式队列。将 future 放入队列中可以让 Dask 中的其他客户端和工作人员知道数据的存在,并在必要时将其拉下来

from dask.distributed import Queue
results_q = Queue()

def tf_result_ready(result):
    future = get_worker().scatter(result)
    results_q.put(future)

然后,您可以坐在客户端代码中,并在结果可用时从该队列中提取结果:

for _ in range(n_blocks):
    future = results_q.get()
    # do stuff with future like submit or gather

关于dask - 使用 dask-distributed 如何从队列提供的长时间运行的任务生成 future,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47826049/

相关文章:

python - 创建列的内存高效方法,该列指示来自一组列的值的唯一组合

python - Dask worker 的内存清理

python - 创建和合并多个数据集不适合内存,使用 Dask?

python dask dataframe将元组列拆分为两列

python - Dask:如果失败则继续执行其他任务

python - dask.distributed LocalCluster 与线程与进程之间的区别

python - 我如何在 Dask 分布式工作人员之间共享一个大型只读对象

python - 使用 Groupby 将 value_counts 存储在 Dask Dataframe 的新列中

python - 启动时自动将数据集添加到 Dask 调度程序

docker - Dask不会清理Docker容器中的上下文