Feature Tracker with Motion Estimation
This example demonstrates the capabilities of the FeatureTracker combined with motion estimation. It detects and tracks features between consecutive frames using optical flow. Each feature is assigned a unique ID. The motion of the camera is estimated based on the tracked features, and the estimated motion (e.g., Up, Down, Left, Right, Rotating) is displayed on screen. The Feature Detector example only detects features without estimating motion.
Demo
Setup
Please run the install script to download all required dependencies. Please note that this script must be run from a git context, so you have to download the depthai-python repository first and then run the script.
Command Line
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
Source code
Python
Python
PythonGitHub
1#!/usr/bin/env python3
2
3import numpy as np
4import cv2
5from collections import deque
6import depthai as dai
7
8
class CameraMotionEstimator:
    """Classify the dominant frame-to-frame motion from tracked feature paths.

    The estimator averages the most recent displacement of every tracked
    feature, low-pass filters that average across calls, and maps the result
    to a text label.

    Args:
        filter_weight: Weight given to the previous filtered flow; the new
            measurement gets ``1 - filter_weight``.
        motion_threshold: Minimum filtered flow magnitude required to report
            a translation label.
        rotation_threshold: Minimum absolute mean flow angle (radians)
            required to report ``'Rotating'``.
    """

    def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
        # Filtered average flow carried over from the previous call.
        self.last_avg_flow = np.array([0.0, 0.0])
        self.filter_weight = filter_weight
        self.motion_threshold = motion_threshold
        self.rotation_threshold = rotation_threshold

    def estimate_motion(self, feature_paths):
        """Estimate the most prominent motion for the current set of paths.

        Args:
            feature_paths: Mapping of feature ID to a sequence of points, each
                exposing ``.x`` and ``.y``. Only the last two points of each
                path are used; paths shorter than two points are ignored.

        Returns:
            A ``(label, vanishing_point)`` tuple. ``label`` is one of
            ``"Camera Staying Still"``, ``'Left'``, ``'Right'``, ``'Up'``,
            ``'Down'`` or ``'Rotating'``. ``vanishing_point`` is a 2-element
            array holding the mean of each feature's latest position
            extrapolated by one more step of its latest displacement; it is
            ``[0, 0]`` when no path has two points.
        """
        most_prominent_motion = "Camera Staying Still"
        max_magnitude = 0.0
        avg_flow = np.array([0.0, 0.0])
        total_rotation = 0.0
        vanishing_point = np.array([0.0, 0.0])
        num_features = len(feature_paths)

        print(f"Number of features: {num_features}")

        if num_features == 0:
            return most_prominent_motion, vanishing_point

        # Count only the paths that actually contribute a displacement, so
        # freshly detected single-point features do not dilute the averages.
        num_contributing = 0
        for path in feature_paths.values():
            if len(path) < 2:
                continue
            src = np.array([path[-2].x, path[-2].y])
            dst = np.array([path[-1].x, path[-1].y])
            displacement = dst - src
            avg_flow += displacement
            vanishing_point += dst + displacement
            # Direction angle of this feature's displacement, in radians.
            # NOTE(review): averaging these angles is what the code labels
            # "rotation"; a uniform translation also yields a non-zero mean
            # angle, so the rotation threshold needs to be chosen with care.
            total_rotation += np.arctan2(displacement[1], displacement[0])
            num_contributing += 1

        avg_rotation = 0.0
        if num_contributing:
            avg_flow /= num_contributing
            vanishing_point /= num_contributing
            avg_rotation = total_rotation / num_contributing

        print(f"Average Flow: {avg_flow}")
        print(f"Average Rotation: {avg_rotation}")

        # Exponential smoothing against the previous call's filtered flow.
        avg_flow = (self.filter_weight * self.last_avg_flow +
                    (1 - self.filter_weight) * avg_flow)
        self.last_avg_flow = avg_flow

        flow_magnitude = np.linalg.norm(avg_flow)
        rotation_magnitude = abs(avg_rotation)

        if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
            # Labels are the opposite of the feature drift direction
            # (presumably because the scene appears to move against the
            # camera; image y is assumed to grow downwards -- TODO confirm).
            if abs(avg_flow[0]) > abs(avg_flow[1]):
                most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
            else:
                most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
            max_magnitude = flow_magnitude

        # Rotation wins only if its magnitude exceeds the translation one.
        if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
            most_prominent_motion = 'Rotating'

        return most_prominent_motion, vanishing_point
64
65
class FeatureTrackerDrawer:
    """Keep a short position history per tracked feature and draw it.

    Paths are stored in ``trackedFeaturesPath`` (feature ID -> deque of
    positions) and the IDs seen in the latest update in ``trackedIDs``.
    Colors are OpenCV-style BGR tuples.
    """

    lineColor = (200, 0, 200)  # Not used by the drawing methods in this class
    pointColor = (0, 0, 255)
    vanishingPointColor = (255, 0, 255)  # Violet color for vanishing point
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    trackedFeaturesPathLength = 10

    trackedIDs = None
    trackedFeaturesPath = None

    # Define color mapping for directions
    direction_colors = {
        "Up": (0, 255, 255),  # Yellow
        "Down": (0, 255, 0),  # Green
        "Left": (255, 0, 0),  # Blue
        "Right": (0, 0, 255),  # Red
    }

    def __init__(self, windowName):
        """Create the display window and start with no tracked features."""
        self.windowName = windowName
        cv2.namedWindow(windowName)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()

    def trackFeaturePath(self, features):
        """Append the latest positions and forget features no longer reported.

        Args:
            features: Iterable of objects exposing ``.id`` and ``.position``.
        """
        # Read from the class so a change to the class attribute applies to
        # every drawer; at least one point is always kept.
        maxLength = max(1, FeatureTrackerDrawer.trackedFeaturesPathLength)

        newTrackedIDs = set()
        for currentFeature in features:
            currentID = currentFeature.id
            newTrackedIDs.add(currentID)

            path = self.trackedFeaturesPath.setdefault(currentID, deque())
            path.append(currentFeature.position)

            while len(path) > maxLength:
                path.popleft()

        # Drop the history of every feature missing from this update.
        for staleID in self.trackedIDs - newTrackedIDs:
            self.trackedFeaturesPath.pop(staleID, None)

        self.trackedIDs = newTrackedIDs

    def drawVanishingPoint(self, img, vanishing_point):
        """Draw a filled circle at ``vanishing_point`` (x, y) on ``img``."""
        center = (int(vanishing_point[0]), int(vanishing_point[1]))
        cv2.circle(img, center, self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)

    def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):
        """Draw all feature paths, the motion label and the vanishing point.

        Args:
            img: Image to draw on (modified in place by the OpenCV calls).
            vanishing_point: Optional (x, y) point; drawn when not ``None``.
            prominent_motion: Optional label; selects the path color when it
                is a key of ``direction_colors`` and is written at the top
                center of the image when truthy.
        """
        # Get the appropriate point color based on the prominent motion
        point_color = self.direction_colors.get(prominent_motion, self.pointColor)

        for path in self.trackedFeaturesPath.values():
            points = [(int(p.x), int(p.y)) for p in path]
            if not points:
                continue

            for src, dst in zip(points, points[1:]):
                cv2.line(img, src, dst, point_color, 1, cv2.LINE_AA, 0)

            # Mark the newest position of the feature.
            cv2.circle(img, points[-1], self.circleRadius, point_color, -1, cv2.LINE_AA, 0)

        # Draw the direction text on the image
        if prominent_motion:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1
            font_thickness = 2
            text_size = cv2.getTextSize(prominent_motion, font, font_scale, font_thickness)[0]
            text_x = (img.shape[1] - text_size[0]) // 2
            text_y = text_size[1] + 20  # 20 pixels from the top

            # Get the appropriate color based on the prominent motion
            text_color = self.direction_colors.get(prominent_motion, (255, 255, 255))  # Default to white

            # Draw the text
            cv2.putText(img, prominent_motion, (text_x, text_y), font, font_scale, text_color, font_thickness, cv2.LINE_AA)

        # Draw vanishing point if provided
        if vanishing_point is not None:
            self.drawVanishingPoint(img, vanishing_point)
161
def create_pipeline(fps=15, num_shaves=2, num_memory_slices=2):
    """Build the DepthAI pipeline: left mono camera -> FeatureTracker -> XLink.

    Two output streams are created: ``"trackedFeaturesLeft"`` carries the
    tracker's feature output and ``"passthroughLeft"`` carries the tracker's
    passthrough input image.

    Args:
        fps: Frame rate set on the mono camera.
        num_shaves: First argument passed to
            ``FeatureTracker.setHardwareResources``.
        num_memory_slices: Second argument passed to
            ``FeatureTracker.setHardwareResources``.

    Returns:
        The configured ``dai.Pipeline``.
    """
    pipeline = dai.Pipeline()

    # Create a MonoCamera node and set its properties
    mono_left = pipeline.create(dai.node.MonoCamera)
    mono_left.setCamera("left")
    mono_left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    mono_left.setFps(fps)

    # Create a FeatureTracker node
    feature_tracker_left = pipeline.create(dai.node.FeatureTracker)

    # Allocate resources for improved performance
    feature_tracker_left.setHardwareResources(num_shaves, num_memory_slices)

    # Create XLinkOut nodes for output streams and set stream names
    xout_tracked_features_left = pipeline.create(dai.node.XLinkOut)
    xout_tracked_features_left.setStreamName("trackedFeaturesLeft")
    xout_passthrough_left = pipeline.create(dai.node.XLinkOut)
    xout_passthrough_left.setStreamName("passthroughLeft")

    # Link the nodes
    mono_left.out.link(feature_tracker_left.inputImage)
    feature_tracker_left.passthroughInputImage.link(xout_passthrough_left.input)
    feature_tracker_left.outputFeatures.link(xout_tracked_features_left.input)

    return pipeline
193
194
if __name__ == '__main__':
    # Script entry point: stream frames and tracked features from the device,
    # estimate the dominant motion and display the annotated left image until
    # 'q' is pressed.
    pipeline = create_pipeline()
    with dai.Device(pipeline) as device:
        output_features_left_queue = device.getOutputQueue(
            "trackedFeaturesLeft", maxSize=4, blocking=False)
        passthrough_image_left_queue = device.getOutputQueue(
            "passthroughLeft", maxSize=4, blocking=False)

        left_window_name = "Left"
        left_feature_drawer = FeatureTrackerDrawer(left_window_name)
        camera_estimator_left = CameraMotionEstimator(
            filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)

        while True:
            in_passthrough_frame_left = passthrough_image_left_queue.get()
            passthrough_frame_left = in_passthrough_frame_left.getFrame()
            left_frame = cv2.cvtColor(passthrough_frame_left, cv2.COLOR_GRAY2BGR)

            tracked_features_left = output_features_left_queue.get().trackedFeatures

            # Update the paths with this packet first, so the estimate (and
            # the label drawn on this frame) reflects the newest displacement
            # rather than the one from the previous iteration.
            left_feature_drawer.trackFeaturePath(tracked_features_left)
            motions_left, vanishing_pt_left = camera_estimator_left.estimate_motion(
                left_feature_drawer.trackedFeaturesPath)

            left_feature_drawer.drawFeatures(left_frame, vanishing_pt_left, motions_left)

            print("Motions:", motions_left)
            cv2.imshow(left_window_name, left_frame)

            if cv2.waitKey(1) == ord('q'):
                break

    # Close the display window once the loop has ended.
    cv2.destroyAllWindows()
Pipeline
Need assistance?
Head over to Discussion Forum for technical support or any other questions you might have.