Feature Tracker
This example shows the capabilities of the FeatureTracker node. It detects features and tracks them between consecutive frames using optical flow, assigning a unique ID to each matched feature. The Feature Detector example, by contrast, only detects features without tracking them across frames.
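Each message on the tracker's outputFeatures stream carries a trackedFeatures list, where every item has a persistent id and a sub-pixel position. Below is a minimal sketch that consumes just that stream and prints the fields; stream and variable names here are illustrative, and drawing plus runtime reconfiguration are left to the full example further down.

#!/usr/bin/env python3
# Minimal sketch: stream tracked features from one mono camera and print each
# feature's persistent ID and sub-pixel position.
import depthai as dai

pipeline = dai.Pipeline()

mono = pipeline.create(dai.node.MonoCamera)
mono.setCamera("left")

featureTracker = pipeline.create(dai.node.FeatureTracker)
xout = pipeline.create(dai.node.XLinkOut)
xout.setStreamName("features")  # illustrative stream name

mono.out.link(featureTracker.inputImage)
featureTracker.outputFeatures.link(xout.input)

with dai.Device(pipeline) as device:
    queue = device.getOutputQueue("features", maxSize=8, blocking=False)
    for _ in range(30):  # a few frames are enough for a smoke test
        for feature in queue.get().trackedFeatures:
            # The same id across consecutive frames refers to the same feature
            print(f"id={feature.id} x={feature.position.x:.1f} y={feature.position.y:.1f}")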
Setup
Please run the install script to download all required dependencies. Note that this script must be run from within the cloned repository, so first clone the depthai-python repository and then run the script.

Command Line
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
Source code
Python
#!/usr/bin/env python3

import cv2
import depthai as dai
from collections import deque

class FeatureTrackerDrawer:

    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    # for how many frames the feature is tracked
    trackedFeaturesPathLength = 10

    trackedIDs = None
    trackedFeaturesPath = None

    def onTrackBar(self, val):
        FeatureTrackerDrawer.trackedFeaturesPathLength = val

    def trackFeaturePath(self, features):

        newTrackedIDs = set()
        for currentFeature in features:
            currentID = currentFeature.id
            newTrackedIDs.add(currentID)

            if currentID not in self.trackedFeaturesPath:
                self.trackedFeaturesPath[currentID] = deque()

            path = self.trackedFeaturesPath[currentID]

            path.append(currentFeature.position)
            while len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength):
                path.popleft()

            self.trackedFeaturesPath[currentID] = path

        featuresToRemove = set()
        for oldId in self.trackedIDs:
            if oldId not in newTrackedIDs:
                featuresToRemove.add(oldId)

        for oldId in featuresToRemove:
            self.trackedFeaturesPath.pop(oldId)

        self.trackedIDs = newTrackedIDs

    def drawFeatures(self, img):

        cv2.setTrackbarPos(self.trackbarName, self.windowName, FeatureTrackerDrawer.trackedFeaturesPathLength)

        for featurePath in self.trackedFeaturesPath.values():
            path = featurePath

            for j in range(len(path) - 1):
                src = (int(path[j].x), int(path[j].y))
                dst = (int(path[j + 1].x), int(path[j + 1].y))
                cv2.line(img, src, dst, self.lineColor, 1, cv2.LINE_AA, 0)
            j = len(path) - 1
            cv2.circle(img, (int(path[j].x), int(path[j].y)), self.circleRadius, self.pointColor, -1, cv2.LINE_AA, 0)

    def __init__(self, trackbarName, windowName):
        self.trackbarName = trackbarName
        self.windowName = windowName
        cv2.namedWindow(windowName)
        cv2.createTrackbar(trackbarName, windowName, FeatureTrackerDrawer.trackedFeaturesPathLength, FeatureTrackerDrawer.maxTrackedFeaturesPathLength, self.onTrackBar)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()


# Create pipeline
pipeline = dai.Pipeline()

# Define sources and outputs
monoLeft = pipeline.create(dai.node.MonoCamera)
monoRight = pipeline.create(dai.node.MonoCamera)
featureTrackerLeft = pipeline.create(dai.node.FeatureTracker)
featureTrackerRight = pipeline.create(dai.node.FeatureTracker)

xoutPassthroughFrameLeft = pipeline.create(dai.node.XLinkOut)
xoutTrackedFeaturesLeft = pipeline.create(dai.node.XLinkOut)
xoutPassthroughFrameRight = pipeline.create(dai.node.XLinkOut)
xoutTrackedFeaturesRight = pipeline.create(dai.node.XLinkOut)
xinTrackedFeaturesConfig = pipeline.create(dai.node.XLinkIn)

xoutPassthroughFrameLeft.setStreamName("passthroughFrameLeft")
xoutTrackedFeaturesLeft.setStreamName("trackedFeaturesLeft")
xoutPassthroughFrameRight.setStreamName("passthroughFrameRight")
xoutTrackedFeaturesRight.setStreamName("trackedFeaturesRight")
xinTrackedFeaturesConfig.setStreamName("trackedFeaturesConfig")

# Properties
monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
monoLeft.setCamera("left")
monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
monoRight.setCamera("right")

# Linking
monoLeft.out.link(featureTrackerLeft.inputImage)
featureTrackerLeft.passthroughInputImage.link(xoutPassthroughFrameLeft.input)
featureTrackerLeft.outputFeatures.link(xoutTrackedFeaturesLeft.input)
xinTrackedFeaturesConfig.out.link(featureTrackerLeft.inputConfig)

monoRight.out.link(featureTrackerRight.inputImage)
featureTrackerRight.passthroughInputImage.link(xoutPassthroughFrameRight.input)
featureTrackerRight.outputFeatures.link(xoutTrackedFeaturesRight.input)
xinTrackedFeaturesConfig.out.link(featureTrackerRight.inputConfig)

# By default, the least amount of resources is allocated;
# increasing it improves performance
numShaves = 2
numMemorySlices = 2
featureTrackerLeft.setHardwareResources(numShaves, numMemorySlices)
featureTrackerRight.setHardwareResources(numShaves, numMemorySlices)

featureTrackerConfig = featureTrackerRight.initialConfig.get()
print("Press 's' to switch between Lucas-Kanade optical flow and hardware accelerated motion estimation!")

# Connect to device and start pipeline
with dai.Device(pipeline) as device:

    # Output queues used to receive the results
    passthroughImageLeftQueue = device.getOutputQueue("passthroughFrameLeft", 8, False)
    outputFeaturesLeftQueue = device.getOutputQueue("trackedFeaturesLeft", 8, False)
    passthroughImageRightQueue = device.getOutputQueue("passthroughFrameRight", 8, False)
    outputFeaturesRightQueue = device.getOutputQueue("trackedFeaturesRight", 8, False)

    inputFeatureTrackerConfigQueue = device.getInputQueue("trackedFeaturesConfig")

    leftWindowName = "left"
    leftFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", leftWindowName)

    rightWindowName = "right"
    rightFeatureDrawer = FeatureTrackerDrawer("Feature tracking duration (frames)", rightWindowName)

    while True:
        inPassthroughFrameLeft = passthroughImageLeftQueue.get()
        passthroughFrameLeft = inPassthroughFrameLeft.getFrame()
        leftFrame = cv2.cvtColor(passthroughFrameLeft, cv2.COLOR_GRAY2BGR)

        inPassthroughFrameRight = passthroughImageRightQueue.get()
        passthroughFrameRight = inPassthroughFrameRight.getFrame()
        rightFrame = cv2.cvtColor(passthroughFrameRight, cv2.COLOR_GRAY2BGR)

        trackedFeaturesLeft = outputFeaturesLeftQueue.get().trackedFeatures
        leftFeatureDrawer.trackFeaturePath(trackedFeaturesLeft)
        leftFeatureDrawer.drawFeatures(leftFrame)

        trackedFeaturesRight = outputFeaturesRightQueue.get().trackedFeatures
        rightFeatureDrawer.trackFeaturePath(trackedFeaturesRight)
        rightFeatureDrawer.drawFeatures(rightFrame)

        # Show the frame
        cv2.imshow(leftWindowName, leftFrame)
        cv2.imshow(rightWindowName, rightFrame)

        key = cv2.waitKey(1)
        if key == ord('q'):
            break
        elif key == ord('s'):
            if featureTrackerConfig.motionEstimator.type == dai.FeatureTrackerConfig.MotionEstimator.Type.LUCAS_KANADE_OPTICAL_FLOW:
                featureTrackerConfig.motionEstimator.type = dai.FeatureTrackerConfig.MotionEstimator.Type.HW_MOTION_ESTIMATION
                print("Switching to hardware accelerated motion estimation")
            else:
                featureTrackerConfig.motionEstimator.type = dai.FeatureTrackerConfig.MotionEstimator.Type.LUCAS_KANADE_OPTICAL_FLOW
                print("Switching to Lucas-Kanade optical flow")

            cfg = dai.FeatureTrackerConfig()
            cfg.set(featureTrackerConfig)
            inputFeatureTrackerConfigQueue.send(cfg)
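The 's' key handler shows the general runtime reconfiguration pattern: mutate the raw config obtained from initialConfig.get(), wrap it in a dai.FeatureTrackerConfig, and send it through the XLinkIn queue. In principle the same round-trip could adjust other settings; below is a hedged sketch in which cornerDetector.numTargetFeatures is an assumed field name, to be verified against the RawFeatureTrackerConfig definition in your depthai version.

# Hypothetical tweak, reusing the config round-trip from the example above.
# NOTE: cornerDetector.numTargetFeatures is an assumed field name; verify it
# against dai.RawFeatureTrackerConfig before relying on it.
featureTrackerConfig.cornerDetector.numTargetFeatures = 512  # assumed field
cfg = dai.FeatureTrackerConfig()
cfg.set(featureTrackerConfig)
inputFeatureTrackerConfigQueue.send(cfg)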
Pipeline
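Based on the linking section above, the on-device graph is symmetric for the two camera sides:

MonoCamera (left/right) -> FeatureTracker -> outputFeatures -> XLinkOut "trackedFeaturesLeft"/"trackedFeaturesRight"
                                          -> passthroughInputImage -> XLinkOut "passthroughFrameLeft"/"passthroughFrameRight"
XLinkIn "trackedFeaturesConfig" -> inputConfig of both FeatureTracker nodes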