文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Python OpenCV如何使用dlib进行多目标跟踪

2023-06-29 11:43

关注

这篇文章主要讲解了“Python OpenCV如何使用dlib进行多目标跟踪”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Python OpenCV如何使用dlib进行多目标跟踪”吧!

1.使用 dlib 进行多目标跟踪

在本指南的第一部分,我将演示如何实现一个简单、朴素的 dlib 多对象跟踪脚本。该程序将跟踪视频中的多个对象;但是,我们会注意到脚本运行速度有点慢。 为了提高我们的 FPS,我将向您展示一个更快、更高效的 dlib 多对象跟踪器实现。 最后,我将讨论一些改进和建议,以增强我们的多对象跟踪实现。

2.项目结构

你可以使用tree命令查看我们的项目结构:

Python OpenCV如何使用dlib进行多目标跟踪

mobilenet_ssd/ 目录包含我们的 MobileNet + SSD Caffe 模型文件,它允许我们检测人(以及其他对象)。 今天我们将回顾两个 Python 脚本:

3.dlib 多对象跟踪的简单“朴素”方法

我们今天要介绍的第一个 dlib 多对象跟踪实现是“朴素的”,因为它将:

使用一个简单的跟踪器对象列表。

仅使用我们处理器的单个内核按顺序更新每个跟踪器。

对于某些对象跟踪任务,此实现将绰绰有余;然而,为了优化我们的 FPS,我们应该将对象跟踪器分布在多个进程中。

我们将从本节中的简单实现开始,然后在下一节中转到更快的方法。 首先,打开multi_object_tracking_slow.py 脚本并插入以下代码:

# import the necessary packagesfrom imutils.video import FPSimport numpy as npimport argparseimport imutilsimport dlibimport cv2

让我们解析我们的命令行参数:

# construct the argument parser and parse the argumentsap = argparse.ArgumentParser()ap.add_argument("-p", "--prototxt", required=True,    help="path to Caffe 'deploy' prototxt file")ap.add_argument("-m", "--model", required=True,    help="path to Caffe pre-trained model")ap.add_argument("-v", "--video", required=True,    help="path to input video file")ap.add_argument("-o", "--output", type=str,    help="path to optional output video file")ap.add_argument("-c", "--confidence", type=float, default=0.2,    help="minimum probability to filter weak detections")args = vars(ap.parse_args())

我们的脚本在运行时处理以下命令行参数:

让我们定义这个模型支持的类列表,并从磁盘加载我们的模型:

# initialize the list of class labels MobileNet SSD was trained to# detectCLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",    "sofa", "train", "tvmonitor"]# load our serialized model from diskprint("[INFO] loading model...")net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

我们只关心今天的赛跑示例中的“人”类,但您可以轻松修改以跟踪其他类。 我们加载了预训练的对象检测器模型。我们将使用我们预训练的 SSD 来检测视频中物体的存在。我们将创建一个 dlib 对象跟踪器来跟踪每个检测到的对象。

我们还有一些初始化要执行:

# initialize the video stream and output video writerprint("[INFO] starting video stream...")vs = cv2.VideoCapture(args["video"])writer = None# initialize the list of object trackers and corresponding class# labelstrackers = []labels = []# start the frames per second throughput estimatorfps = FPS().start()

我们初始化我们的视频流——我们将从输入视频中一次读取一个帧。 随后,我们的视频writer被初始化为 None 。在即将到来的 while 循环中,我们将与视频writer进行更多合作。 现在初始化我们的跟踪器和标签列表。 最后,开始我们的每秒帧数计数器。 我们都准备好开始处理视频了:

# loop over frames from the video file streamwhile True:    # grab the next frame from the video file    (grabbed, frame) = vs.read()    # check to see if we have reached the end of the video file    if frame is None:        break    # resize the frame for faster processing and then convert the    # frame from BGR to RGB ordering (dlib needs RGB ordering)    frame = imutils.resize(frame, width=600)    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)    # if we are supposed to be writing a video to disk, initialize    # the writer    if args["output"] is not None and writer is None:        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        writer = cv2.VideoWriter(args["output"], fourcc, 30,            (frame.shape[1], frame.shape[0]), True)

将帧调整为600像素宽,保持高宽比。然后,为了dlib兼容性,帧被转换为RGB颜色通道排序(OpenCV的默认值是BGR,而dlib的默认值是RGB)。

让我们开始对象检测阶段:

    # if there are no object trackers we first need to detect objects    # and then create a tracker for each object    if len(trackers) == 0:        # grab the frame dimensions and convert the frame to a blob        (h, w) = frame.shape[:2]        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)        # pass the blob through the network and obtain the detections        # and predictions        net.setInput(blob)        detections = net.forward()

为了执行对象跟踪,我们必须首先执行对象检测

如果没有对象跟踪器,那么我们知道我们还没有执行对象检测。

我们创建并通过 SSD 网络传递一个 blob 以检测对象。

接下来,我们继续循环检测以查找属于person类的对象,因为我们的输入视频是人类的赛跑:

        # loop over the detections        for i in np.arange(0, detections.shape[2]):            # extract the confidence (i.e., probability) associated            # with the prediction            confidence = detections[0, 0, i, 2]            # filter out weak detections by requiring a minimum            # confidence            if confidence > args["confidence"]:                # extract the index of the class label from the                # detections list                idx = int(detections[0, 0, i, 1])                label = CLASSES[idx]                # if the class label is not a person, ignore it                if CLASSES[idx] != "person":                    continue

我们开始循环检测,其中我们:

现在我们已经在框架中定位了每个person,让我们实例化我们的跟踪器并绘制我们的初始边界框 + 类标签:

                # compute the (x, y)-coordinates of the bounding box                # for the object                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])                (startX, startY, endX, endY) = box.astype("int")                # construct a dlib rectangle object from the bounding                # box coordinates and start the correlation tracker                t = dlib.correlation_tracker()                rect = dlib.rectangle(startX, startY, endX, endY)                t.start_track(rgb, rect)                # update our set of trackers and corresponding class                # labels                labels.append(label)                trackers.append(t)                # grab the corresponding class label for the detection                # and draw the bounding box                cv2.rectangle(frame, (startX, startY), (endX, endY),                    (0, 255, 0), 2)                cv2.putText(frame, label, (startX, startY - 15),                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

要开始跟踪对象,我们:

因此,在下一个代码块中,我们将处理已经建立跟踪器并且只需要更新位置的情况。 我们在初始检测步骤中执行了两个额外的任务:

如果我们的检测列表的长度大于0,我们就知道我们处于目标跟踪阶段:

    # otherwise, we've already performed detection so let's track    # multiple objects    else:        # loop over each of the trackers        for (t, l) in zip(trackers, labels):            # update the tracker and grab the position of the tracked            # object            t.update(rgb)            pos = t.get_position()            # unpack the position object            startX = int(pos.left())            startY = int(pos.top())            endX = int(pos.right())            endY = int(pos.bottom())            # draw the bounding box from the correlation object tracker            cv2.rectangle(frame, (startX, startY), (endX, endY),                (0, 255, 0), 2)            cv2.putText(frame, l, (startX, startY - 15),                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

在目标跟踪阶段,我们遍历所有trackers和相应的labels。然后我们继续update每个对象的位置。为了更新位置,我们只需传递 rgb 图像。

提取边界框坐标后,我们可以为每个被跟踪对象绘制一个边界框rectangle和label。

帧处理循环中的其余步骤涉及写入输出视频(如有必要)并显示结果:

    # check to see if we should write the frame to disk    if writer is not None:        writer.write(frame)    # show the output frame    cv2.imshow("Frame", frame)    key = cv2.waitKey(1) & 0xFF    # if the `q` key was pressed, break from the loop    if key == ord("q"):        break    # update the FPS counter    fps.update()

在这里,我们:

剩下的步骤是在终端打印FPS信息并释放指针:

# stop the timer and display FPS informationfps.stop()print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))# check to see if we need to release the video writer pointerif writer is not None:    writer.release()# do a bit of cleanupcv2.destroyAllWindows()vs.release()

让我们评估准确性和性能。打开终端并执行以下命令:

$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \    --video race.mp4 --output race_output_slow.avi[INFO] loading model...[INFO] starting video stream...[INFO] elapsed time: 24.51[INFO] approx. FPS: 13.87

看来我们的多目标跟踪器起作用了!

但正如你所看到的,我们只获得了约13帧/秒。

对于某些应用程序来说,这个FPS可能已经足够了——然而,如果你需要更快的FPS,我建议你看看下面我们更高效的dlib多对象跟踪器。其次,要明白跟踪的准确性并不完美。

4.快速、高效的 dlib 多对象跟踪实现

如果您运行上一节中的 dlib 多对象跟踪脚本并同时打开系统的监视器,您会注意到只使用了处理器的一个内核。

如果您运行上一节中的 dlib 多对象跟踪脚本并同时打开系统的活动监视器,您会注意到只使用了处理器的一个内核。

利用进程使我们的操作系统能够执行更好的进程调度,将进程映射到我们机器上的特定处理器内核(大多数现代操作系统能够以并行方式有效地调度使用大量 CPU 的进程)。

继续打开 mutli_object_tracking_fast.py 并插入以下代码:

# import the necessary packagesfrom imutils.video import FPSimport multiprocessingimport numpy as npimport argparseimport imutilsimport dlibimport cv2

我们将使用 Python Process 类来生成一个新进程——每个新进程都独立于原始进程。

为了生成这个进程,我们需要提供一个 Python 可以调用的函数,然后 Python 将使用该函数并创建一个全新的进程并执行它:

def start_tracker(box, label, rgb, inputQueue, outputQueue):    # construct a dlib rectangle object from the bounding box    # coordinates and then start the correlation tracker    t = dlib.correlation_tracker()    rect = dlib.rectangle(box[0], box[1], box[2], box[3])    t.start_track(rgb, rect)

start_tracker 的前三个参数包括:

请记住Python多处理是如何工作的——Python将调用这个函数,然后创建一个全新的解释器来执行其中的代码。因此,每个生成的start_tracker进程都将独立于它的父进程。为了与Python驱动程序脚本通信,我们需要利用管道或队列(Pipes and Queues)。这两种类型的对象都是线程/进程安全的,使用锁和信号量来完成。

本质上,我们正在创建一个简单的生产者/消费者关系:

我决定在这篇文章中使用 Queue 对象;但是,请记住,如果您愿意,也可以使用Pipe

现在让我们开始一个无限循环,它将在进程中运行:

    # loop indefinitely -- this function will be called as a daemon    # process so we don't need to worry about joining it    while True:        # attempt to grab the next frame from the input queue        rgb = inputQueue.get()        # if there was an entry in our queue, process it        if rgb is not None:            # update the tracker and grab the position of the tracked            # object            t.update(rgb)            pos = t.get_position()            # unpack the position object            startX = int(pos.left())            startY = int(pos.top())            endX = int(pos.right())            endY = int(pos.bottom())            # add the label + bounding box coordinates to the output            # queue            outputQueue.put((label, (startX, startY, endX, endY)))

我们在这里无限循环——这个函数将作为守护进程调用,所以我们不需要担心加入它。

首先,我们将尝试从 inputQueue 中抓取一个新帧。如果帧不为空,我们将抓取帧,然后更新对象跟踪器,让我们获得更新后的边界框坐标。

最后,我们将标签和边界框写入 outputQueue,以便父进程可以在脚本的主循环中使用它们。

回到父进程,我们将解析命令行参数:

# construct the argument parser and parse the argumentsap = argparse.ArgumentParser()ap.add_argument("-p", "--prototxt", required=True,    help="path to Caffe 'deploy' prototxt file")ap.add_argument("-m", "--model", required=True,    help="path to Caffe pre-trained model")ap.add_argument("-v", "--video", required=True,    help="path to input video file")ap.add_argument("-o", "--output", type=str,    help="path to optional output video file")ap.add_argument("-c", "--confidence", type=float, default=0.2,    help="minimum probability to filter weak detections")args = vars(ap.parse_args())

此脚本的命令行参数与我们较慢的非多处理脚本完全相同。

让我们初始化我们的输入和输出队列:

# initialize our lists of queues -- both input queue and output queue# for *every* object that we will be trackinginputQueues = []outputQueues = []

这些队列将保存我们正在跟踪的对象。生成的每个进程都需要两个 Queue 对象:

下一个代码块与我们之前的脚本相同:

# initialize the list of class labels MobileNet SSD was trained to# detectCLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",    "sofa", "train", "tvmonitor"]# load our serialized model from diskprint("[INFO] loading model...")net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])# initialize the video stream and output video writerprint("[INFO] starting video stream...")vs = cv2.VideoCapture(args["video"])writer = None# start the frames per second throughput estimatorfps = FPS().start()

我们定义模型的 CLASSES 并加载模型本身。

现在让我们开始循环视频流中的帧:

# loop over frames from the video file streamwhile True:    # grab the next frame from the video file    (grabbed, frame) = vs.read()    # check to see if we have reached the end of the video file    if frame is None:        break    # resize the frame for faster processing and then convert the    # frame from BGR to RGB ordering (dlib needs RGB ordering)    frame = imutils.resize(frame, width=600)    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)    # if we are supposed to be writing a video to disk, initialize    # the writer    if args["output"] is not None and writer is None:        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        writer = cv2.VideoWriter(args["output"], fourcc, 30,            (frame.shape[1], frame.shape[0]), True)

现在让我们处理没有 inputQueues 的情况:

    # if our list of queues is empty then we know we have yet to    # create our first object tracker    if len(inputQueues) == 0:        # grab the frame dimensions and convert the frame to a blob        (h, w) = frame.shape[:2]        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)        # pass the blob through the network and obtain the detections        # and predictions        net.setInput(blob)        detections = net.forward()        # loop over the detections        for i in np.arange(0, detections.shape[2]):            # extract the confidence (i.e., probability) associated            # with the prediction            confidence = detections[0, 0, i, 2]            # filter out weak detections by requiring a minimum            # confidence            if confidence > args["confidence"]:                # extract the index of the class label from the                # detections list                idx = int(detections[0, 0, i, 1])                label = CLASSES[idx]                # if the class label is not a person, ignore it                if CLASSES[idx] != "person":                    continue

如果没有 inputQueues,那么我们需要在对象跟踪之前应用对象检测。 我们应用对象检测,然后继续循环。我们获取置信度值并过滤掉弱检测。 如果我们的置信度满足我们的命令行参数建立的阈值,我们会考虑检测,但我们会通过类标签进一步过滤掉它。在这种情况下,我们只寻找person对象。 假设我们找到了一个person,我们将创建队列和生成跟踪进程:

                # compute the (x, y)-coordinates of the bounding box                # for the object                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])                (startX, startY, endX, endY) = box.astype("int")                bb = (startX, startY, endX, endY)                # create two brand new input and output queues,                # respectively                iq = multiprocessing.Queue()                oq = multiprocessing.Queue()                inputQueues.append(iq)                outputQueues.append(oq)                # spawn a daemon process for a new object tracker                p = multiprocessing.Process(                    target=start_tracker,                    args=(bb, label, rgb, iq, oq))                p.daemon = True                p.start()                # grab the corresponding class label for the detection                # and draw the bounding box                cv2.rectangle(frame, (startX, startY), (endX, endY),                    (0, 255, 0), 2)                cv2.putText(frame, label, (startX, startY - 15),                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

我们首先计算边界框坐标。从那里我们创建两个新队列 iq 和 oq,分别将它们附加到 inputQueues 和 outputQueues。我们生成一个新的 start_tracker 进程,传递边界框、标签、rgb 图像和 iq + oq。

我们还绘制了检测到的对象的边界框rectangle和类标签label。

否则,我们已经执行了对象检测,因此我们需要将每个 dlib 对象跟踪器应用于帧:

    # otherwise, we've already performed detection so let's track    # multiple objects    else:        # loop over each of our input ques and add the input RGB        # frame to it, enabling us to update each of the respective        # object trackers running in separate processes        for iq in inputQueues:            iq.put(rgb)        # loop over each of the output queues        for oq in outputQueues:            # grab the updated bounding box coordinates for the            # object -- the .get method is a blocking operation so            # this will pause our execution until the respective            # process finishes the tracking update            (label, (startX, startY, endX, endY)) = oq.get()            # draw the bounding box from the correlation object            # tracker            cv2.rectangle(frame, (startX, startY), (endX, endY),                (0, 255, 0), 2)            cv2.putText(frame, label, (startX, startY - 15),                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

遍历每个 inputQueues ,我们将 rgb 图像添加到它们。然后我们遍历每个outputQueues,从每个独立的对象跟踪器获取边界框坐标。最后,我们绘制边界框+关联的类标签label。

    # check to see if we should write the frame to disk    if writer is not None:        writer.write(frame)    # show the output frame    cv2.imshow("Frame", frame)    key = cv2.waitKey(1) & 0xFF    # if the `q` key was pressed, break from the loop    if key == ord("q"):        break    # update the FPS counter    fps.update()# stop the timer and display FPS informationfps.stop()print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))# check to see if we need to release the video writer pointerif writer is not None:    writer.release()# do a bit of cleanupcv2.destroyAllWindows()vs.release()

如有必要,我们将帧写入输出视频,并将帧显示到屏幕。 如果按下q键,我们退出,跳出循环。 如果我们继续处理帧,我们的 FPS 计算器会更新,然后我们再次在 while 循环的开头开始处理。 否则,我们处理完帧,我们显示 FPS 信息 + 释放指针并关闭窗口。

打开终端并执行以下命令:

$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \    --video race.mp4 --output race_output_fast.avi[INFO] loading model...[INFO] starting video stream...[INFO] elapsed time: 14.01[INFO] approx. FPS: 24.26

如您所见,我们更快、更高效的多对象跟踪器以 24 FPS 运行,比我们之前的实现提高了 45% 以上?! 此外,如果您在此脚本运行时打开活动监视器,您将看到更多系统的CPU 正在被使用。 这种加速是通过允许每个 dlib 对象跟踪器在单独的进程中运行来获得的,这反过来又使您的操作系统能够执行更有效的 CPU 资源调度。

5.完整代码

multi_object_tracking_slow.py

# USAGE# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \# --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4# import the necessary packagesfrom imutils.video import FPSimport numpy as npimport argparseimport imutilsimport dlibimport cv2# construct the argument parser and parse the argumentsap = argparse.ArgumentParser()ap.add_argument("-p", "--prototxt", required=True,help="path to Caffe 'deploy' prototxt file")ap.add_argument("-m", "--model", required=True,help="path to Caffe pre-trained model")# ap.add_argument("-v", "--video", required=True,# help="path to input video file")ap.add_argument("-v", "--video",help="path to input video file")ap.add_argument("-o", "--output", type=str,help="path to optional output video file")ap.add_argument("-c", "--confidence", type=float, default=0.2,help="minimum probability to filter weak detections")args = vars(ap.parse_args())# initialize the list of class labels MobileNet SSD was trained to# detectCLASSES = ["background", "aeroplane", "bicycle", "bird", "boat","bottle", "bus", "car", "cat", "chair", "cow", "diningtable","dog", "horse", "motorbike", "person", "pottedplant", "sheep","sofa", "train", "tvmonitor"]# load our serialized model from diskprint("[INFO] loading model...")net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])# initialize the video stream and output video writerprint("[INFO] starting video stream...")# vs = cv2.VideoCapture(args["video"])vs = cv2.VideoCapture(0)writer = None# initialize the list of object trackers and corresponding class# labelstrackers = []labels = []# start the frames per second throughput estimatorfps = FPS().start()# loop over frames from the video file streamwhile True:# grab the next frame from the video file(grabbed, frame) = vs.read()# check to see if we have reached the end of the video fileif frame is None:break# resize the frame for faster processing and then convert the# frame from BGR to RGB ordering (dlib needs RGB ordering)frame = imutils.resize(frame, width=600)rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)# if we are supposed to be writing a video to disk, initialize# the writerif args["output"] is not None and writer is None:fourcc = cv2.VideoWriter_fourcc(*"MJPG")writer = cv2.VideoWriter(args["output"], fourcc, 30,(frame.shape[1], frame.shape[0]), True)# if there are no object trackers we first need to detect objects# and then create a tracker for each objectif len(trackers) == 0:# grab the frame dimensions and convert the frame to a blob(h, w) = frame.shape[:2]blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)# pass the blob through the network and obtain the detections# and predictionsnet.setInput(blob)detections = net.forward()# loop over the detectionsfor i in np.arange(0, detections.shape[2]):# extract the confidence (i.e., probability) associated# with the predictionconfidence = detections[0, 0, i, 2]# filter out weak detections by requiring a minimum# confidenceif confidence > args["confidence"]:# extract the index of the class label from the# detections listidx = int(detections[0, 0, i, 1])label = CLASSES[idx]# if the class label is not a person, ignore itif CLASSES[idx] != "person":continue# compute the (x, y)-coordinates of the bounding box# for the objectbox = detections[0, 0, i, 3:7] * np.array([w, h, w, h])(startX, startY, endX, endY) = box.astype("int")# construct a dlib rectangle object from the bounding# box coordinates and start the correlation trackert = dlib.correlation_tracker()rect = dlib.rectangle(startX, startY, endX, endY)t.start_track(rgb, rect)# update our set of trackers and corresponding class# labelslabels.append(label)trackers.append(t)# grab the corresponding class label for the detection# and draw the bounding boxcv2.rectangle(frame, (startX, startY), (endX, endY),(0, 255, 0), 2)cv2.putText(frame, label, (startX, startY - 15),cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)# otherwise, we've already performed detection so let's track# multiple objectselse:# loop over each of the trackersfor (t, l) in zip(trackers, labels):# update the tracker and grab the position of the tracked# objectt.update(rgb)pos = t.get_position()# unpack the position objectstartX = int(pos.left())startY = int(pos.top())endX = int(pos.right())endY = int(pos.bottom())# draw the bounding box from the correlation object trackercv2.rectangle(frame, (startX, startY), (endX, endY),(0, 255, 0), 2)cv2.putText(frame, l, (startX, startY - 15),cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)# check to see if we should write the frame to diskif writer is not None:writer.write(frame)# show the output framecv2.imshow("Frame", frame)key = cv2.waitKey(1) & 0xFF# if the `q` key was pressed, break from the loopif key == ord("q"):break# update the FPS counterfps.update()# stop the timer and display FPS informationfps.stop()print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))# check to see if we need to release the video writer pointerif writer is not None:writer.release()# do a bit of cleanupcv2.destroyAllWindows()vs.release()

multi_object_tracking_fast.py

# USAGE# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \#--model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4# import the necessary packagesfrom imutils.video import FPSimport multiprocessingimport numpy as npimport argparseimport imutilsimport dlibimport cv2def start_tracker(box, label, rgb, inputQueue, outputQueue):# construct a dlib rectangle object from the bounding box# coordinates and then start the correlation trackert = dlib.correlation_tracker()rect = dlib.rectangle(box[0], box[1], box[2], box[3])t.start_track(rgb, rect)# loop indefinitely -- this function will be called as a daemon# process so we don't need to worry about joining itwhile True:# attempt to grab the next frame from the input queuergb = inputQueue.get()# if there was an entry in our queue, process itif rgb is not None:# update the tracker and grab the position of the tracked# objectt.update(rgb)pos = t.get_position()# unpack the position objectstartX = int(pos.left())startY = int(pos.top())endX = int(pos.right())endY = int(pos.bottom())# add the label + bounding box coordinates to the output# queueoutputQueue.put((label, (startX, startY, endX, endY)))# construct the argument parser and parse the argumentsap = argparse.ArgumentParser()ap.add_argument("-p", "--prototxt", required=True,help="path to Caffe 'deploy' prototxt file")ap.add_argument("-m", "--model", required=True,help="path to Caffe pre-trained model")ap.add_argument("-v", "--video", required=True,help="path to input video file")ap.add_argument("-o", "--output", type=str,help="path to optional output video file")ap.add_argument("-c", "--confidence", type=float, default=0.2,help="minimum probability to filter weak detections")args = vars(ap.parse_args())# initialize our list of queues -- both input queue and output queue# for *every* object that we will be trackinginputQueues = []outputQueues = []# initialize the list of class labels MobileNet SSD was trained to# detectCLASSES = ["background", "aeroplane", "bicycle", "bird", "boat","bottle", "bus", "car", "cat", "chair", "cow", "diningtable","dog", "horse", "motorbike", "person", "pottedplant", "sheep","sofa", "train", "tvmonitor"]# load our serialized model from diskprint("[INFO] loading model...")net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])# initialize the video stream and output video writerprint("[INFO] starting video stream...")vs = cv2.VideoCapture(args["video"])writer = None# start the frames per second throughput estimatorfps = FPS().start()# loop over frames from the video file streamwhile True:# grab the next frame from the video file(grabbed, frame) = vs.read()# check to see if we have reached the end of the video fileif frame is None:break# resize the frame for faster processing and then convert the# frame from BGR to RGB ordering (dlib needs RGB ordering)frame = imutils.resize(frame, width=600)rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)# if we are supposed to be writing a video to disk, initialize# the writerif args["output"] is not None and writer is None:fourcc = cv2.VideoWriter_fourcc(*"MJPG")writer = cv2.VideoWriter(args["output"], fourcc, 30,(frame.shape[1], frame.shape[0]), True)# if our list of queues is empty then we know we have yet to# create our first object trackerif len(inputQueues) == 0:# grab the frame dimensions and convert the frame to a blob(h, w) = frame.shape[:2]blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)# pass the blob through the network and obtain the detections# and predictionsnet.setInput(blob)detections = net.forward()# loop over the detectionsfor i in np.arange(0, detections.shape[2]):# extract the confidence (i.e., probability) associated# with the predictionconfidence = detections[0, 0, i, 2]# filter out weak detections by requiring a minimum# confidenceif confidence > args["confidence"]:# extract the index of the class label from the# detections listidx = int(detections[0, 0, i, 1])label = CLASSES[idx]# if the class label is not a person, ignore itif CLASSES[idx] != "person":continue# compute the (x, y)-coordinates of the bounding box# for the objectbox = detections[0, 0, i, 3:7] * np.array([w, h, w, h])(startX, startY, endX, endY) = box.astype("int")bb = (startX, startY, endX, endY)# create two brand new input and output queues,# respectivelyiq = multiprocessing.Queue()oq = multiprocessing.Queue()inputQueues.append(iq)outputQueues.append(oq)# spawn a daemon process for a new object trackerp = multiprocessing.Process(target=start_tracker,args=(bb, label, rgb, iq, oq))p.daemon = Truep.start()# grab the corresponding class label for the detection# and draw the bounding boxcv2.rectangle(frame, (startX, startY), (endX, endY),(0, 255, 0), 2)cv2.putText(frame, label, (startX, startY - 15),cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)# otherwise, we've already performed detection so let's track# multiple objectselse:# loop over each of our input ques and add the input RGB# frame to it, enabling us to update each of the respective# object trackers running in separate processesfor iq in inputQueues:iq.put(rgb)# loop over each of the output queuesfor oq in outputQueues:# grab the updated bounding box coordinates for the# object -- the .get method is a blocking operation so# this will pause our execution until the respective# process finishes the tracking update(label, (startX, startY, endX, endY)) = oq.get()# draw the bounding box from the correlation object# trackercv2.rectangle(frame, (startX, startY), (endX, endY),(0, 255, 0), 2)cv2.putText(frame, label, (startX, startY - 15),cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)# check to see if we should write the frame to diskif writer is not None:writer.write(frame)# show the output framecv2.imshow("Frame", frame)key = cv2.waitKey(1) & 0xFF# if the `q` key was pressed, break from the loopif key == ord("q"):break# update the FPS counterfps.update()# stop the timer and display FPS informationfps.stop()print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))# check to see if we need to release the video writer pointerif writer is not None:writer.release()# do a bit of cleanupcv2.destroyAllWindows()vs.release()

6.改进和建议

我今天与大家分享的 dlib 多对象跟踪 Python 脚本可以很好地处理较短的视频流;但是,如果您打算将此实现用于长时间运行的生产环境(大约数小时到数天的视频),我建议您进行两项主要改进:

第一个改进是利用进程池,而不是为每个要跟踪的对象生成一个全新的进程。今天在这里介绍的实现为我们需要跟踪的每个对象构建了一个全新的队列Queue和进程Process。

对于今天的目的来说这很好,但考虑一下如果您想跟踪视频中的 50 个对象——这意味着您将生成 50 个进程,每个对象一个。那时,系统管理所有这些进程的开销将破坏 FPS 的任何增加。相反,您可能希望利用进程池。

如果您的系统有 N 个处理器内核,那么您需要创建一个包含 N – 1 个进程的池,将一个内核留给您的操作系统来执行系统操作。这些进程中的每一个都应该执行多个对象跟踪,维护一个对象跟踪器列表,类似于我们今天介绍的第一个多对象跟踪。

这种改进将允许您利用处理器的所有内核,而无需产生许多独立进程的开销。

我要做的第二个改进是清理进程和队列。如果 dlib 将对象报告为“丢失”或“消失”,我们不会从 start_tracker 函数返回,这意味着该进程将在父脚本的生命周期内存活,并且仅在父脚本退出时被终止。

同样,这对于我们今天的目的来说很好,但是如果您打算在生产环境中使用此代码,您应该:

未能执行此清理将导致长时间运行作业的不必要的计算消耗和内存开销。

第三个改进是通过每 N 帧运行一次对象检测器(而不是在开始时只运行一次)来提高跟踪精度。

实际上,我在使用 OpenCV 计数的文章中演示了这一点。它需要更多的逻辑和思考,但会产生更准确的跟踪器。 我选择放弃这个脚本的实现,这样我就可以简明地教你多处理方法。 理想情况下,除了多处理之外,您还可以使用第三个改进。

感谢各位的阅读,以上就是“Python OpenCV如何使用dlib进行多目标跟踪”的内容了,经过本文的学习后,相信大家对Python OpenCV如何使用dlib进行多目标跟踪这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯