python - 如何使用dask有效地计算许多简单统计数据

标签 python pandas distributed dask dask-distributed

问题
我想用dask计算一堆“容易收集”的统计数据。
速度是我最关心的问题,也是我的目标,所以我希望对这个问题有一个广泛的研究。
理想情况下,我希望在不到一小时内完成所描述的问题。
我希望雇佣100-1000名工人。
目前在基准测试中,我正在大型机器上运行这个程序(160核,4tb ram),但计划很快转移到kubernetes。
安装程序
我在数据框中有一些数据(pandas、dask、csv、parquet等)
我还有很多数据子集(带有任意列过滤器),我想为它们计算统计信息。
数据帧大小:介于5 GB和5 TB之间的数据(1亿行,1000列)。预计未来为50-100 TB。
统计数据大小:大约5000个唯一的过滤器,每个唯一的过滤器有1到500个统计数据。(5K-5M统计)
玩具示例如下:

requested_statistics = [
    {'filters': [{'col': 'col_1', 'op': 'lt', 'value': 0.8},
                 {'col': 'col_38', 'op': 'lt', 'value': 0.4},
                 {'col': 'col_224', 'op': 'gt', 'value': 0.12333}],
     'output': {'col': 'col_3', 'op': 'sum'},
     'name': 'stat_1'},
     {'filters': [{'col': 'col_551', 'op': 'lt', 'value': 0.8},
                  {'col': 'col_112', 'op': 'gt', 'value': '2018-01-13'},
                  {'col': 'col_1', 'op': 'lt', 'value': 0.8}],
      'output': {'col': 'col_2', 'op': 'avg'},
      'name': 'stat_2'}
]

我可以编写一个简单的解析器,运行在dask或pandas上:
def filter_index(df, filter):
    filter_ops = {'lt': lambda x, y: x < y, 'gt': lambda x, y: x > y, 'eq': lambda x, y: x == y}
    return filter_ops[filter['op']](df[filter['col']], filter['value'])

def get_indexer(df, filters):
    if len(filters) == 1:
        return filter_index(df, filters[0])
    return np.logical_and(filter_index(df, filters[0]), get_indexer(df, filters[1:]))

def get_statistic(df, statistic):
    indexer = get_indexer(df, statistic['filters'])
    agg_ops = {'sum': np.sum, 'avg': np.mean, 'unique_count': lambda x: x.unique().size}
    return agg_ops[statistic['output']['op']](df[statistic['output']['col']][indexer])

all_stats = {x['name']: get_statistic(df, x) for x in requested_statistics}

我试过一些优化。
1)只需依赖dask:future_stats = client.compute(all_stats)
这不起作用,因为优化图(或只是序列化到调度程序)的计算时间太长。
在小规模测试中,这种方法工作得很好,但当我放大nPartition时,这种方法的时间缩放效果似乎比O(N)差得多。
2)对每个统计数据进行计算(client.compute(stat, sync=True)client.compute(stat).result())。
这增加了与调度程序对话的太多开销,对于我试图计算的大约100000个统计数据来说,这将花费太长的时间。
3)缓存(通过持久化)中间结果(索引器),以便我可以重用。
考虑到过滤器有时可以共享索引器,我在filter_indexget_indexer字段中添加了缓存。
具体来说,创建一个散列并indexer = client.persist(indexer),在以后的调用中返回持久化索引器对于get_indexer,我还添加了一个combinations检查,试图查看高速缓存中是否存在任何过滤器子集。我还优化了调用统计信息的顺序,以便在下一个集合中最多只需要1个新的更改索引器。
(例如,一次完成所有共享同一个过滤器的操作,然后转到下一个)。
这就产生了一个不幸的后果,即需要大量的内存来保存所有的布尔掩码。
我还没有尝试滚动缓存(当计算运行时,cache.pop(index_id),一旦计算不再需要它持久化),但这是我的下一步。
当前的关键问题
上面列出的解决方案(3)是我目前实现的,但它的性能仍然不如我所希望的那样好。
内存开销非常高(有效地为每个唯一的筛选器创建一个完整的新列)
调度程序/图序列化似乎很昂贵
htop可以看出,大多数情况下,只有dask-scheduler在以100%的速度运行,而工人大多处于空闲状态。
问题
1)我可以采取哪些其他方法,或者我的方法中是否有明显的遗漏?
2)我考虑过df.query(string),但由于它在整个数据帧上运行,因此似乎效率低下(大量重复数据)这是真的吗,还是通过使用内置语法分析器取得了一些成功(我注意到dask图对此较小,但不确定它是否值得)。
3)调度器和单线程(?)dask图形生成器似乎是瓶颈,是否有明确的路径来并行化这些?
4)当我查看分布式bokeh状态观察程序时,我经常注意到它在这些计算过程中也会挂起,这使得调试变得困难,并且让我好奇使用web服务器是否真的会损害调度程序的性能?这是真的吗?
5)在日志中,我收到了很多Event loop was unresponsive in Worker for Xs.警告我能做些什么来帮助平衡工作,或者重新编写分配给工作人员的任务,或者使调度程序更具响应性?
6)从减少图的复杂度的愿望来看,我有repartition(npartitions=num_workers*2),但我不确定这是一个好的启发式还是我应该使用什么?
下面是调度程序正在管理的任务的一个示例(这是针对~25个唯一的筛选器,每个筛选器有~50个统计信息,总共有~1000个统计信息正在计算)。
https://i.imgur.com/hRzmXHP.png
感谢您对如何考虑优化这个问题的帮助或指导性建议。

最佳答案

我想到了两个一般性的建议,但是如果没有实践经验,很难诊断出这样的问题。听起来你已经在看仪表盘了,很高兴听到我将在这里集中讨论两个避免调度开销的建议,因为这是您特别提到的。
使用较大的分区
dd.read_csv等操作的默认分区大小足够小,可以在消费型笔记本电脑上使用。(我怀疑它们大约是128MB)考虑到节点的大小,您可以将其增加10倍(或更多)并保持良好状态。这将减少您的调度开销10倍。
使用高级图融合
截至2018年12月20日,这仍在开发分支中,但dask.dataframe开始在表达式级别而不是任务级别融合。这将有助于显著减少数千个统计数据的开销,从dask的角度来看,可能会将它们变成一个任务。
您可能需要跟踪以下prs:
https://github.com/dask/dask/pull/4092
https://github.com/dask/dask/pull/4229
我还鼓励您将您的用例作为github问题提供一个综合示例,以便它可以为将来的开发提供信息。我建议使用dask.datasets.timeseries()来生成一个假数据帧,然后使用一些简单的方法从中生成大量简单的统计数据(如果可能的话,简单的方法会更好,这样维护人员就不必陷得太深)。

关于python - 如何使用dask有效地计算许多简单统计数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53844188/

相关文章:

python - 如何向 QTabWidget 添加新选项卡

python - 将一个数据框与另一个数据框分离

database - 为什么 "joins"会降低大型分布式数据库系统的可扩展性?

python - 如何在 dask/distributed 中存储 worker-local 变量

python - 从 CSV 文件编写微分方程(与 solve_ivp 一起使用)

python - 修改非理想列表格式并将其导出到 Excel (Python)

python - Python 中的 DataFrame 切片失败

database - 自动增量在数据库内部如何工作?

python - 尝试从 Azure Key Vault 加载证书时出现 OpenSSL.crypto.Error

python - SQLite 查询性能、Transpose、Melt 和 Pandas