我尝试在多处理场景中使用 python 的默认日志记录模块。 我读过:
以及其他有关多处理、日志记录、Python 类等的多篇文章。 阅读完所有这些内容后,我看到了这段代码,我无法使其使用 python 的 logutils QueueHandler 正确运行:
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time
from logutils.queue import QueueListener, QueueHandler
class Worker(Process):
def __init__(self, n, q):
super(Worker, self).__init__()
self.n = n
self.queue = q
self.qh = QueueHandler(self.queue)
self.root = logging.getLogger()
self.root.addHandler(self.qh)
self.root.setLevel(logging.DEBUG)
self.logger = logging.getLogger("W%i"%self.n)
def run(self):
self.logger.info("Worker %i Starting"%self.n)
for i in xrange(10):
self.logger.log(INFO, "testing %i"%i)
self.logger.log(INFO, "Completed %i"%self.n)
def listener_process(queue):
while True:
try:
record = queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
except (KeyboardInterrupt, SystemExit):
raise
except:
import sys, traceback
print >> sys.stderr, 'Whoops! Problem:'
traceback.print_exc(file=sys.stderr)
if __name__ == "__main__":
mpq = mpQueue(-1)
root = logging.getLogger()
h = logging.StreamHandler()
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
l = logging.getLogger("Test")
l.setLevel(logging.DEBUG)
listener = Process(target=listener_process,
args=(mpq,))
listener.start()
workers=[]
for i in xrange(1):
worker = Worker(i, mpq)
worker.daemon = True
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
mpq.put_nowait(None)
listener.join()
for i in xrange(10):
l.info("testing %i"%i)
print "Finish"
如果执行代码,输出会以某种方式重复以下行:
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO testing 9
Finish
在其他问题中,建议多次添加处理程序,但是,如您所见,我仅在 main 方法中添加一次流处理程序。 我已经测试过将 main 方法嵌入到具有相同结果的类中。
编辑: 正如@max建议的(或者我相信他所说的),我已经将工作类的代码修改为:
class Worker(Process):
root = logging.getLogger()
qh = None
def __init__(self, n, q):
super(Worker, self).__init__()
self.n = n
self.queue = q
if not self.qh:
Worker.qh = QueueHandler(self.queue)
Worker.root.addHandler(self.qh)
Worker.root.setLevel(logging.DEBUG)
self.logger = logging.getLogger("W%i"%self.n)
print self.root.handlers
def run(self):
self.logger.info("Worker %i Starting"%self.n)
for i in xrange(10):
self.logger.log(INFO, "testing %i"%i)
self.logger.log(INFO, "Completed %i"%self.n)
结果相同,现在队列处理程序不再一次又一次添加,但仍然存在重复的日志条目,即使只有一个工作人员。
编辑2: 我稍微改变了代码。我更改了监听器进程,现在使用 QueueListener(无论如何,这就是我一开始的意图),将主要代码移动到一个类中。
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time
from logutils.queue import QueueListener, QueueHandler
root = logging.getLogger()
added_qh = False
class Worker(Process):
def __init__(self, logconf, n, qh):
super(Worker, self).__init__()
self.n = n
self.logconf = logconf
# global root
global added_qh
if not added_qh:
added_qh = True
root.addHandler(qh)
root.setLevel(logging.DEBUG)
self.logger = logging.getLogger("W%i"%self.n)
#print root.handlers
def run(self):
self.logger.info("Worker %i Starting"%self.n)
for i in xrange(10):
self.logger.log(INFO, "testing %i"%i)
self.logger.log(INFO, "Completed %i"%self.n)
class Main(object):
def __init__(self):
pass
def start(self):
mpq = mpQueue(-1)
qh = QueueHandler(mpq)
h = logging.StreamHandler()
ql = QueueListener(mpq, h)
#h.setFormatter(f)
root.addHandler(qh)
l = logging.getLogger("Test")
l.setLevel(logging.DEBUG)
workers=[]
for i in xrange(15):
worker = Worker(logconf, i, qh)
worker.daemon = True
worker.start()
workers.append(worker)
for worker in workers:
print "joining worker: {}".format(worker)
worker.join()
mpq.put_nowait(None)
ql.start()
# listener.join()
for i in xrange(10):
l.info("testing %i"%i)
if __name__ == "__main__":
x = Main()
x.start()
time.sleep(10)
print "Finish"
现在,它大部分都可以工作,直到我达到一定数量的工作人员(~15),此时由于某种原因,主类在 de join 中被阻止,而其余的工作人员什么都不做。
最佳答案
我来晚了,所以你可能不再需要答案了。问题来自于这样一个事实:您已经在主进程中设置了一个处理程序,并且在您的工作进程中添加了另一个处理程序。这意味着在您的工作进程中,实际上有两个处理程序正在管理您的数据,一个将日志推送到队列,另一个将其写入流。
您只需在代码中添加额外的一行 self.root.handlers = []
即可解决此问题。从您的原始代码来看,worker 的 __init__ 方法如下所示:
def __init__(self, n, q):
super(Worker, self).__init__()
self.n = n
self.queue = q
self.qh = QueueHandler(self.queue)
self.root = logging.getLogger()
self.root.handlers = []
self.root.addHandler(self.qh)
self.root.setLevel(logging.DEBUG)
self.logger = logging.getLogger("W%i"%self.n)
现在的输出如下所示:
python workers.py
2016-05-12 10:07:02,971 Worker-2 W0 INFO Worker 0 Starting
2016-05-12 10:07:02,972 Worker-2 W0 INFO testing 0
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 1
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 2
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 3
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 4
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 5
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 6
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 7
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 8
2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 9
2016-05-12 10:07:02,973 Worker-2 W0 INFO Completed 0
Finish
关于python - 疯狂的多处理日志记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20332359/