太长了;没读过
warnings.catch_warnings()
上下文管理器是 not thread safe .如何在并行处理环境中使用它?
背景
以下代码使用 Python 的 multiprocessing
并行处理解决了最大化问题。模块。它需要一个(不可变的)小部件列表,对它们进行分区(参见 Efficient multiprocessing of massive, brute force maximization in Python 3 ),找到所有分区的最大值(“决赛选手”),然后找到这些“决赛选手”的最大值(“冠军”)。如果我正确理解了自己的代码(如果我理解了我就不会在这里),我将与所有子进程共享内存以向它们提供输入小部件,并且 multiprocessing
使用操作-系统级管道和酸洗,以在工作人员完成后将决赛小部件发送回主进程。
问题的根源
我想捕捉当小部件从进程间管道出来时发生的由小部件重新实例化引起的冗余小部件警告。当小部件对象实例化时,它们会验证自己的数据,从 Python 标准 warnings
模块发出警告,告诉应用程序的用户小部件怀疑用户的输入数据存在问题。因为 unpickling 会导致对象实例化,所以我对代码的理解意味着每个小部件对象只被重新实例化一次,当且仅当它从管道中出来后进入决赛 - 请参阅下一节以了解为什么这是不正确的.
小部件在被 frobnicated 之前已经创建,因此用户已经痛苦地意识到他输入错误并且不想再听到它。这些是我想用 warnings
模块的 catch_warnings()
上下文管理器(即 with
语句)捕获的警告。
失败的解决方案
在我的测试中,当多余的警告被发送到我在下面标记为 Line A 和 Line B 之间的任何地方时,我已经缩小了范围。令我惊讶的是,警告是在 output_queue.get()
附近以外的地方发出的。这对我来说意味着 multiprocessing
使用酸洗将小部件发送给 worker 。
结果是放置由 warnings.catch_warnings()
创建的上下文管理器即使围绕从 Line A 到 Line B 的所有内容,并且在此上下文中设置正确的警告过滤器也不会捕获警告。这对我来说意味着警告正在工作进程中发出。将此上下文管理器放在工作代码周围也不会捕获警告。
代码
此示例省略了用于确定问题规模是否太小而无法 fork 进程、导入多处理以及定义 my_frobnal_counter
和 my_load_balancer
的代码。
"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"
def frobnicate_parallel_worker(widgets, output_queue):
resultant_widget = max(widgets, key=my_frobnal_counter)
output_queue.put(resultant_widget)
def frobnicate_parallel(widgets):
output_queue = multiprocessing.Queue()
# partitions: Generator yielding tuples of sets
partitions = my_load_balancer(widgets)
processes = []
# Line A: Possible start of where the warnings are coming from.
for partition in partitions:
p = multiprocessing.Process(
target=frobnicate_parallel_worker,
args=(partition, output_queue))
processes.append(p)
p.start()
finalists = []
for p in processes:
finalists.append(output_queue.get())
# Avoid deadlocks in Unix by draining queue before joining processes
for p in processes:
p.join()
# Line B: Warnings no longer possible after here.
return max(finalists, key=my_frobnal_counter)
最佳答案
您可以尝试覆盖 Process.run
方法以使用 warnings.catch_warnings
。
>>> from multiprocessing import Process
>>>
>>> def yell(text):
... import warnings
... print 'about to yell %s' % text
... warnings.warn(text)
...
>>> class CustomProcess(Process):
... def run(self, *args, **kwargs):
... import warnings
... with warnings.catch_warnings():
... warnings.simplefilter("ignore")
... return Process.run(self, *args, **kwargs)
...
>>> if __name__ == '__main__':
... quiet = CustomProcess(target=yell, args=('...not!',))
... quiet.start()
... quiet.join()
... noisy = Process(target=yell, args=('AAAAAAaaa!',))
... noisy.start()
... noisy.join()
...
about to yell ...not!
about to yell AAAAAAaaa!
__main__:4: UserWarning: AAAAAAaaa!
>>>
或者你可以使用一些内部结构......(__warningregistry__
)
>>> from multiprocessing import Process
>>> import exceptions
>>> def yell(text):
... import warnings
... print 'about to yell %s' % text
... warnings.warn(text)
... # not filtered
... warnings.warn('complimentary second warning.')
...
>>> WARNING_TEXT = 'AAAAaaaaa!'
>>> WARNING_TYPE = exceptions.UserWarning
>>> WARNING_LINE = 4
>>>
>>> class SelectiveProcess(Process):
... def run(self, *args, **kwargs):
... registry = globals().setdefault('__warningregistry__', {})
... registry[(WARNING_TEXT, WARNING_TYPE, WARNING_LINE)] = True
... return Process.run(self, *args, **kwargs)
...
>>> if __name__ == '__main__':
... p = SelectiveProcess(target=yell, args=(WARNING_TEXT,))
... p.start()
... p.join()
...
about to yell AAAAaaaaa!
__main__:6: UserWarning: complimentary second warning.
>>>
关于 python 3 : Catching warnings during multiprocessing,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12654267/