问题描述
我需要在视频输入中检测图片库里的特定对象。
库中有各种各样的图片。程序应逐张加载这些图片,并与视频输入进行比较。如果从库中读取的某张图片与视频输入中的某个对象匹配,程序应停止并打印输出日志(检测所需的时间、检测精度等)。
我尝试过使用 OpenCV-Python 的 TM_CCOEFF_NORMED 模板匹配方法,先只在静态图像(而不是视频)上进行检测,但没有成功。
你能帮我吗?
https://drive.google.com/drive/folders/1rR9U2jkoHwbY7tW1-XXhThNahYQHAPHA?usp=sharing
解决方法
您可以使用 OpenCV 搭配 MobileNet SSD 模型进行对象检测:
import csv
import datetime

import cv2
import imutils
import numpy as np

# from centroidtracker import CentroidTracker
from pyimagesearch.centroidtracker import CentroidTracker
# Paths to the MobileNet-SSD Caffe network definition and its trained weights.
protopath = "mobilenet_ss/MobileNetSSD_deploy.prototxt"
modelpath = "mobilenet_ss/MobileNetSSD_deploy.caffemodel"

# The network is loaded once, at import time, and pinned to the CPU target.
detector = cv2.dnn.readNetFromCaffe(prototxt=protopath, caffeModel=modelpath)
# detector.setPreferableBackend(cv2.dnn.DNN_BACKEND_INFERENCE_ENGINE)
detector.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

# Accumulates one [lpc_count, opc_count] row per processed frame.
outputlist = []

# Class labels; a detection's class id is an index into this list.
CLASSES = [
    "background",
    "aeroplane",
    "bicycle",
    "bird",
    "boat",
    "bottle",
    "bus",
    "car",
    "cat",
    "chair",
    "cow",
    "diningtable",
    "dog",
    "horse",
    "motorbike",
    "person",
    "pottedplant",
    "sheep",
    "sofa",
    "train",
    "tvmonitor",
]

# Shared tracker instance that keeps object IDs stable across frames.
tracker = CentroidTracker(maxDisappeared=80, maxDistance=90)
def non_max_suppression_fast(boxes, overlapThresh):
    """Drop bounding boxes that overlap a kept box by more than a threshold.

    Boxes are visited from the largest bottom edge (``y2``) to the smallest.
    Each visited box is kept, and every remaining box whose intersection with
    it covers more than ``overlapThresh`` of that remaining box's own area is
    discarded.

    Args:
        boxes: ``(N, 4)`` array (or nested sequence) of ``x1, y1, x2, y2``
            rows.
        overlapThresh: Overlap ratio above which a box is suppressed.

    Returns:
        An integer array holding the kept boxes, in the order they were
        picked. An empty list is returned when ``boxes`` is empty or when
        the input cannot be processed.
    """
    try:
        # Accept plain lists/tuples as well as arrays.
        boxes = np.asarray(boxes)
        if len(boxes) == 0:
            return []

        # Integer boxes are promoted so the overlap ratio is a true division.
        if boxes.dtype.kind == "i":
            boxes = boxes.astype("float")

        pick = []
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]

        # The "+ 1" treats the coordinates as inclusive pixel indices.
        area = (x2 - x1 + 1) * (y2 - y1 + 1)
        idxs = np.argsort(y2)

        while len(idxs) > 0:
            # Keep the box with the largest y2 among those still undecided.
            last = len(idxs) - 1
            i = idxs[last]
            pick.append(i)

            # Intersection of the kept box with every other candidate.
            xx1 = np.maximum(x1[i], x1[idxs[:last]])
            yy1 = np.maximum(y1[i], y1[idxs[:last]])
            xx2 = np.minimum(x2[i], x2[idxs[:last]])
            yy2 = np.minimum(y2[i], y2[idxs[:last]])
            w = np.maximum(0, xx2 - xx1 + 1)
            h = np.maximum(0, yy2 - yy1 + 1)

            # Ratio of the intersection to each candidate's own area.
            overlap = (w * h) / area[idxs[:last]]

            # Remove the kept box and everything it suppresses.
            idxs = np.delete(
                idxs,
                np.concatenate(([last], np.where(overlap > overlapThresh)[0])),
            )

        return boxes[pick].astype("int")
    except Exception as e:
        # Best effort: report the problem and hand back "no boxes" rather
        # than None, so callers can always take len() of the result.
        print("Exception occurred in non_max_suppression : {}".format(e))
        return []
def main():
    """Detect and track objects in ``project_video.mp4`` and write an annotated copy.

    Each frame is resized to a width of 600 pixels and passed through the
    module-level ``detector``. Detections with confidence above 0.5 are
    reduced with ``non_max_suppression_fast`` and handed to the module-level
    ``tracker``. Every tracked box is drawn with its ID, and the frame is
    overlaid with the running FPS, the number of currently tracked objects
    (LPC) and the number of distinct IDs seen so far (OPC).

    The annotated frames are written to ``output/output.mp4`` and shown in a
    window named "Application"; pressing ``q`` stops the loop early. One
    ``[lpc_count, opc_count]`` row per frame is appended to the module-level
    ``outputlist``.
    """
    import os  # local import: only needed to create the output folder

    font = cv2.FONT_HERSHEY_COMPLEX_SMALL
    colour = (0, 0, 255)  # OpenCV colour tuples are in BGR order

    cap = cv2.VideoCapture('project_video.mp4')
    fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
    os.makedirs("output", exist_ok=True)
    # The writer is created on the first frame so its frame size always
    # matches what is written; frames of a different size are not stored.
    out = None

    fps_start_time = datetime.datetime.now()
    total_frames = 0
    seen_ids = set()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame = imutils.resize(frame, width=600)
            total_frames += 1
            (H, W) = frame.shape[:2]
            if out is None:
                out = cv2.VideoWriter("output/output.mp4", fourcc, 5.0, (W, H))

            blob = cv2.dnn.blobFromImage(frame, 0.007843, (W, H), 127.5)
            detector.setInput(blob)
            person_detections = detector.forward()

            # Detections are read as person_detections[0, 0, i, :], where
            # column 2 is the confidence and columns 3..6 are the box
            # corners as fractions of the frame size.
            rects = []
            for i in range(person_detections.shape[2]):
                confidence = person_detections[0, 0, i, 2]
                if confidence > 0.5:
                    person_box = person_detections[0, 0, i, 3:7] * np.array([W, H, W, H])
                    rects.append(person_box)

            boundingboxes = np.array(rects)
            boundingboxes = boundingboxes.astype(int)
            rects = non_max_suppression_fast(boundingboxes, 0.3)

            objects = tracker.update(rects)
            for (objectId, bbox) in objects.items():
                x1, y1, x2, y2 = (int(v) for v in bbox)
                cv2.rectangle(frame, (x1, y1), (x2, y2), colour, 2)
                text = "ID: {}".format(objectId)
                cv2.putText(frame, text, (x1, y1 - 5), font, 1, colour, 1)
                seen_ids.add(objectId)

            # Average frame rate since start-up, using fractional seconds.
            elapsed = (datetime.datetime.now() - fps_start_time).total_seconds()
            fps = total_frames / elapsed if elapsed > 0 else 0.0
            fps_text = "FPS: {:.2f}".format(fps)
            cv2.putText(frame, fps_text, (5, 30), font, 1, colour, 1)

            lpc_count = len(objects)
            opc_count = len(seen_ids)
            lpc_txt = "LPC: {}".format(lpc_count)
            opc_txt = "OPC: {}".format(opc_count)

            # Collected per frame; this function does not write it to disk.
            outputlist.append([lpc_count, opc_count])

            cv2.putText(frame, lpc_txt, (5, 60), font, 1, colour, 1)
            cv2.putText(frame, opc_txt, (5, 90), font, 1, colour, 1)

            out.write(frame)
            cv2.imshow("Application", frame)
            key = cv2.waitKey(1)
            if key == ord('q'):
                break
    finally:
        # Releasing the writer is what finalises the output file.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()


main()
**Create a `CentroidTracker.py` file, paste the code below into it, and import it wherever you want to use the tracker.**
# import the necessary packages
from scipy.spatial import distance as dist
from collections import OrderedDict
import numpy as np
class CentroidTracker:
    """Give bounding boxes stable integer IDs by matching centroids between frames.

    Attributes:
        nextObjectID: ID that the next newly registered object will receive.
        objects: Ordered mapping of object ID to its latest centroid.
        disappeared: Ordered mapping of object ID to the number of
            consecutive updates in which it was not matched.
        bbox: Ordered mapping of object ID to its latest bounding box.
        maxDisappeared: Number of consecutive misses tolerated before an
            object is deregistered.
        maxDistance: Largest centroid distance at which a detection may still
            be associated with an existing object.
    """

    def __init__(self, maxDisappeared=50, maxDistance=50):
        """Create an empty tracker with the given miss and distance limits."""
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()
        self.bbox = OrderedDict()
        self.maxDisappeared = maxDisappeared
        self.maxDistance = maxDistance

    def register(self, centroid, inputRect):
        """Start tracking a new object under the next free ID."""
        self.objects[self.nextObjectID] = centroid
        self.bbox[self.nextObjectID] = inputRect
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):
        """Forget an object: remove it from all three bookkeeping mappings."""
        del self.objects[objectID]
        del self.disappeared[objectID]
        del self.bbox[objectID]

    def update(self, rects):
        """Match this frame's boxes to tracked objects and return the boxes by ID.

        Args:
            rects: Sequence of ``(startX, startY, endX, endY)`` boxes for the
                current frame; may be empty.

        Returns:
            ``self.bbox``, the live mapping of object ID to its latest box
            (the same object on every call, not a copy).
        """
        # len() rather than truthiness: rects may be a NumPy array.
        if len(rects) == 0:
            # Nothing detected: every tracked object misses this frame.
            # Iterate over a copy of the keys because deregister() mutates
            # the mapping.
            for objectID in list(self.disappeared.keys()):
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)
            return self.bbox

        # Derive one integer centroid per input box.
        inputCentroids = np.zeros((len(rects), 2), dtype="int")
        inputRects = []
        for (i, (startX, startY, endX, endY)) in enumerate(rects):
            cX = int((startX + endX) / 2.0)
            cY = int((startY + endY) / 2.0)
            inputCentroids[i] = (cX, cY)
            inputRects.append(rects[i])

        if len(self.objects) == 0:
            # Nothing is tracked yet, so every detection is a new object.
            for centroid, rect in zip(inputCentroids, inputRects):
                self.register(centroid, rect)
        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())

            # D[r, c] is the distance from tracked object r to detection c.
            D = dist.cdist(np.array(objectCentroids), inputCentroids)

            # Visit tracked objects in order of how close their nearest
            # detection is; cols holds that nearest detection for each row.
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            usedRows = set()
            usedCols = set()
            for (row, col) in zip(rows, cols):
                # Each object and each detection is matched at most once.
                if row in usedRows or col in usedCols:
                    continue
                # Too far apart to be the same object.
                if D[row, col] > self.maxDistance:
                    continue

                # Matched: refresh the object's state and clear its misses.
                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.bbox[objectID] = inputRects[col]
                self.disappeared[objectID] = 0
                usedRows.add(row)
                usedCols.add(col)

            unusedRows = set(range(0, D.shape[0])).difference(usedRows)
            unusedCols = set(range(0, D.shape[1])).difference(usedCols)

            if D.shape[0] >= D.shape[1]:
                # At least as many tracked objects as detections: the
                # unmatched objects miss this frame.
                # NOTE(review): unmatched detections are not registered in
                # this branch - confirm that this is intended.
                for row in unusedRows:
                    objectID = objectIDs[row]
                    self.disappeared[objectID] += 1
                    if self.disappeared[objectID] > self.maxDisappeared:
                        self.deregister(objectID)
            else:
                # More detections than tracked objects: the unmatched
                # detections become new objects.
                for col in unusedCols:
                    self.register(inputCentroids[col], inputRects[col])

        return self.bbox