python - Dask 广播在计算图中不可用

标签 python pandas dask dask-distributed

我正在尝试使用 Dask,并希望将查找 pandas.DataFrame 发送到所有工作节点。不幸的是,它失败了:

TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')

当使用 lookup.result()['foo'].iloc[2] 代替 lookup['baz'].iloc[2] 时,它可以工作很好,但是:对于输入数据帧的较大实例,它似乎一次又一次地卡在 from_pandas 处。另外,似乎很奇怪的是, future 需要手动阻塞(对于应用操作中的每一行一遍又一遍。有没有办法为每个工作节点仅阻塞一次?一个天真的改进可能是使用 map_partitions,但这只有在分区数量相当小时才可行。

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)


def foo(row, lookup):
    # TODO some computation which relies on the lookup
    return lookup['foo'].iloc[2]

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()

事实上,对于较大的问题实例,这种简单的 dask 实现似乎比普通的 pandas 慢。我怀疑执行速度慢与上面提出的问题有关。

最佳答案

而不是这个:

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))

试试这个:

df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))

以前,您将 future 隐藏在 lambda 函数内。 Dask 无法找到它并将其转换为正确的值。相反,当我们将 future 作为正确的论点传递时,Dask 能够识别它的本质并为您提供正确的值。

关于python - Dask 广播在计算图中不可用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56084548/

相关文章:

python - 如何使用 Python 自动终止使用过多内存的进程?

python - 使用python脚本远程访问Django中的sqlite3

python - Groupby 多列和聚合与 dask

dask - 是否可以估计 Dask 操作的执行时间

python - 批量保存数据到图像: need speedup

python - 摄氏度到华氏度的方法不起作用

python - Pandas For Loop 错误 - 嵌入了和/if 语句

python - 仅取消堆叠或旋转某些列

python - 从 Pandas 的每一行创建一个新列

python - 当我将 isin 与 Dask 数据帧一起使用时,会引发 NotImplementedError