python - OpenCV video writing drastically slows down FPS. How can I optimize performance?

Tags: python opencv deep-learning computer-vision object-detection

I'm working on a project involving object detection plus SORT tracking.

I have scripts that process video files and a camera feed with OpenCV on a Coral Dev Board.

The main problem appears when I use a VideoWriter to save the detection output.

For the camera script, writing drops the FPS from 11 to 2.3; for the video script, from 6-7 to 2.

Is there a way to work around or optimize this?

Here is the part of my code that grabs a frame, runs detection and tracking, and then writes the frame:

# Read frames
while video.isOpened():

    # Acquire frame and resize to expected shape [1xHxWx3]
    ret, frame = video.read()


    if not ret:
        break

    # Debug info
    frame_count += 1
    print("[INFO] Processing frame: {}".format(frame_count))

    if FLIP:
        frame = cv2.flip(frame, 1)

    if ROTATE != 0:
        frame = cv2.rotate(frame, ROTATE) # Rotate frame using the given rotation flag


    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Convert to RGB
    frame = cv2.resize(frame, (VIDEO_WIDTH, VIDEO_HEIGHT)) # resize frame to output dims
    frame_resized = cv2.resize(frame_rgb, (width, height)) # resize to fit tf model dims
    input_data = np.expand_dims(frame_resized, axis=0)

    # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
    if floating_model:
        input_data = (np.float32(input_data) - input_mean) / input_std

    # Initialize writer
    if (writer is None) and SAVE_VIDEO:
        writer = cv2.VideoWriter(VIDEO_OUTPUT, cv2.VideoWriter_fourcc(*'XVID'), args.fps, (VIDEO_WIDTH, VIDEO_HEIGHT))

    # Perform the actual detection by running the model with the image as input
    #s_detection_time = time.time()
    interpreter.set_tensor(input_details[0]['index'],input_data)
    interpreter.invoke()
    #e_detection_time = time.time()

    #print("[INFO] Detection time took: {} seconds".format(e_detection_time-s_detection_time))

    # Retrieve detection results
    boxes = interpreter.get_tensor(output_details[0]['index'])[0] # Bounding box coordinates of detected objects
    classes = interpreter.get_tensor(output_details[1]['index'])[0] # Class index of detected objects
    scores = interpreter.get_tensor(output_details[2]['index'])[0] # Confidence of detected objects
    #num = interpreter.get_tensor(output_details[3]['index'])[0]  # Total number of detected objects (inaccurate and not needed)

    #print("[INFO] Boxes: {}".format(boxes))

    detections = np.array([[]])

    #s_detections_loop = time.time()
    # Loop over all detections and draw detection box if confidence is above minimum threshold
    for i in range(len(scores)):
        if ((scores[i] > min_conf_threshold) and (scores[i] <= 1.0)):

            #print("[INFO] Box ", i , ": ", boxes[i])

            # Get bounding box coordinates and draw box
            # Interpreter can return coordinates that are outside of image dimensions, need to force them to be within image using max() and min()
            ymin = int(max(1,(boxes[i][0] * VIDEO_HEIGHT)))  
            xmin = int(max(1,(boxes[i][1] * VIDEO_WIDTH)))
            ymax = int(min(VIDEO_HEIGHT,(boxes[i][2] * VIDEO_HEIGHT)))
            xmax = int(min(VIDEO_WIDTH,(boxes[i][3] * VIDEO_WIDTH)))

            # Calculate centroid of bounding box
            #centroid_x = int((xmin + xmax) / 2)
            #centroid_y = int((ymin + ymax) / 2)



            # Format detection for sort and append to current detections
            detection = np.array([[xmin, ymin, xmax, ymax]])

            #f.write("Box {}: {}\n".format(i, detection[:4]))
            #print("[INFO] Size of detections: ", detections.size)

            if detections.size == 0: 
                detections = detection
            else:
                detections = np.append(detections, detection, axis=0)

            # Draw a circle indicating centroid
            #print("[INFO] Centroid of box ", i, ": ", (centroid_x, centroid_y))
            #cv2.circle(frame, (centroid_x, centroid_y), 6, (0, 0, 204), -1)

            # Calculate area of rectangle
            #obj_height = (ymin + ymax)
            #print("[INFO] Object height: ", obj_height)

            # Check if centroid passes ROI
            # Draw the bounding box
            #cv2.rectangle(frame, (xmin,ymin), (xmax,ymax), (0, 0, 255), 4)
            #print("[INFO] Object passing ROI")
            #print("[INFO] Object height: ", obj_height)
            #counter += 1
            #print("[INFO] Object out of ROI")
            # Draw the bounding box
            #cv2.rectangle(frame, (xmin,ymin), (xmax,ymax), (10, 255, 0), 4)

            #print("[INFO] Total objects counted: ", counter)


            # Draw label
            """object_name = labels[int(classes[i])] # Look up object name from "labels" array using class index
            label = '%s: %d%%' % (object_name, int(scores[i]*100)) # Example: 'person: 72%'
            labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) # Get font size
            label_ymin = max(ymin, labelSize[1] + 10) # Make sure not to draw label too close to top of window
            cv2.rectangle(frame, (xmin, label_ymin-labelSize[1]-10), (xmin+labelSize[0], label_ymin+baseLine-10), (255, 255, 255), cv2.FILLED) # Draw white box to put label text in
            cv2.putText(frame, label, (xmin, label_ymin-7), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2) # Draw label text
            """
    #f.write("\n")
    #e_detection_loop = time.time()

    #print("[INFO] Detection loop time took {} seconds".format(e_detection_loop-s_detections_loop))

    #s_tracker_update = time.time()
    # Update sort tracker
    print("[INFO] Current Detections: ", detections.astype(int))
    objects_tracked = tracker.update(detections.astype(int))
    #e_tracker_update = time.time()

    #print("[INFO] Updating trackers state took {} seconds".format(e_tracker_update-s_tracker_update))

    #s_draw_tracked = time.time()
    # Process every tracked object
    for object_tracked in objects_tracked:
        if object_tracked.active:
            bbox_color = (0, 128, 255)
        else:
            bbox_color = (10, 255, 0)

        bbox = object_tracked.get_state().astype(int)

        # Draw the bbox rectangle
        cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), bbox_color, 4)

        # Calculate centroid of bounding box
        centroid = (object_tracked.last_centroid[0], object_tracked.last_centroid[1])    

        # Draw the centroid
        cv2.circle(frame, centroid, 6, (0, 0, 204), -1)

        label = '{} [{}]'.format(OBJECT_NAME,object_tracked.id) # Example: 'object [1]'
        labelSize, baseLine = cv2.getTextSize(label, FONT, 0.7, 2) # Get font size
        label_ymin = max(bbox[1], labelSize[1] + 10) # Make sure not to draw label too close to top of window
        cv2.rectangle(frame, (bbox[0], label_ymin-labelSize[1]-10), (bbox[0]+labelSize[0], label_ymin+baseLine-10), (255, 255, 255), cv2.FILLED) # Draw white box to put label text in
        cv2.putText(frame, label, (bbox[0], label_ymin-7), FONT, 0.7, (0, 0, 0), 2) # Draw label text
    #e_draw_tracked = time.time()

    #print("[INFO] Drawing tracked objects took {} seconds".format(e_draw_tracked-s_draw_tracked))


    # Update fps count
    fps.update()
    fps.stop()

    # Prepare fps display
    fps_label = "FPS: {0:.2f}".format(fps.fps())
    cv2.rectangle(frame, (0, 0), (int(VIDEO_WIDTH*0.6), int(VIDEO_HEIGHT*0.07)), (255, 255, 255), cv2.FILLED)
    cv2.putText(frame, fps_label, (int(VIDEO_WIDTH*0.01), int(VIDEO_HEIGHT*0.05)), FONT, 1.5, (10, 255, 0), 3)

    # Prepare total and active objects count display
    total_objects_text = "TOTAL {}S: {}".format(OBJECT_NAME,tracker.total_trackers)
    active_objects_text = "ACTIVE {}S: {}".format(OBJECT_NAME,tracker.active_trackers)
    cv2.putText(frame, total_objects_text, (int(VIDEO_WIDTH*0.1+VIDEO_WIDTH*0.06), int(VIDEO_HEIGHT*0.05)), FONT, 1.5, (0, 0, 255), 3) # Draw label text
    cv2.putText(frame, active_objects_text, (int(VIDEO_WIDTH*0.1+VIDEO_WIDTH*0.27), int(VIDEO_HEIGHT*0.05)), FONT, 1.5, (0, 128, 255), 3) # Draw label text

    # Draw horizontal boundaries
    cv2.line(frame, (LEFT_BOUNDARY, int(VIDEO_HEIGHT*0.07)), (LEFT_BOUNDARY, VIDEO_HEIGHT), (0, 255, 255), 4)
    #cv2.line(frame, (RIGHT_BOUNDARY, 0), (RIGHT_BOUNDARY, VIDEO_HEIGHT), (0, 255, 255), 4)

    #s_trackers_state = time.time()
    tracker.update_trackers_state()
    #e_trackers_state = time.time()

    #print("[INFO] Updating trackers state took {} seconds".format(e_trackers_state-s_trackers_state))

    # All the results have been drawn on the frame, so it's time to display it.
    cv2.imshow('Object detector', frame)

    # Center window
    if not IS_CENTERED:
        cv2.moveWindow('Object detector', 0, 0)
        IS_CENTERED = True

    if SAVE_VIDEO:
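        # NOTE: write() encodes the frame synchronously on this thread,
        # so the encoding cost shows up directly as lost FPS here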
        writer.write(frame)

    print("\n\n")

    # Press 'q' to quit
    if cv2.waitKey(1) == ord('q'):
        break

Thanks in advance for any help!

Best Answer

When trying to optimize or improve code performance, it is important to profile and measure your code's execution. Only after you have determined what is actually causing the bottleneck can you improve those parts of the code. For the approach below, I assume you are reading and saving frames in the same thread: if your performance drop comes from I/O latency, this approach can help; if instead the problem is a CPU processing limit, it will not give you a performance boost.
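Before changing anything, it helps to time each stage of the loop separately. The helper below is a minimal sketch, not part of the question's code; StageTimer is a hypothetical name, and the usage comments borrow the question's variables (video, interpreter, writer):

import time
from collections import defaultdict

class StageTimer:
    """Accumulates wall-clock time per pipeline stage to locate the bottleneck."""
    def __init__(self):
        self.totals = defaultdict(float)
        self.frames = 0

    def measure(self, stage, func, *args):
        # Time a single call and attribute its cost to the named stage
        start = time.perf_counter()
        result = func(*args)
        self.totals[stage] += time.perf_counter() - start
        return result

    def report(self):
        # Print the average per-frame cost of each stage in milliseconds
        for stage, total in sorted(self.totals.items()):
            print("{}: {:.1f} ms/frame".format(stage, 1000.0 * total / max(self.frames, 1)))

# Hypothetical usage inside the question's loop:
#   timer = StageTimer()
#   ret, frame = timer.measure("read", video.read)
#   timer.measure("invoke", interpreter.invoke)
#   timer.measure("write", writer.write, frame)
#   timer.frames += 1
#   ...
#   timer.report()

Whichever stage dominates the per-frame time is the one worth optimizing first.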

That said, the approach here is threading. The idea is to move frame grabbing into its own thread, because cv2.VideoCapture.read() is blocking: it can be expensive and adds latency, since the main thread has to wait until a frame has been grabbed. By putting this operation in a dedicated thread that only grabs frames, while the main thread handles processing and saving, performance improves significantly thanks to the reduced I/O latency. Here is a simple example of how to use threading to read frames in one thread and display/save them in the main thread. Be sure to change capture_src to your stream.

Code

from threading import Thread
import cv2

class VideoWritingThreading(object):
    def __init__(self, src=0):
        # Create a VideoCapture object
        self.capture = cv2.VideoCapture(src)

        # Default resolution of the frame is obtained (system dependent)
        self.frame_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Set up codec and output video settings
        self.codec = cv2.VideoWriter_fourcc('M','J','P','G')
        self.output_video = cv2.VideoWriter('output.avi', self.codec, 30, (self.frame_width, self.frame_height))

        # Start the thread to read frames from the video stream
        self.thread = Thread(target=self.update, args=())
        self.thread.daemon = True
        self.thread.start()

    def update(self):
        # Read the next frame from the stream in a different thread
        while True:
            if self.capture.isOpened():
                (self.status, self.frame) = self.capture.read()

    def show_frame(self):
        # Display frames in main program
        if self.status:
            cv2.imshow('frame', self.frame)

        # Press Q on keyboard to stop recording
        key = cv2.waitKey(1)
        if key == ord('q'):
            self.capture.release()
            self.output_video.release()
            cv2.destroyAllWindows()
            exit(1)

    def save_frame(self):
        # Save obtained frame into video output file
        self.output_video.write(self.frame)

if __name__ == '__main__':
    capture_src = 'your stream link!'
    video_writing = VideoWritingThreading(capture_src)
    while True:
        try:
            video_writing.show_frame()
            video_writing.save_frame()
        except AttributeError:
            pass
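
The example above threads the reading side. Since the question is specifically about VideoWriter cost, the same idea can be applied to the writing side: hand frames to a worker thread through a bounded queue so the main loop never blocks on encoding. The class below is a minimal sketch under that assumption (ThreadedVideoWriter and its parameters are illustrative names, not an OpenCV API):

from queue import Queue
from threading import Thread

import cv2

class ThreadedVideoWriter:
    """Encodes and writes frames on a worker thread so the main loop never blocks."""
    def __init__(self, path, fourcc, fps, size, max_queue=128):
        self.writer = cv2.VideoWriter(path, fourcc, fps, size)
        self.queue = Queue(maxsize=max_queue)  # bounded so memory use stays capped
        self.thread = Thread(target=self._drain, daemon=True)
        self.thread.start()

    def _drain(self):
        # Worker loop: pull frames off the queue and hand them to the encoder
        while True:
            frame = self.queue.get()
            if frame is None:  # sentinel value signals shutdown
                self.writer.release()
                return
            self.writer.write(frame)

    def write(self, frame):
        # Copy the frame so the caller may keep drawing on its own buffer
        self.queue.put(frame.copy())

    def release(self):
        # Flush remaining frames, then close the output file
        self.queue.put(None)
        self.thread.join()

# Hypothetical usage, mirroring the question's writer setup:
#   writer = ThreadedVideoWriter(VIDEO_OUTPUT, cv2.VideoWriter_fourcc(*'XVID'),
#                                args.fps, (VIDEO_WIDTH, VIDEO_HEIGHT))
#   writer.write(frame)   # returns immediately
#   writer.release()      # call once at the end

Note that this only helps if a CPU core is free to absorb the encoding work; on a fully loaded single core the total cost per frame is unchanged.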

The original question, python - OpenCV video writing drastically slows down FPS. How can I optimize performance?, can be found on Stack Overflow: https://stackoverflow.com/questions/60217483/
