python - 适用于DataFrame操作/功能的Python多重处理

标签 python pandas multithreading multiprocessing

我正在使用Pandas Dataframe处理100,000行文本数据。每隔一段时间(<100,000个中的5个),我在选择删除的行中都会出现错误。错误处理功能如下:

def unicodeHandle(datai):
    for i, row in enumerate(datai['LDTEXT']):
        print(i)
        #print(text)
        try:
            text = row.read()
            text.strip().split('[\W_]+') 
            print(text)
        except UnicodeDecodeError as e:
            datai.drop(i, inplace=True)
            print('Error at index {}: {!r}'.format(i, row))
            print(e)
    return datai

该功能运行良好,并且我已经使用了几个星期。

问题在于,我永远不知道何时会发生错误,因为数据来自不断添加到数据库中的数据(或者我可能提取不同的数据)。关键是,我必须遍历每一行以运行我的错误测试函数unicodeHandle以便初始化我的数据。这个过程大约需要5分钟,这有点烦人。我正在尝试实现多处理以加快循环速度。通过网络和各种教程,我得出了:

def unicodeMP(datai):
    chunks = [datai[i::8] for i in range(8)]
    pool = mp.Pool(processes=8)
    results = pool.apply_async(unicodeHandle, chunks)
    while not results.ready():
        print("One Sec")
    return results.get()

if __name__ == "__main__":
    fast = unicodeMP(datai)

当我运行它进行多处理时,即使我的CPU说它正在以更高的利用率运行,它也需要花费与常规时间相同的时间。另外,代码将错误作为正常错误而不是我完整的干净数据框返回。我在这里想念什么?

如何对DataFrames上的函数使用多重处理?

最佳答案

您可以尝试dask对数据框进行多处理

import dask.dataframe as dd

partitions = 7 # cpu_cores - 1
ddf = dd.from_pandas(df, npartitions=partitions)
ddf.map_partitions(lambda df: df.apply(unicodeHandle).compute(scheduler='processes')

您可以阅读有关dask here的更多信息

关于python - 适用于DataFrame操作/功能的Python多重处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59584238/

相关文章:

python - Pandas 合并循环内生成的数据帧

python-3.x - 如何标准化滚动 Pandas 数据框的子集?

c++ - 从另一个类启动 QTimer

java - 在输入流中等待时执行某些操作

java - Android从长时间运行的线程接收消息

python - 使用python pandas从sql表中读取postgres数组

python - django channel 配置不当 : Cannot find 'app' in ASGI_APPLICATION module

python - 在组的最后一个元素之后添加行

python - 小时、日期、天数计算

python - 如何删除数据集中的重复值 : python