编辑: 我对视频流是什么有疑问,所以我会提供更清晰的信息。该流是来 self 的网络摄像头的实时视频源,可通过 OpenCV 访问。我在相机读取时获取每一帧,并将其发送到单独的进程进行处理。该过程根据对图像所做的计算返回文本。然后文本显示在图像上。我需要实时显示流,如果文本和正在显示的视频之间存在滞后也没关系(即,如果文本适用于前一帧,那没关系)。
也许更容易想到这一点的方法是我正在对网络摄像头看到的内容进行图像识别。我一次将一帧发送到一个单独的进程以对该帧进行识别分析,然后将文本发回以作为实时提要的标题。显然,处理比简单地从网络摄像头抓取帧并显示它们要花费更多的时间,因此如果标题和网络摄像头显示的内容有延迟,这是可以接受和预期的。
现在的情况是,我正在显示的实时视频由于其他进程而滞后(当我不向进程发送帧进行计算时,没有滞后)。我还确保一次只排队一帧,以避免队列过载并导致延迟。我更新了下面的代码以反射(reflect)此详细信息。
我正在使用 python 中的多处理模块来帮助加快我的主程序。但是我相信我可能做错了什么,因为我不认为计算是并行发生的。
我希望我的程序在主进程中从视频流中读取图像,并将帧传递给两个子进程,这两个子进程对它们进行计算,并将文本(包含计算结果)发送回主进程.
但是,当我使用多处理时,主进程似乎滞后,运行速度大约是不使用多处理时的一半,这让我相信这些进程并没有完全并行运行。
经过一些研究,我推测延迟可能是由于使用队列在进程之间进行通信(将图像从主进程传递到子进程,并将文本从子进程传回主进程)。
但是我注释掉了计算步骤,只让主进程传递图像,子进程返回空白文本,在这种情况下,主进程根本没有变慢。它全速运行。
因此我认为要么
1) 我没有以最佳方式使用多处理
或
2) 这些进程不能真正并行运行(我会理解有点滞后,但它会使主进程减半)。
这是我的代码大纲。只有一个消费者而不是两个,但两个消费者几乎相同。如果有人可以提供指导,我将不胜感激。
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
#other initialization stuff
def run(self):
while True:
image = self.task_queue.get()
#Do computations on image
self.result_queue.put("text")
return
import cv2
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
else:
rval = False
while rval:
if tasks.empty():
tasks.put(image)
else:
text = tasks.get()
#Add text to frame
cv2.putText(frame,text)
#Showing the frame with all the applied modifications
cv2.imshow("preview", frame)
#Getting next frame from camera
rval, frame = vc.read()
最佳答案
I want my program to read in images from a video stream in the main process
在上面的生产者/消费者实现中,生产者将任务放入队列以供消费者执行,需要与主/控制进程分开,以便它可以并行添加任务主进程从结果队列读取输出。
尝试以下操作。在消费者进程中添加了一个 sleep 以模拟处理并添加了第二个消费者以显示它们正在并行运行。
限制任务队列的大小也是一个好主意,以避免在处理跟不上输入流时它因内存使用而耗尽。调用时可以指定尺寸Queue(<size>)
.如果队列达到该大小,则调用 .put
将阻塞直到队列未满。
import time
import multiprocessing
import cv2
class ImageProcessor(multiprocessing.Process):
def __init__(self, tasks_q, results_q):
multiprocessing.Process.__init__(self)
self.tasks_q = tasks_q
self.results_q = results_q
def run(self):
while True:
image = self.tasks_q.get()
# Do computations on image
time.sleep(1)
# Display the result on stream
self.results_q.put("text")
# Tasks queue with size 1 - only want one image queued
# for processing.
# Queue size should therefore match number of processes
tasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()
processor = ImageProcessor(tasks_q, results_q)
processor.start()
def capture_display_video(vc):
rval, frame = vc.read()
while rval:
image = frame.get_image()
if not tasks_q.full():
tasks_q.put(image)
if not results_q.empty():
text = results_q.get()
cv2.putText(frame, text)
cv2.imshow("preview", frame)
rval, frame = vc.read()
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
if not vc.isOpened():
raise Exception("Cannot capture video")
capture_display_video(vc)
processor.terminate()
关于python - 如何在 Python 中优化多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38864711/