Python Multiprocessing 帮助按条件退出

标签 python multiprocessing python-multithreading multiprocess

我在 Python 中的多处理上咬牙切齿,但我没有任何运气围绕这个主题。基本上我有一个运行起来很耗时的程序。我需要在 1 到 100 的范围内运行它,但我想在满足我正在寻找的条件后中止所有进程。条件是返回值 == 90。

这是一个非多进程代码块。谁能给我一个例子,说明他们如何将其转换为多进程函数,一旦满足“90”的条件,代码将退出所有进程?

def Addsomething(i):
    SumOfSomething = i + 1    
    return SumOfSomething

def RunMyProcess():
    for i in range(100):
        Something = Addsomething(i)
        print Something
    return

if __name__ == "__main__":
    RunMyProcess()

编辑:

我在测试第三个版本时遇到了这个错误。知道是什么原因造成的吗?

Exception in thread Thread-3:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 554, in __bootstrap_inner
    self.run()
  File "C:\Python27\lib\threading.py", line 507, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\lib\multiprocessing\pool.py", line 379, in _handle_results
    cache[job]._set(i, obj)
  File "C:\Python27\lib\multiprocessing\pool.py", line 527, in _set
    self._callback(self._value)
  File "N:\PV\_Proposals\2013\ESS - Clear Sky\01-CODE\MultiTest3.py", line 20, in check_result
    pool.terminate()
  File "C:\Python27\lib\multiprocessing\pool.py", line 423, in terminate
    self._terminate()
  File "C:\Python27\lib\multiprocessing\util.py", line 200, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "C:\Python27\lib\multiprocessing\pool.py", line 476, in _terminate_pool
    result_handler.join(1e100)
  File "C:\Python27\lib\threading.py", line 657, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

最佳答案

也许您正在寻找这样的东西?请记住,我是为 Python 3 编写的。您上面的打印语句是 Python 2,在这种情况下,旁注是使用 xrange 而不是 range。

from argparse import ArgumentParser
from random import random
from subprocess import Popen
from sys import exit
from time import sleep

def add_something(i):

    # Sleep to simulate the long calculation
    sleep(random() * 30)
    return i + 1

def run_my_process():

    # Start up all of the processes, pass i as command line argument
    # since you have your function in the same file, we'll have to handle that
    # inside 'main' below
    processes = []
    for i in range(100):
        processes.append(Popen(['python', 'thisfile.py', str(i)]))

    # Wait for your desired process result
    # Might want to add a short sleep to the loop
    done = False
    while not done:
       for proc in processes:
            returncode = proc.poll()
            if returncode == 90:
                done = True
                break

    # Kill any process that are still running
    for proc in processes:

        if proc.returncode is None:

            # Might run into a race condition here,
            # so might want to wrap with try block
            proc.kill()

if __name__ == '__main__':

    # Look for optional i argument here
    parser = ArgumentParser()
    parser.add_argument('i', type=int, nargs='?')
    i = parser.parse_args().i

    # If there isn't an i, then run the whole thing
    if i is None:
        run_my_process()

    else:
        # Otherwise, run your expensive calculation and return the result
        returncode = add_something(i)
        print(returncode)
        exit(returncode)

编辑:

这是一个更简洁的版本,它使用多处理模块而不是子进程:

from random import random
from multiprocessing import Process
from sys import exit
from time import sleep

def add_something(i):

    # Sleep to simulate the long calculation
    sleep(random() * 30)

    exitcode = i + 1
    print(exitcode)
    exit(exitcode)

def run_my_process():

    # Start up all of the processes
    processes = []
    for i in range(100):
        proc = Process(target=add_something, args=[i])
        processes.append(proc)
        proc.start()

    # Wait for the desired process result
    done = False
    while not done:
        for proc in processes:
            if proc.exitcode == 90:
                done = True
                break

    # Kill any processes that are still running
    for proc in processes:
        if proc.is_alive():
            proc.terminate()

if __name__ == '__main__':
    run_my_process()

编辑 2:

这是最后一个版本,我认为它比其他两个版本要好得多:

from random import random
from multiprocessing import Pool
from time import sleep

def add_something(i):

    # Sleep to simulate the long calculation
    sleep(random() * 30)
    return i + 1

def run_my_process():

    # Create a process pool
    pool = Pool(100)

    # Callback function that checks results and kills the pool
    def check_result(result):
        print(result)
        if result == 90:
            pool.terminate()

    # Start up all of the processes
    for i in range(100):
        pool.apply_async(add_something, args=[i], callback=check_result)

    pool.close()
    pool.join()

if __name__ == '__main__':
    run_my_process()

关于Python Multiprocessing 帮助按条件退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21490137/

相关文章:

python - 通过 twython 发布转发会给出 401,而我可以轻松访问时间线

python - 使用多处理时克服内存限制

python - 模块属性更新不会传播到 Windows 上的子进程

python - 我如何使用 python 为台式电脑构建自定义搜索引擎

coding-style - 如何在一行代码中复制和修改字典

python - 跨多个进程训练模型时,在 PyTorch 中使用 tensor.share_memory_() 与 multiprocessing.Queue

python - 多线程工具

python - 将字符串添加到线程名称末尾

Python Grequests xml 响应

使用 FFMpegwriter 的 Python Matplotlib basemap 动画在 820 帧后停止?