python - 为什么在使用 Python 多处理池时会有空闲的工作线程?

标签 python parallel-processing python-multiprocessing

我正在将一个非常大的文本文件分成较小的 block ,并对这些 block 执行进一步的处理。对于此示例,让 text_chunks 为列表列表,每个列表包含一段文本。 text_chunks 元素的长度范围为 ~50 到 ~15000。 ProcessedText 类存在于代码中的其他位置,并根据输入的文本进行大量后续处理和数据分类。使用如下代码并行地将不同的文本 block 处理为 ProcessedText 实例:

def do_things_to_text(a, b):
    #pull out necessary things for ProcessedText initialization and return an instance
    print('Processing {0}'.format(a))
    return ProcessedText(a, b)

import multiprocessing as mp

#prepare inputs for starmap, pairing with list index so order can be reimposed later
pool_inputs = list(enumerate(text_chunks))

#parallel processing
pool = mp.Pool(processes=8)
results = pool.starmap_async(do_things_to_text, pool_inputs)
output = results.get()

代码执行成功,但在代码运行时,作为一部分创建的一些工作进程似乎随机闲置。当代码执行时,我在 top 中跟踪内存使用情况、CPU 使用情况和状态。

一开始,所有 8 个工作进程都处于工作状态(top 中的状态“R”且 CPU 使用率非零),在完成来自 text_chunks 的大约 20 个条目后,工作进程流程开始发生巨大变化。有时,只有 1 个工作进程正在运行,其他进程处于状态“S”,CPU 使用率为零。我还可以从打印的输出语句中看到 do_things_to_text() 的调用频率较低。到目前为止,我还无法确定进程开始空闲的原因。还有大量条目需要处理,因此它们闲置会导致时间效率低下。

我的问题是:

  1. 为什么这些工作进程处于空闲状态?
  2. 有没有更好的方法来实现多处理来防止这种情况发生?

编辑添加: 我进一步描述了这个问题。从我在 do_things_to_text() 中打印的索引可以清楚地看出,多处理正在将作业总数划分为每十个索引处的线程。所以我的控制台输出显示作业 0、10、20、30、40、50、60、70 同时提交(8 个进程)。并且某些作业比其他作业完成得更快,因此您可能会在看到作业 1 完成之前看到作业 22 已完成。

在第一批线程完成之前,所有进程都处于事件状态,没有空闲的进程。然而,当该批处理完成并且作业 80 启动时,只有一个进程处于事件状态,其他 7 个进程处于空闲状态。我还没有确认,但我相信在 80 系列完成之前都会这样。

最佳答案

以下是一些提高内存利用率的建议:

我不知道 text_chunks 是如何创建的,但最终你会在 pool_inputs 中得到 8GB 大小的字符串。理想情况下,您应该有一个生成器函数,例如 make_text_chunks,它生成以前组成 text_chunks 可迭代的各个“文本 block ”(如果 text_chunks > 已经是一个这样的生成器表达式,那么你就准备好了)。我们的想法是一次创建所有 8GB 的​​数据,而是仅在需要数据时创建。使用此策略,您不能再使用 Pool 方法 starmap_asynch;我们将使用Pool.imap 。与 startmap_asynch 不同,此方法将以 chunksize block 的形式迭代提交作业,并且您可以在结果可用时对其进行处理(尽管这似乎不是问题)。

def make_text_chunks():
    # logic goes here to generate the next chunk
    yield text_chunk


def do_things_to_text(t):
    # t is now a tuple:
    a, b = t
    #pull out necessary things for ProcessedText initialization and return an instance
    print('Processing {0}'.format(a))
    return ProcessedText(a, b)


import multiprocessing as mp

# do not turn into a list!
pool_inputs = enumerate(make_text_chunks())

def compute_chunksize(n_jobs, poolsize):
    """
    function to compute chunksize as is done by Pool module
    """
    if n_jobs == 0:
        return 0
    chunksize, remainder = divmod(n_jobs, poolsize * 4)
    if remainder:
        chunksize += 1
    return chunksize

#parallel processing
# number of jobs approximately
# don't know exactly without turning pool_inputs into a list, which would be self-defeating
N_JOBS = 300
POOLSIZE = 8
CHUNKSIZE = compute_chunksize(N_JOBS, POOLSIZE)
with mp.Pool(processes=POOLSIZE) as pool:
    output = [result for result in pool.imap(do_things_to_text, pool_inputs, CHUNKSIZE)]

关于python - 为什么在使用 Python 多处理池时会有空闲的工作线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65926296/

相关文章:

Python 将 ddmmyy 格式转换为有效日期

cpu - OpenCL 适合什么类型的代码域?

Python - 队列、管理器和多处理的奇怪行为

python - 如何并行化一个简单的 Python 循环?

python - 为什么我的所有进程不是同时启动?

python-multiprocessing - Python 多处理,记录到不同的文件

python - 'function'对象在django中没有属性 'get'

python - 如何在表单 View 中捕获查询字符串参数?

python - 在 Pandas 数据框中将第二行移到上方一行

perl - 如何并行运行 Test::Perl::Critic?