图像处理中的多线程——视频python opencv

问题描述

我正在使用 opencv python 从实时流视频中进行对象检测。我的程序在单个线程上运行,因为在屏幕上显示的结果视频甚至看起来都不像视频,因为检测过程存在延迟。因此,我尝试使用多个线程重新实现它。我使用一个线程来读取帧,另一个线程用于显示检测结果和大约 5 个线程一次在多个帧上运行检测算法。我已经编写了以下代码,但结果与单线程程序没有区别。我是 Python 新手。因此,任何帮助表示赞赏。

import threading,time
import cv2
import queue


def detect_object():
    while True:
        print("get")
        frame = input_buffer.get()
        if frame is not None:
            time.sleep(1)
            detection_buffer.put(frame)
        else:
            break
    return


def show():
    while True:
        print("show")
        frame = detection_buffer.get()
        if frame is not None:
            cv2.imshow("Video",frame)
        else:
            break
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    return


if __name__ == "__main__":

    input_buffer = queue.Queue()
    detection_buffer = queue.Queue()

    cap = cv2.VideoCapture(0)

    for i in range(5):
        t = threading.Thread(target=detect_object)
        t.start()

    t1 = threading.Thread(target=show)
    t1.start()

    while True:
        ret,frame = cap.read()
        if ret:
            input_buffer.put(frame)
            time.sleep(0.025)
        else:
            break

    print("program ended")

解决方法

假设检测算法是 CPU 密集型的,您需要使用多处理而不是多线程,因为由于争用全局解释器锁,多个线程不会并行运行 Python 字节码。您还应该摆脱对 sleep 的所有调用。当您运行多个线程或以您正在执行的方式处理时,也不清楚什么保证帧将以正确的顺序输出,也就是说,第二帧的处理可以在第一帧的处理之前完成,并且首先写入 detection_buffer

以下使用了 6 个进程的处理池(现在不需要隐式输入队列)。

from multiprocessing import Pool,Queue
import time
import cv2

# intialize global variables for the pool processes:
def init_pool(d_b):
    global detection_buffer
    detection_buffer = d_b


def detect_object(frame):
    time.sleep(1)
    detection_buffer.put(frame)


def show():
    while True:
        print("show")
        frame = detection_buffer.get()
        if frame is not None:
            cv2.imshow("Video",frame)
        else:
            break
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    return


# required for Windows:
if __name__ == "__main__":

    detection_buffer = Queue()
    # 6 workers: 1 for the show task and 5 to process frames:
    pool = Pool(6,initializer=init_pool,initargs=(detection_buffer,))
    # run the "show" task:
    show_future = pool.apply_async(show)

    cap = cv2.VideoCapture(0)

    futures = []
    while True:
        ret,frame = cap.read()
        if ret:
            f = pool.apply_async(detect_object,args=(frame,))
            futures.append(f)
            time.sleep(0.025)
        else:
            break
    # wait for all the frame-putting tasks to complete:
    for f in futures:
        f.get()
    # signal the "show" task to end by placing None in the queue
    detection_buffer.put(None)
    show_future.get()
    print("program ended")