Feature Tracker with Motion Estimation
This example demonstrates the capabilities of the FeatureTracker combined with motion estimation. It detects and tracks features between consecutive frames using optical flow. Each feature is assigned a unique ID. The motion of the camera is estimated based on the tracked features, and the estimated motion (e.g., Up, Down, Left, Right, Rotating) is displayed on screen. The Feature Detector example only detects features without estimating motion. Demo
Setup
Please run the install script to download all required dependencies. Please note that this script must be run from a git context, so you have to clone the depthai-python repository first and then run the script. Command Line
1git clone https://github.com/luxonis/depthai-python.git
2cd depthai-python/examples
3python3 install_requirements.py
Source code
Python
Python
PythonGitHub
1import numpy as np
2import cv2
3from collections import deque
4import depthai as dai
5
6
class CameraMotionEstimator:
    """Estimates the dominant camera motion from tracked feature paths.

    The average optical-flow vector of all tracked features is smoothed with
    an exponential filter across frames, then classified as a translation
    (Up/Down/Left/Right) or a rotation, or "Camera Staying Still".
    """

    def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
        # Filtered flow from the previous call, used for exponential smoothing.
        self.last_avg_flow = np.array([0.0, 0.0])
        # Weight of the previous estimate in the smoothing filter (0..1).
        self.filter_weight = filter_weight
        # Minimum filtered-flow magnitude required to report a translation.
        self.motion_threshold = motion_threshold
        # Minimum average rotation (radians) required to report a rotation.
        self.rotation_threshold = rotation_threshold

    def estimate_motion(self, feature_paths):
        """Classify camera motion from per-feature position histories.

        Args:
            feature_paths: mapping of feature id -> sequence of positions,
                each position exposing ``.x`` and ``.y``. Only paths with at
                least two positions contribute a flow segment.

        Returns:
            (label, vanishing_point): label is one of 'Camera Staying Still',
            'Up', 'Down', 'Left', 'Right', 'Rotating'; vanishing_point is a
            length-2 numpy array (estimated focus of the motion).
        """
        most_prominent_motion = "Camera Staying Still"
        max_magnitude = 0.0
        avg_flow = np.array([0.0, 0.0])
        total_rotation = 0.0
        vanishing_point = np.array([0.0, 0.0])

        print(f"Number of features: {len(feature_paths)}")

        if not feature_paths:
            return most_prominent_motion, vanishing_point

        # BUGFIX: only paths with >= 2 points contribute to the sums, so the
        # averages must be divided by the number of contributing segments —
        # not by the total path count — otherwise brand-new single-point
        # tracks dilute the estimate toward zero.
        num_segments = 0
        for path in feature_paths.values():
            if len(path) >= 2:
                src = np.array([path[-2].x, path[-2].y])
                dst = np.array([path[-1].x, path[-1].y])
                avg_flow += dst - src
                # Extrapolate one step forward; the mean of these points
                # approximates the vanishing point of the motion.
                vanishing_point += dst + (dst - src)
                total_rotation += np.arctan2(dst[1] - src[1], dst[0] - src[0])
                num_segments += 1

        # With no segments all sums are zero, so dividing by 1 preserves the
        # original zero-flow behavior (the smoothing filter still runs).
        denom = max(num_segments, 1)
        avg_flow /= denom
        avg_rotation = total_rotation / denom
        vanishing_point /= denom

        print(f"Average Flow: {avg_flow}")
        print(f"Average Rotation: {avg_rotation}")

        # Exponential smoothing against the previous frame's filtered flow.
        avg_flow = (self.filter_weight * self.last_avg_flow +
                    (1 - self.filter_weight) * avg_flow)
        self.last_avg_flow = avg_flow

        flow_magnitude = np.linalg.norm(avg_flow)
        rotation_magnitude = abs(avg_rotation)

        if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
            # Image flow is opposite to camera motion (features drift left
            # when the camera pans right), hence the inverted signs.
            if abs(avg_flow[0]) > abs(avg_flow[1]):
                most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
            else:
                most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
            max_magnitude = flow_magnitude

        # Rotation wins only if it is stronger than the translation signal.
        if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
            most_prominent_motion = 'Rotating'

        return most_prominent_motion, vanishing_point
62
63
class FeatureTrackerDrawer:
    """Keeps a bounded position history per tracked feature and renders
    the paths, the motion label, and the vanishing point onto frames."""

    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    vanishingPointColor = (255, 0, 255)  # Violet color for vanishing point
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    trackedFeaturesPathLength = 10

    trackedIDs = None
    trackedFeaturesPath = None

    # Overlay color per reported motion direction (BGR).
    direction_colors = {
        "Up": (0, 255, 255),    # Yellow
        "Down": (0, 255, 0),    # Green
        "Left": (255, 0, 0),    # Blue
        "Right": (0, 0, 255),   # Red
    }

    def __init__(self, windowName):
        self.windowName = windowName
        cv2.namedWindow(windowName)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()

    def trackFeaturePath(self, features):
        """Append each feature's newest position to its path; drop tracks
        that were not re-detected in this batch."""
        seen_ids = set()
        cap = max(1, FeatureTrackerDrawer.trackedFeaturesPathLength)
        for feature in features:
            feature_id = feature.id
            seen_ids.add(feature_id)
            history = self.trackedFeaturesPath.setdefault(feature_id, deque())
            history.append(feature.position)
            # Bound the history so old positions fall off the tail.
            while len(history) > cap:
                history.popleft()

        # Any previously-tracked ID missing from this batch is stale.
        for stale_id in self.trackedIDs - seen_ids:
            self.trackedFeaturesPath.pop(stale_id)

        self.trackedIDs = seen_ids

    def drawVanishingPoint(self, img, vanishing_point):
        """Mark the estimated vanishing point with a filled circle."""
        center = (int(vanishing_point[0]), int(vanishing_point[1]))
        cv2.circle(img, center, self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)

    def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):
        """Draw every tracked path (colored by motion direction), the motion
        label centered at the top, and optionally the vanishing point."""
        point_color = self.direction_colors.get(prominent_motion, self.pointColor)

        for history in self.trackedFeaturesPath.values():
            points = [(int(p.x), int(p.y)) for p in history]
            for seg_start, seg_end in zip(points, points[1:]):
                cv2.line(img, seg_start, seg_end, point_color, 1, cv2.LINE_AA, 0)
            # Emphasize the most recent position.
            cv2.circle(img, points[-1], self.circleRadius, point_color, -1, cv2.LINE_AA, 0)

        if prominent_motion:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1
            font_thickness = 2
            (text_w, text_h), _ = cv2.getTextSize(prominent_motion, font, font_scale, font_thickness)
            # Horizontally centered, 20 pixels below the top edge.
            origin = ((img.shape[1] - text_w) // 2, text_h + 20)
            text_color = self.direction_colors.get(prominent_motion, (255, 255, 255))
            cv2.putText(img, prominent_motion, origin, font, font_scale,
                        text_color, font_thickness, cv2.LINE_AA)

        if vanishing_point is not None:
            self.drawVanishingPoint(img, vanishing_point)
159
def create_pipeline():
    """Build the DepthAI pipeline: left mono camera -> on-device feature
    tracker -> XLink output streams for features and passthrough frames."""
    pipeline = dai.Pipeline()

    # Left mono camera, 720p @ 15 fps.
    camera = pipeline.create(dai.node.MonoCamera)
    camera.setCamera("left")
    camera.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    camera.setFps(15)

    # On-device feature tracker fed by the camera frames.
    tracker = pipeline.create(dai.node.FeatureTracker)
    # Allocate extra SHAVE cores and memory slices for improved performance.
    tracker.setHardwareResources(2, 2)

    # Host-facing output streams.
    features_out = pipeline.create(dai.node.XLinkOut)
    features_out.setStreamName("trackedFeaturesLeft")
    frames_out = pipeline.create(dai.node.XLinkOut)
    frames_out.setStreamName("passthroughLeft")

    # Wire the nodes together.
    camera.out.link(tracker.inputImage)
    tracker.passthroughInputImage.link(frames_out.input)
    tracker.outputFeatures.link(features_out.input)

    return pipeline
191
192
if __name__ == '__main__':
    pipeline = create_pipeline()
    with dai.Device(pipeline) as device:
        # Host-side queues for the two streams defined in create_pipeline().
        features_queue = device.getOutputQueue(
            "trackedFeaturesLeft", maxSize=4, blocking=False)
        frames_queue = device.getOutputQueue(
            "passthroughLeft", maxSize=4, blocking=False)

        window_name = "Left"
        drawer = FeatureTrackerDrawer(window_name)
        estimator = CameraMotionEstimator(
            filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)

        while True:
            # Grayscale passthrough frame, converted to BGR so the overlays
            # can be drawn in color.
            frame = cv2.cvtColor(frames_queue.get().getFrame(), cv2.COLOR_GRAY2BGR)

            features = features_queue.get().trackedFeatures
            # Motion is estimated from the paths accumulated so far, before
            # this frame's detections are appended.
            motion, vanishing_pt = estimator.estimate_motion(
                drawer.trackedFeaturesPath)

            drawer.trackFeaturePath(features)
            drawer.drawFeatures(frame, vanishing_pt, motion)

            print("Motions:", motion)
            cv2.imshow(window_name, frame)

            if cv2.waitKey(1) == ord('q'):
                break
Pipeline
Need assistance?
Head over to Discussion Forum for technical support or any other questions you might have.