我正在尝试使用 Dask,并希望将查找 pandas.DataFrame
发送到所有工作节点。不幸的是,它失败了:
TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')
当使用 lookup.result()['foo'].iloc[2]
代替 lookup['baz'].iloc[2]
时,它可以工作很好,但是:对于输入数据帧的较大实例,它似乎一次又一次地卡在 from_pandas
处。另外,似乎很奇怪的是, future 需要手动阻塞(对于应用操作中的每一行一遍又一遍。有没有办法为每个工作节点仅阻塞一次?一个天真的改进可能是使用 map_partitions
,但这只有在分区数量相当小时才可行。
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})
df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)
def foo(row, lookup):
# TODO some computation which relies on the lookup
return lookup['foo'].iloc[2]
df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()
事实上,对于较大的问题实例,这种简单的 dask 实现似乎比普通的 pandas 慢。我怀疑执行速度慢与上面提出的问题有关。
最佳答案
而不是这个:
df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
试试这个:
df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))
以前,您将 future 隐藏在 lambda 函数内。 Dask 无法找到它并将其转换为正确的值。相反,当我们将 future 作为正确的论点传递时,Dask 能够识别它的本质并为您提供正确的值。
关于python - Dask 广播在计算图中不可用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56084548/