python - 并行化列表过滤

标签 python dictionary dask dask-delayed bag

我有一个项目列表,需要根据某些条件进行过滤。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几千万条记录)。

基本上,我需要做的是:

items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

def item_is_valid(item):
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions

    return item_is_valid

items_filtered = [item for item in items if item_is_valid(item)]

使用 Dask,我已经做到了以下几点:

def item_is_valid_v2(item):
    """Return the whole item if valid."""
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions
    
    if item_is_valid:
        return item

results = []
item = []
for item in items:
    delayed = dask.delayed(item_is_valid)(item)
    results.append(delayed)

results = dask.compute(*results)

但是,我得到的结果包含一些 None 值,然后需要以非并行方式以某种方式过滤掉这些值。

({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)

最佳答案

也许 bag API 对您有用,这是一个粗略的伪代码:

import dask.bag as db

bag = db.from_sequence() # or better yet read it from disk
result = bag.filter(item_is_valid) # note this uses the first version (bool)

要检查这是否有效,请检查 result.take(5) 的结果,如果结果令人满意:

computed_result = result.compute()

关于python - 并行化列表过滤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69109158/

相关文章:

python - 更改 seaborn 热图的 xticklabels 字体大小

dictionary - 有没有关于常用编程词汇的字典?

python - 合并列与 dask

python - 复杂的字典引用,python

python - 使用Python和Dask计算欧氏距离

pandas - 从 BigQuery 加载大量数据到 python/pandas/dask

python - 为什么setuptools需要写字节码?

python - Tensorflow reshape 用法

python - 类型错误 : return arrays must be of ArrayType using lambdify of sympy in python

python - 将带有整数键的字典转换为 numpy 数组