Python - OpenCV - 从视频库中检测对象

问题描述

我需要在视频输入中检测图像库里的特定对象。

图像库中有各种各样的图片。应将它们逐一加载并与视频输入进行比较。如果从库中读取的图像与视频输入中的某个对象匹配,则程序应停止并输出日志(检测所需的时间、检测精度等)。

我尝试使用 OpenCV - Python 的 TM_CCOEFF_NORMED 模板匹配方法,至少先在静态图像(而不是视频)上检测它,但没有成功。

你能帮我吗?

这是包含图像库和视频输入的文件夹链接:

https://drive.google.com/drive/folders/1rR9U2jkoHwbY7tW1-XXhThNahYQHAPHA?usp=sharing

解决方法

您可以使用 OpenCV 加载 MobileNet-SSD 模型来进行对象检测:

import csv
import datetime

import cv2
import imutils
import numpy as np

# from centroidtracker import CentroidTracker
from pyimagesearch.centroidtracker import CentroidTracker
# Paths to the Caffe network definition and trained weights of the detector.
protopath = "mobilenet_ss/MobileNetSSD_deploy.prototxt"
modelpath = "mobilenet_ss/MobileNetSSD_deploy.caffemodel"

# Load the network once at import time and run inference on the CPU.
detector = cv2.dnn.readNetFromCaffe(
    prototxt=protopath,
    caffeModel=modelpath,
)
# detector.setPreferableBackend(cv2.dnn.DNN_BACKEND_INFERENCE_ENGINE)
detector.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

# Per-frame [lpc_count, opc_count] rows appended by main().
outputlist = list()

# Class labels, indexed by the class id reported by the detector.
CLASSES = [
    "background",
    "aeroplane",
    "bicycle",
    "bird",
    "boat",
    "bottle",
    "bus",
    "car",
    "cat",
    "chair",
    "cow",
    "diningtable",
    "dog",
    "horse",
    "motorbike",
    "person",
    "pottedplant",
    "sheep",
    "sofa",
    "train",
    "tvmonitor",
]

# Shared tracker that assigns persistent IDs to detections across frames.
tracker = CentroidTracker(maxDisappeared=80, maxDistance=90)


def non_max_suppression_fast(boxes,overlapThresh):
    """Greedily drop bounding boxes that overlap an already kept box.

    Boxes are visited from the largest bottom edge (``y2``) to the smallest.
    Each visited box is kept, and every remaining box whose intersection
    with it covers more than ``overlapThresh`` of that remaining box's own
    area is discarded.

    Args:
        boxes: Array-like of shape (N, 4) holding ``[x1, y1, x2, y2]`` rows.
            Plain lists/tuples are accepted as well as NumPy arrays.
        overlapThresh: Overlap ratio (intersection / area of the candidate
            box) above which a candidate is suppressed.

    Returns:
        An integer ndarray with the kept boxes (in the order they were
        picked), an empty list when ``boxes`` is empty, or ``None`` when the
        input could not be processed (the error is printed, not raised).
    """
    try:
        # Coerce to a float array so that lists work and integer (including
        # unsigned) coordinates cannot wrap or truncate in the arithmetic.
        boxes = np.asarray(boxes, dtype="float")

        if len(boxes) == 0:
            return []

        pick = []

        x1 = boxes[:,0]
        y1 = boxes[:,1]
        x2 = boxes[:,2]
        y2 = boxes[:,3]

        # The +1 treats coordinates as inclusive pixel indices.
        area = (x2 - x1 + 1) * (y2 - y1 + 1)
        idxs = np.argsort(y2)

        while len(idxs) > 0:
            # Take the box with the largest y2 among the remaining ones.
            last = len(idxs) - 1
            i = idxs[last]
            pick.append(i)

            # Intersection rectangle between box i and every other candidate.
            xx1 = np.maximum(x1[i],x1[idxs[:last]])
            yy1 = np.maximum(y1[i],y1[idxs[:last]])
            xx2 = np.minimum(x2[i],x2[idxs[:last]])
            yy2 = np.minimum(y2[i],y2[idxs[:last]])

            w = np.maximum(0,xx2 - xx1 + 1)
            h = np.maximum(0,yy2 - yy1 + 1)

            # Fraction of each candidate's own area covered by box i.
            overlap = (w * h) / area[idxs[:last]]

            # Remove the picked box and every candidate it suppresses.
            idxs = np.delete(idxs,np.concatenate(([last],np.where(overlap > overlapThresh)[0])))

        return boxes[pick].astype("int")
    except Exception as e:
        # Best-effort by design: report the problem and signal failure with
        # None instead of raising.
        print("Exception occurred in non_max_suppression : {}".format(e))
        return None


def main():
    """Detect and track objects in ``project_video.mp4`` and write an annotated copy.

    For every frame the module-level ``detector`` is run, detections with
    confidence above 0.5 are kept, overlapping boxes are merged with
    ``non_max_suppression_fast`` and the surviving boxes are handed to the
    module-level ``tracker``.  Each tracked box is drawn with its ID, and
    three overlays are rendered: FPS, LPC (number of objects currently
    tracked) and OPC (number of distinct object IDs seen so far).  The
    ``[lpc, opc]`` pair of every frame is appended to the module-level
    ``outputlist``.  The annotated frames are written to
    ``output/output.mp4`` and shown in a window; pressing ``q`` stops early.
    """
    cap = cv2.VideoCapture('project_video.mp4')

    fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
    # Created on the first frame so the writer's frame size always matches
    # the resized frames (a mismatching size makes the writer drop frames).
    out = None

    font = cv2.FONT_HERSHEY_COMPLEX_SMALL
    red = (0, 0, 255)  # BGR

    fps_start_time = datetime.datetime.now()
    total_frames = 0
    seen_object_ids = set()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = imutils.resize(frame, width=600)
            total_frames += 1

            (H, W) = frame.shape[:2]
            if out is None:
                out = cv2.VideoWriter("output/output.mp4", fourcc, 5.0, (W, H))

            blob = cv2.dnn.blobFromImage(frame, 0.007843, (W, H), 127.5)

            detector.setInput(blob)
            detections = detector.forward()

            # The SSD output is shaped (1, 1, N, 7); each row holds
            # [image_id, class_id, confidence, x1, y1, x2, y2] with the box
            # coordinates normalised to [0, 1].
            rects = []
            for i in range(detections.shape[2]):
                confidence = detections[0, 0, i, 2]
                if confidence > 0.5:
                    # Scale the normalised box back to pixel coordinates.
                    box = detections[0, 0, i, 3:7] * np.array([W, H, W, H])
                    rects.append(box)

            boundingboxes = np.array(rects).astype(int)
            rects = non_max_suppression_fast(boundingboxes, 0.3)
            if rects is None:
                # NMS reports failures by returning None; treat as no boxes.
                rects = []

            objects = tracker.update(rects)
            for (objectId, bbox) in objects.items():
                x1, y1, x2, y2 = (int(v) for v in bbox)

                cv2.rectangle(frame, (x1, y1), (x2, y2), red, 2)
                text = "ID: {}".format(objectId)
                cv2.putText(frame, text, (x1, y1 - 5), font, 1, red, 1)

                seen_object_ids.add(objectId)

            elapsed = (datetime.datetime.now() - fps_start_time).total_seconds()
            fps = total_frames / elapsed if elapsed > 0 else 0.0

            fps_text = "FPS: {:.2f}".format(fps)
            cv2.putText(frame, fps_text, (5, 30), font, 1, red, 1)

            lpc_count = len(objects)
            opc_count = len(seen_object_ids)

            lpc_txt = "LPC: {}".format(lpc_count)
            opc_txt = "OPC: {}".format(opc_count)
            # Collected per frame so the counts can be exported afterwards.
            outputlist.append([lpc_count, opc_count])

            cv2.putText(frame, lpc_txt, (5, 60), font, 1, red, 1)
            cv2.putText(frame, opc_txt, (5, 90), font, 1, red, 1)

            out.write(frame)
            cv2.imshow("Application", frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
    finally:
        # Always release the capture/writer so the output file is finalised.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()


main()

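**Create a CentroidTracker.py file, paste the code below into it, and import it wherever you want to use it. Note that the script above imports `pyimagesearch.centroidtracker`, so name and place the file accordingly or adjust that import.**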

    # import the necessary packages
from scipy.spatial import distance as dist
from collections import OrderedDict
import numpy as np


class CentroidTracker:
    """Assign persistent integer IDs to bounding boxes across frames.

    Each tracked object is represented by the centroid of its last matched
    bounding box.  On every ``update`` call the new boxes are matched to the
    existing objects by nearest centroid distance; objects that stay
    unmatched for more than ``maxDisappeared`` consecutive updates are
    dropped.
    """

    def __init__(self,maxDisappeared=50,maxDistance=50):
        """Create an empty tracker.

        Args:
            maxDisappeared: Number of consecutive updates an object may go
                unmatched before it is deregistered.
            maxDistance: Largest centroid distance at which a new box may
                still be associated with an existing object.
        """
        # next unique object ID, plus ordered dictionaries mapping an object
        # ID to its centroid, to the number of consecutive updates it has
        # been marked as "disappeared", and to its last bounding box
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()
        self.bbox = OrderedDict()

        self.maxDisappeared = maxDisappeared
        self.maxDistance = maxDistance

    def register(self,centroid,inputRect):
        """Start tracking a new object under the next free ID."""
        self.objects[self.nextObjectID] = centroid
        self.bbox[self.nextObjectID] = inputRect
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self,objectID):
        """Stop tracking ``objectID`` and forget all of its state."""
        del self.objects[objectID]
        del self.disappeared[objectID]
        del self.bbox[objectID]

    def _mark_disappeared(self,objectID):
        """Count one more missed update for ``objectID``; drop it past the limit."""
        self.disappeared[objectID] += 1
        if self.disappeared[objectID] > self.maxDisappeared:
            self.deregister(objectID)

    def update(self,rects):
        """Match the boxes of the current frame against the tracked objects.

        Args:
            rects: Sequence of ``(startX, startY, endX, endY)`` boxes; may be
                empty.

        Returns:
            The tracker's own ordered mapping ``objectID -> bounding box``
            (the live ``self.bbox`` dictionary, not a copy).
        """
        # no detections at all: every tracked object missed this update
        if len(rects) == 0:
            # iterate over a copy of the keys, deregistering mutates the dict
            for objectID in list(self.disappeared.keys()):
                self._mark_disappeared(objectID)

            # nothing to match, return early
            return self.bbox

        # derive the centroid of every input bounding box
        inputCentroids = np.zeros((len(rects),2),dtype="int")
        inputRects = []
        for (i,(startX,startY,endX,endY)) in enumerate(rects):
            cX = int((startX + endX) / 2.0)
            cY = int((startY + endY) / 2.0)
            inputCentroids[i] = (cX,cY)
            inputRects.append(rects[i])

        # if we are currently not tracking any objects, register every input
        if len(self.objects) == 0:
            for i in range(0,len(inputCentroids)):
                self.register(inputCentroids[i],inputRects[i])

        # otherwise, try to match the input centroids to the existing
        # object centroids
        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())

            # D[r, c] is the distance between existing object r and input c
            D = dist.cdist(np.array(objectCentroids),inputCentroids)

            # order the rows (existing objects) by the distance to their
            # closest input, smallest first
            rows = D.min(axis=1).argsort()

            # for each of those rows, the column (input) that is closest
            cols = D.argmin(axis=1)[rows]

            # in order to determine if we need to update, register,
            # or deregister an object we need to keep track of which
            # of the row and column indexes we have already examined
            usedRows = set()
            usedCols = set()

            for (row,col) in zip(rows,cols):
                # each object and each input may be matched at most once
                if row in usedRows or col in usedCols:
                    continue

                # too far apart to be the same object
                if D[row,col] > self.maxDistance:
                    continue

                # otherwise, grab the object ID for the current row,
                # set its new centroid and box, and reset the disappeared
                # counter
                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.bbox[objectID] = inputRects[col]
                self.disappeared[objectID] = 0

                usedRows.add(row)
                usedCols.add(col)

            # rows and columns that were not matched above
            unusedRows = set(range(0,D.shape[0])).difference(usedRows)
            unusedCols = set(range(0,D.shape[1])).difference(usedCols)

            # at least as many tracked objects as inputs: the unmatched
            # objects may have disappeared
            if D.shape[0] >= D.shape[1]:
                for row in unusedRows:
                    self._mark_disappeared(objectIDs[row])

            # more inputs than tracked objects: register every unmatched
            # input as a new object
            else:
                for col in unusedCols:
                    self.register(inputCentroids[col],inputRects[col])

        return self.bbox