python - Creating a Dask delayed with a Lock. Error: '_thread._local' object has no attribute 'execution_state'

Tags: python dask

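I want to create a Dask array made up of multiple blocks, where each block comes from a function that reads a file. To avoid reading several files from the hard disk at the same time, I followed an existing answer and used a lock.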

However, creating the delayed objects raises the following error:

AttributeError: '_thread._local' object has no attribute 'execution_state'

Test script:

import numpy as np
import dask
import distributed

def make_test_data():
    n = 2
    m = 3
    x = np.arange(n * m, dtype=int).reshape(n, m)
    np.save('0.npy', x)
    np.save('1.npy', x)
    shape = (n, m)
    return shape

@dask.delayed
def load_numpy(lock, fn):
    lock.acquire()
    out = np.load(fn)
    lock.release()
    return out

def make_delayed():
    # np.load is a function that reads a file
    # and returns a numpy array.
    read_lock = distributed.Lock('numpy-read')
    return [load_numpy(read_lock, '%d.npy' % i) for i in range(2)]

def main():
    shape = make_test_data()
    ds = make_delayed()

main()

Full error message:

Traceback (most recent call last):
  File "<...>/site-packages/distributed/worker.py", line 2536, in get_worker
    return thread_state.execution_state['worker']
AttributeError: '_thread._local' object has no attribute 'execution_state'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test_lock.py", line 32, in <module>
    main()
  File "test_lock.py", line 30, in main
    ds = make_delayed()
  File "test_lock.py", line 25, in make_delayed
    read_lock = distributed.Lock('numpy-read')
  File "<...>/site-packages/distributed/lock.py", line 92, in __init__
    self.client = client or _get_global_client() or get_worker().client
  File "<...>/site-packages/distributed/worker.py", line 2542, in get_worker
    raise ValueError("No workers found")
ValueError: No workers found

Best answer

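Try creating the lock inside the delayed function, so that it is instantiated on a worker when the task runs. As the traceback shows, the Lock constructor looks for a global client or a worker, and neither exists while the graph is being built: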

@dask.delayed
def load_numpy(fn):
    lock = distributed.Lock('numpy-read')
    # Release the lock even if np.load raises.
    with lock:
        out = np.load(fn)
    return out

def make_delayed():
    # np.load is a function that reads a file
    # and returns a numpy array.
    # No Lock is created here; each task creates its own inside load_numpy.
    return [load_numpy('%d.npy' % i) for i in range(2)]
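
To tie this back to the original goal of building a Dask array from file-backed blocks, here is a minimal end-to-end sketch. It assumes a distributed Client is running when the graph is computed, since the lock is coordinated through the scheduler; an in-process cluster started with Client(processes=False, dashboard_address=None) stands in for a real one. The helper names build_array and run_demo are hypothetical and do not appear in the question.

```python
import os
import tempfile

import numpy as np
import dask
import dask.array as da
from distributed import Client, Lock


@dask.delayed
def load_numpy(fn):
    # The lock is created when the task runs on a worker, so it can reach
    # the scheduler through that worker; nothing lock-related happens
    # while the graph is being built.
    with Lock('numpy-read'):
        return np.load(fn)


def build_array(filenames, shape, dtype):
    # Hypothetical helper: one delayed block per file, stacked along a
    # new leading axis.
    blocks = [da.from_delayed(load_numpy(fn), shape=shape, dtype=dtype)
              for fn in filenames]
    return da.stack(blocks)


def run_demo():
    # Assumption: a small in-process cluster (threads only, no dashboard)
    # is enough to illustrate the pattern.
    with tempfile.TemporaryDirectory() as tmp, \
            Client(processes=False, dashboard_address=None):
        x = np.arange(6, dtype=int).reshape(2, 3)
        filenames = [os.path.join(tmp, '%d.npy' % i) for i in range(2)]
        for fn in filenames:
            np.save(fn, x)
        arr = build_array(filenames, x.shape, x.dtype)
        # With a Client active, compute() is sent to the distributed scheduler.
        return arr.compute()


if __name__ == '__main__':
    print(run_demo().shape)
```

Because every task asks for a lock with the same name, 'numpy-read', the reads should be serialized across workers, while any computation downstream of the loaded blocks can still run in parallel.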

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/51711953/
