python - processpoolexecutor 子进程突然停止

标签 python multiprocessing

我有一个代码,它从 gstorage 下载文件,将它们转储到 json,然后将该 json 转为 csv,然后转为 parquet,最后上传到 aws s3(不要问为什么我不是写它的人)。
我从我的日志中发现,有时运行结束时所有子进程都没有完成。
有人知道为什么会发生这种情况吗?如果不是你认为可能会切换ProcessPoolExecutor与普通 multiprocessing有帮助?
在我的主要开始这整个事情我使用:

with ProcessPoolExecutor(max_workers=NUM_OF_PROCESS_WORKERS) as process_executor:
    for table_type in TABLES_COLUMNS_MAPPER.keys():
        for node in nodes:
            process_executor.submit(handle_sstable_group_files_per_node, node, table_type)
如果有帮助,我正在使用 ubuntu
谢谢。

最佳答案

因此,继续评论,您仍然不需要等待结果,并且因为您使用了上下文管理器,因此将“吞下”异常:
测试.py:

from concurrent.futures import ProcessPoolExecutor


def worker(i):

    if i == 3:
        raise Exception(f"ERROR: {i}")

    print(f"TASK: {i}")

    return i * i


def main():
    futures = []

    with ProcessPoolExecutor() as executor:
        for i in range(10):
            futures.append(executor.submit(worker, i))

        # for future in futures:
        #     print(future.result())


if __name__ == "__main__":
    main()
测试:
$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
TASK: 6
TASK: 8
TASK: 7
TASK: 9
现在,当您取消注释这两行时:
for future in futures:
    print(future.result())
您现在可以看到错误(假设您没有处理工作函数中的错误):
$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
0
1
4
TASK: 8
TASK: 6
TASK: 7
TASK: 9
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "test.py", line 8, in worker
    raise Exception(f"ERROR: {i}")
Exception: ERROR: 3
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "f.py", line 30, in <module>
    main()
  File "test.py", line 25, in main
    print(future.result())
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
Exception: ERROR: 3

关于python - processpoolexecutor 子进程突然停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67250675/

相关文章:

python - 为什么 `print` 在 Python 多处理 pool.map 中不起作用

python - 为什么 TF Keras 推理比 Numpy 运算慢得多?

python - Openpyxl:如何向所有列添加过滤器

python - 如何使用 python 多处理模块重新启动进程

c++ - 从 MATLAB 运行多进程应用程序

python - 如何使用多处理从输出队列中获取 "batch write"?

python - 使用opecv2和Python显示图像时滞后

python - 使用 scons 将参数从一个 SConstruct 传递到另一个 SConstruct

Python pandas - 根据集体 NaN 计数删除组

c - 对称多处理系统的输出