Multi-Object Tracking with dlib in Python and OpenCV
In this tutorial, you will learn how to use the dlib library to efficiently track multiple objects in real-time video.
We can certainly track multiple objects with dlib; however, to obtain the best possible performance, we need to leverage multiprocessing and distribute the object trackers across multiple cores of our processor.
Correctly leveraging multiprocessing allows us to improve our dlib multi-object tracking throughput by more than 45% in frames per second (FPS)!
1. Multi-object tracking with dlib
In the first part of this guide, I'll demonstrate how to implement a simple, naive dlib multi-object tracking script. This program will track multiple objects in a video; however, we'll notice that the script runs a bit slowly. To increase our FPS, I'll then show you a faster, more efficient implementation of a dlib multi-object tracker. Finally, I'll discuss some improvements and suggestions to enhance our multi-object tracking implementation.
2. Project structure
You can use the tree command to view our project structure:
The mobilenet_ssd/ directory contains our MobileNet + SSD Caffe model files, which allow us to detect people (along with other objects). Today we will review two Python scripts:
- multi_object_tracking_slow.py: the simple, "naive" approach to dlib multi-object tracking.
- multi_object_tracking_fast.py: the advanced, faster approach that leverages multiprocessing.
3. The simple, "naive" approach to dlib multi-object tracking
The first dlib multi-object tracking implementation we'll cover today is "naive" in that it will:
1. Use a simple list of tracker objects.
2. Update each tracker sequentially, using only a single core of our processor.
For some object tracking tasks this implementation will be more than sufficient; however, to optimize our FPS, we should distribute the object trackers across multiple processes.
We'll start with the simple implementation in this section and then move on to the faster method in the next section. First, open up the multi_object_tracking_slow.py script and insert the following code:
# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2
Let's parse our command line arguments:
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
When run, our script handles the following command line arguments:
- --prototxt: path to the Caffe "deploy" prototxt file.
- --model: path to the model file that accompanies the prototxt.
- --video: path to the input video file. We'll perform multi-object tracking with dlib on this video.
- --output: an optional path to an output video file. If no path is specified, no video will be written to disk. I recommend writing to an .avi or .mp4 file.
- --confidence: the object detection confidence threshold (default 0.2), the minimum probability used to filter weak detections from the object detector.
Let's define the list of classes this model supports and load our model from disk:
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
We only care about the "person" class for today's footrace example, but you can easily modify the script to track other classes. We load our pre-trained object detector model: we'll use this pre-trained SSD to detect the presence of objects in the video, and then create one dlib object tracker for each detected object.
We have a few more initializations to perform:
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()
We initialize our video stream; we'll read frames from the input video one at a time. Our video writer is then initialized to None; we'll work with it inside the upcoming while loop. We also initialize our tracker and label lists, and finally start our frames-per-second counter. We're now all set to begin processing the video:
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
The frame is resized to 600 pixels wide while maintaining the aspect ratio. Then, for dlib compatibility, the frame is converted to RGB channel ordering (OpenCV's default is BGR, while dlib expects RGB).
Let's begin the object detection phase:
    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
In order to perform object tracking, we must first perform object detection, either:
- Manually, by stopping the video stream and hand-selecting the bounding box of each object.
- Programmatically, using a trained object detector to detect the presence of objects (which is what we do here).
If there are no object trackers yet, then we know we have not yet performed object detection.
We create a blob and pass it through the SSD network to detect objects.
Next, we loop over the detections, looking for objects that belong to the person class, since our input video is a footrace:
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
We begin looping over the detections, where we:
- Filter out weak detections.
- Ensure each detection is a person. You can, of course, remove this line of code or customize it to your own filtering needs.
Now that we've located each person in the frame, let's instantiate our trackers and draw our initial bounding boxes + class labels:
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)

                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
To begin tracking objects, we:
- Compute the bounding box of each detected object.
- Instantiate the tracker and pass it the bounding box coordinates. The bounding box is especially important here: we need to construct a dlib.rectangle from it and pass that to the start_track method. From there, dlib can begin tracking the object.
- Finally, populate the trackers list with the individual tracker.
As a result, in the next code block we'll handle the case where trackers have already been established and simply need their positions updated. We perform two additional tasks during the initial detection step:
- Append the class label to the labels list. If you're tracking multiple types of objects (such as dog + person), you may wish to know what type each object is.
- Draw each bounding box rectangle and class label around the object.
If our list of trackers is non-empty, we know we're in the object tracking phase:
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
During the object tracking phase, we loop over all trackers and the corresponding labels, then update the position of each object. To update the position, we simply pass in the rgb image.
After extracting the bounding box coordinates, we can draw a bounding box rectangle and label for each tracked object.
The remaining steps in the frame processing loop involve writing the frame to the output video (if necessary) and displaying the result:
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()
Here we:
- Write the frame to video if necessary.
- Display the output frame and capture key presses. If the q ("quit") key is pressed, we break out of the loop. Finally, we update our frames-per-second information for benchmarking purposes.
The remaining steps are to print FPS information to the terminal and release pointers:
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
Let's evaluate accuracy and performance. Open a terminal and execute the following command:
$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_slow.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 24.51
[INFO] approx. FPS: 13.87
It looks like our multi-object tracker is working!
But as you can see, we're only obtaining about 13 frames per second.
For some applications this FPS may be sufficient; however, if you need more speed, I suggest taking a look at our more efficient dlib multi-object tracker below. Second, understand that the tracking accuracy isn't perfect.
4. The fast, efficient dlib multi-object tracking implementation
If you run the dlib multi-object tracking script from the previous section while keeping an eye on your system's activity monitor, you'll notice that only one core of your processor is being used.
Leveraging processes enables our operating system to perform better process scheduling, mapping each process to a particular core on our machine (most modern operating systems are able to efficiently schedule CPU-heavy processes in parallel).
Go ahead and open up multi_object_tracking_fast.py and insert the following code:
# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2
We'll be using the Python Process class to spawn new processes; each new process is independent of the original one.
To spawn a process, we need to provide a function that Python can call, which Python will then use to create a brand-new process and execute it:
def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)
The first three parameters of start_tracker include:
- box: the bounding box coordinates of the object we are going to track, presumably returned by some sort of object detector, whether manual or programmatic.
- label: a human-readable label for the object.
- rgb: the RGB image we'll use to start the initial dlib object tracker.
Keep in mind how Python multiprocessing works: Python will call this function and then spin up a brand-new interpreter to execute the code within it. Therefore, each spawned start_tracker process will be independent of its parent. To communicate with the Python driver script, we need to use Pipes or Queues. Both types of objects are thread/process safe, accomplished with locks and semaphores.
Essentially, we're creating a simple producer/consumer relationship:
- Our parent process will produce new frames and add them to the queue of a particular object tracker.
- The child process will then consume the frame, apply object tracking, and return the updated bounding box coordinates.
I decided to use Queue objects in this post; however, keep in mind that you could use a Pipe if you wish.
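For reference, the same producer/consumer pattern built on a Pipe might look roughly like the sketch below. This is not part of today's scripts; track_over_pipe and its placeholder result are purely illustrative of where the tracker update would go.

# a minimal sketch of the same producer/consumer idea using a Pipe
import multiprocessing

def track_over_pipe(conn):
    # the child blocks until the parent sends a frame, then replies with
    # a (label, bounding box) result; None acts as a shutdown sentinel
    while True:
        rgb = conn.recv()
        if rgb is None:
            break
        # ... t.update(rgb) and t.get_position() would go here ...
        conn.send(("person", (0, 0, 10, 10)))  # placeholder coordinates

if __name__ == "__main__":
    parentConn, childConn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=track_over_pipe, args=(childConn,))
    p.daemon = True
    p.start()

The parent would then call parentConn.send(rgb) to produce a frame and parentConn.recv() to consume the result, mirroring the Queue-based put/get calls used in the rest of this post.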
Now let's begin the infinite loop that will run inside the process:
    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()

        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))
We loop indefinitely here; this function will be called as a daemon process, so we don't need to worry about joining it.
First, we attempt to grab a new frame from the inputQueue. If the frame is not empty, we grab it and then update the object tracker, which gives us the updated bounding box coordinates.
Finally, we write the label and bounding box to the outputQueue so the parent process can use them in the main loop of the script.
Back in the parent process, we parse our command line arguments:
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
The command line arguments for this script are exactly the same as for our slower, non-multiprocessing script.
Let's initialize our input and output queues:
# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []
These queues will hold the objects we're tracking. Each spawned process needs two Queue objects:
- One to read input frames from
- Another to write results to
The next code block is identical to our previous script:
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()
We define the model's CLASSES and load the model itself.
Now let's begin looping over frames from the video stream:
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
Now let's handle the case where we have no inputQueues yet:
    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
If there are no inputQueues, then we need to apply object detection before object tracking. We apply object detection and then loop over the results: we grab the confidence value and filter out weak detections. If the confidence meets the threshold established by our command line argument, we consider the detection, but we filter it further by class label; in this case, we're only looking for person objects. Assuming we've found a person, we create the queues and spawn the tracking process:
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)

                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)

                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
We first compute the bounding box coordinates. From there we create two new queues, iq and oq, appending them to inputQueues and outputQueues, respectively. We then spawn a new start_tracker process, passing it the bounding box, label, rgb image, and the iq + oq.
We also draw the detected object's bounding box rectangle and class label.
Otherwise, we've already performed object detection, so we need to apply each of the dlib object trackers to the frame:
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input ques and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)

        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()

            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
Looping over each of the inputQueues, we add the rgb image to them. Then we loop over each of the outputQueues, grabbing the bounding box coordinates from each independent object tracker. Finally, we draw the bounding box + associated class label.
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
If necessary, we write the frame to the output video and display the frame on screen. If the q key is pressed, we quit, breaking out of the loop. If we continue processing frames, our FPS calculator is updated and we start processing again at the top of the while loop. Otherwise, we're done processing frames, so we display the FPS information, release pointers, and close the windows.
Open a terminal and execute the following command:
$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_fast.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 14.01
[INFO] approx. FPS: 24.26
As you can see, our faster, more efficient multi-object tracker runs at 24 FPS, an improvement of more than 45% over our previous implementation! Additionally, if you open your activity monitor while this script is running, you'll see that more of your system's CPU is being utilized. This speedup is obtained by allowing each dlib object tracker to run in a separate process, which in turn lets your operating system perform more efficient scheduling of CPU resources.
5. Complete code
multi_object_tracking_slow.py
# USAGE
# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#     --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
# ap.add_argument("-v", "--video", required=True,
#     help="path to input video file")
ap.add_argument("-v", "--video",
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
# vs = cv2.VideoCapture(args["video"])
vs = cv2.VideoCapture(0)
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)

                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
multi_object_tracking_fast.py
# USAGE
# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#     --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)

    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()

        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize our list of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)

                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)

                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input ques and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)

        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()

            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
Download link: https://pan.baidu.com/s/1WhJr-Qxh5Wu3TsXKRiTHRg (extraction code: 1234)
6. Improvements and suggestions
The dlib multi-object tracking Python scripts I've shared with you today will work just fine for shorter video streams; however, if you intend to use this implementation in long-running production environments (on the order of hours to days of video), there are two primary improvements I would suggest.
The first improvement is to use a process pool rather than spawning a brand-new process for every object to be tracked. The implementation covered here constructs a brand-new Queue and Process for each object we need to track.
That's fine for today's purposes, but consider what happens if you want to track 50 objects in a video; that means spawning 50 processes, one per object. At that point, the overhead of your system managing all of those processes will destroy any gain in FPS. Instead, you would want to use a pool of processes.
If your system has N processor cores, then you would want to create a pool with N - 1 processes, leaving one core for your operating system to perform system operations. Each of these processes should perform multiple object tracking, maintaining a list of object trackers, similar to the first multi-object tracking implementation we covered today; a sketch of this idea follows below.
This improvement will allow you to utilize all cores of your processor without the overhead of spawning many independent processes.
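As a rough sketch of that idea (not part of today's scripts; the track_chunk worker, the chunking, and the queue names are all illustrative), each worker process could own a list of trackers and update all of them for every frame it receives:

# a rough sketch: one worker per core, each worker owning several trackers
import multiprocessing
import dlib

def track_chunk(boxes, labels, firstRgb, inputQueue, outputQueue):
    # seed one correlation tracker per bounding box assigned to this worker
    trackers = []
    for box in boxes:
        t = dlib.correlation_tracker()
        t.start_track(firstRgb, dlib.rectangle(box[0], box[1], box[2], box[3]))
        trackers.append(t)

    # for every frame the parent sends, update all of this worker's trackers
    # and return all of their boxes in a single message
    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break
        results = []
        for (t, label) in zip(trackers, labels):
            t.update(rgb)
            pos = t.get_position()
            results.append((label, (int(pos.left()), int(pos.top()),
                int(pos.right()), int(pos.bottom()))))
        outputQueue.put(results)

# the parent would split its detections into numWorkers chunks, e.g.
# numWorkers = max(1, multiprocessing.cpu_count() - 1), and spawn one
# track_chunk process (with its own queue pair) per chunk

With this layout, the number of processes stays fixed at roughly the core count no matter how many objects you track, and each queue message carries a batch of results instead of a single box.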
The second improvement I would make is to clean up the processes and queues. If dlib reports an object as "lost" or "disappeared," we do not return from the start_tracker function, which means that the process will live for the lifetime of the parent script and will only be killed when the parent exits.
Again, that's fine for our purposes today, but if you intend to use this code in production environments, you should:
- Update the start_tracker function to return once dlib reports the object as lost.
- Delete the inputQueue and outputQueue for the corresponding process as well.
Failing to perform this cleanup will lead to needless computation and memory overhead for long-running jobs.
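One possible way to implement that cleanup is sketched below; this is not the tutorial's code, and the 7.0 quality threshold is an arbitrary value you would tune for your own footage. It relies on the peak-to-side-lobe ratio that dlib's correlation_tracker.update returns, which drops when the tracker loses its target.

# a sketch of a start_tracker variant that exits once tracking quality drops;
# the 7.0 threshold is an arbitrary value you would tune for your own footage
import dlib

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    t = dlib.correlation_tracker()
    t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))

    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break

        # dlib's update() returns the peak-to-side-lobe ratio; low values
        # suggest the tracker has lost the object
        quality = t.update(rgb)
        if quality < 7.0:
            # tell the parent this tracker is finished so it can discard the
            # corresponding queue pair, then let the process exit
            outputQueue.put((label, None))
            break

        pos = t.get_position()
        outputQueue.put((label, (int(pos.left()), int(pos.top()),
            int(pos.right()), int(pos.bottom()))))

The parent loop would then treat a None bounding box as the signal to remove that iq/oq pair from inputQueues and outputQueues.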
A third improvement is to run the object detector every N frames (rather than only once at the start) to improve tracking accuracy; a sketch of this pattern follows below.
I actually demonstrated this in my article on counting objects with OpenCV. It requires more logic and thought, but it yields a much more accurate tracker. I chose to forgo implementing it in this script so that I could teach you the multiprocessing approach concisely. Ideally, you would use this third improvement in addition to multiprocessing.
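The skeleton of that pattern looks roughly like this; it is a sketch only, and detect_people, update_trackers, the totalFrames counter, and the skip interval N are illustrative stand-ins rather than code from the scripts above.

# a sketch of interleaving detection and tracking: re-run the detector every
# N frames and rely on the cheaper correlation trackers in between
N = 30

def detect_people(rgb):
    # placeholder: run the SSD forward pass and return freshly seeded trackers
    return []

def update_trackers(trackers, rgb):
    # placeholder: call t.update(rgb) on each tracker and redraw the boxes
    return trackers

def process_video(frames):
    trackers = []
    for (totalFrames, rgb) in enumerate(frames):
        if totalFrames % N == 0:
            trackers = detect_people(rgb)              # expensive, runs rarely
        else:
            trackers = update_trackers(trackers, rgb)  # cheap, runs every frame
    return trackers

Periodic re-detection lets the tracker recover objects that drift, enter, or leave the scene, at the cost of one SSD forward pass every N frames.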
That wraps up this detailed guide to multi-object tracking with dlib in Python and OpenCV.