I have some code that downloads files from gstorage, dumps them to JSON, converts that JSON to CSV, then to Parquet, and finally uploads the result to AWS S3 (don't ask why, I'm not the one who wrote it).
I can see from my logs that sometimes not all subprocesses have finished by the end of a run.
Does anyone know why this happens? If not, do you think switching from ProcessPoolExecutor to plain multiprocessing might help?
In my main, I kick the whole thing off with:
with ProcessPoolExecutor(max_workers=NUM_OF_PROCESS_WORKERS) as process_executor:
    for table_type in TABLES_COLUMNS_MAPPER.keys():
        for node in nodes:
            process_executor.submit(handle_sstable_group_files_per_node, node, table_type)
In case it helps, I'm running on Ubuntu. Thanks.
Best answer
So, following up on the comments: you still never wait for the results, and since you never call result() on the futures, any exception raised in a worker is silently "swallowed":
test.py:
from concurrent.futures import ProcessPoolExecutor

def worker(i):
    if i == 3:
        raise Exception(f"ERROR: {i}")
    print(f"TASK: {i}")
    return i * i

def main():
    futures = []
    with ProcessPoolExecutor() as executor:
        for i in range(10):
            futures.append(executor.submit(worker, i))
        # for future in futures:
        #     print(future.result())

if __name__ == "__main__":
    main()
Test run:

$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
TASK: 6
TASK: 8
TASK: 7
TASK: 9
Now, when you uncomment these two lines:

        for future in futures:
            print(future.result())
you can now see the error (assuming you don't handle it inside the worker function):

$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
0
1
4
TASK: 8
TASK: 6
TASK: 7
TASK: 9
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "test.py", line 8, in worker
    raise Exception(f"ERROR: {i}")
Exception: ERROR: 3
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "f.py", line 30, in <module>
    main()
  File "test.py", line 25, in main
    print(future.result())
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
Exception: ERROR: 3
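If you want all tasks to run to completion even when some of them fail, one option (not from the answer above, just a common pattern) is to collect the futures with concurrent.futures.as_completed and catch each exception individually, so a single failing worker doesn't abort the loop. A minimal sketch, reusing the same worker function:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def worker(i):
    if i == 3:
        raise Exception(f"ERROR: {i}")
    return i * i

def main():
    results, errors = [], []
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker, i) for i in range(10)]
        # as_completed yields each future as soon as it finishes;
        # calling .result() re-raises any exception from the worker,
        # so we catch it here instead of letting it kill the loop.
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:
                errors.append(exc)
    print(sorted(results))
    print(errors)

if __name__ == "__main__":
    main()
```

This way every submitted task is accounted for: nine results and one recorded exception, instead of the whole run dying (or silently losing the error) on task 3.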
Regarding python - processpoolexecutor subprocess stops suddenly, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/67250675/