坚持使用队列在 `readFrame` 和 `vehicleDetection` 之间创建生产者-消费者关系

问题描述

我目前正在努力加快车辆检测流程。我想创建一个消费者生产者模型来读取帧并处理它们。这是我试图完成的基本结构:

(Read frames from input - live stream/video) --> [Store in input buffer] -->(do vehicle detection) --> [store in vehicledetection buffer]

鉴于从摄像头/视频读取帧是 I/O 绑定任务,因此我使用 threading.Thread,而车辆检测是 CPU 绑定进程,我为此使用 multiprocessing.Process

import os
import cv2
import argparse
from time import sleep
import multiprocessing as mp
import threading
from queue import Queue

inputQueue = Queue(10)

class cvSkeleton():
    def __init__(self,args):
        self.args = args
        self.frameId = 0
        self.stopped=False
        self.grabbed = True
        s#elf.inputQueue = Queue(10)
        self.vehicleDetectionQueue = mp.Queue(10)
        self.outputQueue = mp.Queue(10)
        self.videoBuffer = mp.Queue(10)
        self.name = "output window"
        if(self.args.video):
            self.videoCaptureObject = cv2.VideoCapture(args.video)
            sleep(2)
        else:
            raise Exception("No video loaded. Bad path!")
        (self.grabbed,self.frame) = self.videoCaptureObject.read()


    global inputQueue       

    def readFrame(self):        
        while True:
            grabbed,frame = self.videoCaptureObject.read()
            inputQueue.put((frame,self.frameId))
            self.frameId+=1
            print(f'input queue size = {inputQueue.qsize()}\n')
            
            if(inputQueue.full()):
                break
                

    def vehicleDetection(self):
        (frame,frameId) = inputQueue.get()
        print(f'got frame with id {frameId}\n')
        return
        
        
        
    
    def numberPlateOCRdetection():

        pass 
        # performs number plate extraction and ocr.
    
    def displayFrame():
        pass
        #displays frame
    
    def saveVideo():
        pass
        #takes a list of frames with frame id and merges them into a video.

if __name__ == '__main__':
    import cProfile

    app_profiler = cProfile.Profile()

    parser = argparse.ArgumentParser(description='BitSilica Traffic Analysis Solution')
    parser.add_argument('--image',help=' Full Path to image file.')
    parser.add_argument('--video',help='Full Path to video file.')
    parser.add_argument('--realtime',help='Camera Connected Input')
      
    args = parser.parse_args()
    #enable profiler here.
    app_profiler.enable()
    
    app = cvSkeleton(args)
    readFrameProcessThread = threading.Thread(target=app.readFrame)
    vehicleDetectionProcess1 = mp.Process(target=app.vehicleDetection)
    
    
    readFrameProcessThread.start() 
    sleep(1)
    vehicleDetectionProcess1.start()
    
    readFrameProcessThread.join()
    vehicleDetectionProcess1.join()
    #disable profiler here.
    app_profiler.disable()
    
    profile_name = str('temp.prof'.format(os.path.basename(args.video)[0:-4]))
    print("------------------------\nEnd of execution,dumping profile stats\n-------------------------")
    app_profiler.dump_stats(profile_name)  

错误:

input queue size = 1

input queue size = 2

input queue size = 3

input queue size = 4

input queue size = 5

input queue size = 6

input queue size = 7

input queue size = 8

input queue size = 9

input queue size = 10

^CTraceback (most recent call last):
  File "skeleton.py",line 95,in <module>
Process Process-1:
    readFrameProcessThread.join()
  File "/home/atmadeep/anaconda3/envs/work-env/lib/python3.7/multiprocessing/process.py",line 140,in join
    res = self._popen.wait(timeout)
  File "/home/atmadeep/anaconda3/envs/work-env/lib/python3.7/multiprocessing/popen_fork.py",line 48,in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/home/atmadeep/anaconda3/envs/work-env/lib/python3.7/multiprocessing/popen_fork.py",line 28,in poll
    pid,sts = os.waitpid(self.pid,flag)
KeyboardInterrupt
Traceback (most recent call last):
  File "/home/atmadeep/anaconda3/envs/work-env/lib/python3.7/multiprocessing/process.py",line 297,in _bootstrap
    self.run()
  File "/home/atmadeep/anaconda3/envs/work-env/lib/python3.7/multiprocessing/process.py",line 99,in run
    self._target(*self._args,**self._kwargs)
  File "skeleton.py",line 44,in vehicleDetection
    (frame,frameId) = inputQueue.get()
  File "/home/atmadeep/anaconda3/envs/work-env/lib/python3.7/queue.py",line 170,in get
    self.not_empty.wait()
  File "/home/atmadeep/anaconda3/envs/work-env/lib/python3.7/threading.py",line 296,in wait
    waiter.acquire()
KeyboardInterrupt
(work-env) atmadeep:trafficAI-master/ (multiprocessing✗) $ python3 skeleton.py --video short_video.mp4                                                                                                                    [16:50:55]
input queue size = 1

input queue size = 2

input queue size = 3

input queue size = 4

input queue size = 5

input queue size = 6

input queue size = 7

input queue size = 8

input queue size = 9

input queue size = 10

got frame with id 0

------------------------
End of execution,dumping profile stats
-------------------------

readFrame 将填充 inputQueue 到给定的 maxsize (=10) 并且 vehicleDetection 将仅读取第一帧并退出。需要帮助解决这个问题。我可能没有完全编写代码。我确实阅读了 Adrian rosebrock 的教程和许多其他人的代码,但他们都在 main 函数中使用了一个循环,这不是我想要做的,因为这样循环中的每次迭代都会产生进程并且内存会溢出。 tl:博士;在 readFramevehicleDetection 函数之间创建生产者-消费者关系。 readFrame 是一个线程,vehicleDetection 是一个进程。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)