我正在迭代规则列表(每个规则都是一个大 bool 表达式)。
我打算使用 Pyeda 库来解决这个表达式。步骤是1.解析规则,2.转换为BDD形式,3.求解规则。步骤对于我面临的问题并不重要,并且包含在函数 do_big_job
下,该函数需要 rule
来解决和 global Queue (q)
借用自“multiprocessing.Manager”,而不是通用队列。
我必须使处理时间过长(“time_in”秒)的规则超时。 do_threading
函数采用全局 q(队列)
、要在子进程中运行的函数 (do_big_job
) 和一个参数(规则)传递给 do_big_job
和 timeout_in
来控制子进程的执行。
令我惊奇的是,我观察到,当超时并且子进程因运行时间过长而被终止时,结果会乱序,即队列中返回的值与规则不匹配通过并属于其他一些较早的规则。
我在这里做错了什么? 还有其他方法可以完成我想做的事情吗?
另外,我还有一个问题,当我以线性方式执行此操作而不使用多重处理时,处理每个规则所花费的时间比在单独进程中处理每个规则所花费的时间要多得多。对此有何解释?
def do_threading(q,function,argument, timeout_in=1):
# Start function as a process
p = Process(target=function, args=(argument,q,))
p.start()
p.join(.1)
if p.is_alive():
# Wait for 'timeout_in' seconds or until process finishes
p.join(timeout_in)
# If thread is still active
if p.is_alive():
print("running... let's kill it...")
# Terminate
p.terminate()
p.join()
return False
return True
def do_big_job(rule, q):
# Do something with passed argument
print("Child: ", rule)
# heavy computation using Pyeda library
f = expr2bdd(expr(rule))
count = f.satisfy_count()
solution=[]
for i in f.satisfy_all():
solution.append(i)
# Putting result in the queue for exchange
q.put([solution,count])
def main()
manager = multiprocessing.Manager()
q = manager.Queue() # Initializing Queue for data exchange between processes
solved_parts={}
timed_out_parts={}
for rule in rules: # Iterating over rules and creating process for each rule
each_rule={}
#Creating new processes to carry out heavy computation and passing the Queue 'q' for data exchange
processed = do_threading( q, do_big_job, rule, timeout_in=1)
if processed:
r = q.get() # Getting result from the queue
each_rule["solution"] = r[0]
each_rule["solution_count"] = r[1]
each_rule["count_unique_var"]=count_unique_variables(rule)
else:
each_rule["solution"] = "None"
each_rule["solution_count"] = "None"
each_rule["count_unique_var"]=count_unique_variables(rule)
# Putting results in 2 types of lists
if each_rule["solution"]=="None":
timed_out_parts[part_num]=each_rule.copy()
else:
solved_parts[part_num]=each_rule.copy()
main()
最佳答案
出于各种原因,我必须对您的代码进行大量更改。
有些名称未定义为 part_num
。
我省略了使用实际的 Pyeda 库。多处理的解决方案是通用的,工作进程中实际发生的事情与处理进程之间的数据流无关。
我也没有尝试猜测 expr
从哪里导入。
因此,有些函数被模拟,但它们与理解并行计算无关。
模拟会相应地被注释,虚拟输入数据也是如此。
您的代码的主要问题是您希望在一个循环中启动工作程序并收集结果。每当您使用线程或多处理时,请忘记它,因为从工作人员返回的数据顺序基本上是未定义的。因此,工作人员有责任提供有关其正在处理的规则以及结果的明确信息。
还有一个很大的区别是,我实际上在一开始就启动了所有工作线程,这使得计算实际上是并行的。然后我正在收集传入的结果。每当队列为空时,我都会检查是否所有工作人员都已返回退出代码,这是一个明确的信息,表明不会再发生任何有趣的事情。
主进程不对工作线程超时负责。 Worker 在 SIGALRM
超时后自行终止。我这样做是因为主进程没有关于工作进程何时进入 Python 代码入口点的可靠信息。
最后一件事是,我根据 solved_parts
中缺失的结果填充 timed_out_parts
。
from multiprocessing import Process, Manager
from multiprocessing.queues import Empty as QueueEmpty
from signal import alarm
# Following imports are only needed to mock some function
from time import sleep
from collections import namedtuple
import random
# Mock for `expr()`
def expr(rule):
return rule
# Mock for `expr2bdd()` - sleeps randomly simulating heavy computation
def expr2bdd(expression):
sleep(random.randint(0, 9))
satisfied = [n for n in xrange(random.randint(0, 5))]
def satisfy_count():
return len(satisfied)
def satisfy_all():
return satisfied
Evaluation = namedtuple('Evaluation', ('satisfy_count', 'satisfy_all'))
return Evaluation(satisfy_count=satisfy_count, satisfy_all=satisfy_all)
# Mock for `count_unique_variables()`
def count_unique_variables(arg):
return random.randint(0, 9)
# This function is executed in separate process - does the actual computation
def evaluate_rule(queue, part_num, rule, timeout):
alarm(timeout)
print 'Part: {}, Rule: {}'.format(part_num, rule)
evaluation = expr2bdd(expr(rule))
count = evaluation.satisfy_count()
solution=[]
for i in evaluation.satisfy_all():
solution.append(i)
queue.put([part_num, solution, count])
# Main function which starts workers and collects results
def evaluate_rules(rules, timeout=5):
manager = Manager()
queue = manager.Queue()
solved_parts = {}
processes = []
for part_num, rule in enumerate(rules):
process = Process(target=evaluate_rule, args=(queue, part_num, rule, timeout))
process.start()
processes.append(process)
while True:
try:
result = queue.get_nowait()
except QueueEmpty:
if all((process.exitcode is not None for process in processes)):
break
solved_parts[result[0]] = {
'solution': result[1],
'solution_count': result[2],
'count_unique_var': count_unique_variables(rule)
}
timed_out_parts = {
part_num: {
'solution': None,
'solution_count': None,
'count_unique_var': count_unique_variables(rule)
}
for part_num, rule in enumerate(rules) if part_num not in solved_parts
}
return solved_parts, timed_out_parts
# Initialize `random generator` - only for mocks
random.seed()
# Dummy rules
rules = [i for i in xrange(50)]
# Fun starts here
solved_parts, timed_out_parts = evaluate_rules(rules)
# You definitely want to do something more clever with the results than just printing them
print solved_parts
print timed_out_parts
关于你的第二个问题:没有黄金答案。线性和并行处理时间的差异取决于工作人员实际执行的操作。
关于Python - 子进程被终止后,多处理队列未按正确顺序返回结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30979211/