Feature Tracker with Motion Estimation
Demo
This example requires the DepthAI v3 API; see the installation instructions.
Source code
The example is written in Python and is also available on GitHub.
#!/usr/bin/env python3

import numpy as np
import cv2
from collections import deque
import depthai as dai

class CameraMotionEstimator:
    def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
        self.last_avg_flow = np.array([0.0, 0.0])
        self.filter_weight = filter_weight            # weight given to the previous (smoothed) flow
        self.motion_threshold = motion_threshold      # minimum flow magnitude (px/frame) to report translation
        self.rotation_threshold = rotation_threshold  # minimum mean flow angle (radians) to report rotation

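    # Classify the dominant camera motion from the tracked feature paths.
    # The per-frame average flow is low-pass filtered with an exponential
    # moving average: flow = w * previous_flow + (1 - w) * current_flow.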
    def estimate_motion(self, feature_paths):
        most_prominent_motion = "Camera Staying Still"
        max_magnitude = 0.0
        avg_flow = np.array([0.0, 0.0])
        total_rotation = 0.0
        vanishing_point = np.array([0.0, 0.0])
        num_features = len(feature_paths)

        print(f"Number of features: {num_features}")

        if num_features == 0:
            return most_prominent_motion, vanishing_point

        for path in feature_paths.values():
            if len(path) >= 2:
                src = np.array([path[-2].x, path[-2].y])
                dst = np.array([path[-1].x, path[-1].y])
                avg_flow += dst - src
                # Extrapolate each feature one step ahead; the mean of these
                # points serves as a rough vanishing-point estimate.
                motion_vector = dst + (dst - src)
                vanishing_point += motion_vector
                # Direction angle of this flow vector (radians).
                rotation = np.arctan2(dst[1] - src[1], dst[0] - src[0])
                total_rotation += rotation

        avg_flow /= num_features
        avg_rotation = total_rotation / num_features
        vanishing_point /= num_features

        print(f"Average Flow: {avg_flow}")
        print(f"Average Rotation: {avg_rotation}")

        # Exponential moving average keeps the estimate stable across frames.
        avg_flow = (self.filter_weight * self.last_avg_flow +
                    (1 - self.filter_weight) * avg_flow)
        self.last_avg_flow = avg_flow

        flow_magnitude = np.linalg.norm(avg_flow)
        rotation_magnitude = abs(avg_rotation)

        if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
            # Features move opposite to the camera: leftward flow is
            # classified as the camera moving right, downward flow as up.
            if abs(avg_flow[0]) > abs(avg_flow[1]):
                most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
            else:
                most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
            max_magnitude = flow_magnitude

        if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
            most_prominent_motion = 'Rotating'

        return most_prominent_motion, vanishing_point


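# FeatureTrackerDrawer keeps a bounded deque of recent positions for every
# tracked feature id and renders the paths, the dominant-motion label, and
# the vanishing point onto the preview frame.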
class FeatureTrackerDrawer:

    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    vanishingPointColor = (255, 0, 255)  # magenta (BGR) for the vanishing point
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    trackedFeaturesPathLength = 10

    trackedIDs = None
    trackedFeaturesPath = None

    def trackFeaturePath(self, features):

        newTrackedIDs = set()
        for currentFeature in features:
            currentID = currentFeature.id
            newTrackedIDs.add(currentID)

            if currentID not in self.trackedFeaturesPath:
                self.trackedFeaturesPath[currentID] = deque()

            path = self.trackedFeaturesPath[currentID]
            path.append(currentFeature.position)

            # Keep only the most recent positions on each path.
            while len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength):
                path.popleft()

            self.trackedFeaturesPath[currentID] = path

        # Drop paths of features that are no longer tracked.
        featuresToRemove = set()
        for oldId in self.trackedIDs:
            if oldId not in newTrackedIDs:
                featuresToRemove.add(oldId)

        for oldId in featuresToRemove:
            self.trackedFeaturesPath.pop(oldId)

        self.trackedIDs = newTrackedIDs

    def drawVanishingPoint(self, img, vanishing_point):
        cv2.circle(img, (int(vanishing_point[0]), int(vanishing_point[1])),
                   self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)

    # Color mapping (BGR) for the motion directions
    direction_colors = {
        "Up": (0, 255, 255),    # yellow
        "Down": (0, 255, 0),    # green
        "Left": (255, 0, 0),    # blue
        "Right": (0, 0, 255),   # red
    }

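    # drawFeatures colors the tracked paths by the current dominant motion
    # and overlays the motion label centered near the top of the frame.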
    def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):

        # Get the appropriate point color based on the prominent motion
        if prominent_motion in self.direction_colors:
            point_color = self.direction_colors[prominent_motion]
        else:
            point_color = self.pointColor

        for path in self.trackedFeaturesPath.values():

            for j in range(len(path) - 1):
                src = (int(path[j].x), int(path[j].y))
                dst = (int(path[j + 1].x), int(path[j + 1].y))
                cv2.line(img, src, dst, point_color, 1, cv2.LINE_AA, 0)

            j = len(path) - 1
            cv2.circle(img, (int(path[j].x), int(path[j].y)),
                       self.circleRadius, point_color, -1, cv2.LINE_AA, 0)

        # Draw the direction text on the image
        if prominent_motion:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1
            font_thickness = 2
            text_size = cv2.getTextSize(prominent_motion, font, font_scale, font_thickness)[0]
            text_x = (img.shape[1] - text_size[0]) // 2
            text_y = text_size[1] + 20  # 20 pixels from the top

            # Get the appropriate color based on the prominent motion
            text_color = self.direction_colors.get(prominent_motion, (255, 255, 255))  # default to white

            # Draw the text
            cv2.putText(img, prominent_motion, (text_x, text_y), font, font_scale,
                        text_color, font_thickness, cv2.LINE_AA)

        # Draw vanishing point if provided
        if vanishing_point is not None:
            self.drawVanishingPoint(img, vanishing_point)

    def __init__(self, windowName):
        self.windowName = windowName
        cv2.namedWindow(windowName)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()


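# Pipeline: MonoCamera (left, 720p, 15 fps) -> FeatureTracker, with the
# tracked features and the passthrough frame each streamed to the host over
# XLinkOut, so drawing happens on exactly the frame the features came from.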
def create_pipeline():
    pipeline = dai.Pipeline()

    # Create a MonoCamera node and set its properties
    mono_left = pipeline.create(dai.node.MonoCamera)
    mono_left.setCamera("left")
    mono_left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    mono_left.setFps(15)

    # Create a FeatureTracker node
    feature_tracker_left = pipeline.create(dai.node.FeatureTracker)

    # Create XLinkOut nodes for output streams
    xout_tracked_features_left = pipeline.create(dai.node.XLinkOut)
    xout_passthrough_left = pipeline.create(dai.node.XLinkOut)

    # Set stream names
    xout_tracked_features_left.setStreamName("trackedFeaturesLeft")
    xout_passthrough_left.setStreamName("passthroughLeft")

    # Allocate resources for improved performance
    num_shaves = 2
    num_memory_slices = 2
    feature_tracker_left.setHardwareResources(num_shaves, num_memory_slices)

    # Link the nodes
    mono_left.out.link(feature_tracker_left.inputImage)
    feature_tracker_left.passthroughInputImage.link(xout_passthrough_left.input)
    feature_tracker_left.outputFeatures.link(xout_tracked_features_left.input)

    return pipeline

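# The same pattern extends to the right mono camera; a sketch mirroring the
# setup above (the stream name here is illustrative, not part of the example):
#
#   mono_right = pipeline.create(dai.node.MonoCamera)
#   mono_right.setCamera("right")
#   feature_tracker_right = pipeline.create(dai.node.FeatureTracker)
#   mono_right.out.link(feature_tracker_right.inputImage)
#   xout_tracked_features_right = pipeline.create(dai.node.XLinkOut)
#   xout_tracked_features_right.setStreamName("trackedFeaturesRight")
#   feature_tracker_right.outputFeatures.link(xout_tracked_features_right.input)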

if __name__ == '__main__':
    pipeline = create_pipeline()
    with dai.Device(pipeline) as device:
        output_features_left_queue = device.getOutputQueue(
            "trackedFeaturesLeft", maxSize=4, blocking=False)
        passthrough_image_left_queue = device.getOutputQueue(
            "passthroughLeft", maxSize=4, blocking=False)

        left_window_name = "Left"
        left_feature_drawer = FeatureTrackerDrawer(left_window_name)
        camera_estimator_left = CameraMotionEstimator(
            filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)

        while True:
            in_passthrough_frame_left = passthrough_image_left_queue.get()
            passthrough_frame_left = in_passthrough_frame_left.getFrame()
            left_frame = cv2.cvtColor(passthrough_frame_left, cv2.COLOR_GRAY2BGR)

            tracked_features_left = output_features_left_queue.get().trackedFeatures
            # Motion is estimated from the paths accumulated up to the
            # previous frame; the new features are appended right after.
            motions_left, vanishing_pt_left = camera_estimator_left.estimate_motion(
                left_feature_drawer.trackedFeaturesPath)

            left_feature_drawer.trackFeaturePath(tracked_features_left)
            left_feature_drawer.drawFeatures(left_frame, vanishing_pt_left, motions_left)

            print("Motions:", motions_left)
            cv2.imshow(left_window_name, left_frame)

            if cv2.waitKey(1) == ord('q'):
                break
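CameraMotionEstimator has no device dependency, so its classification logic can be sanity-checked offline. Below is a minimal sketch, assuming the listing above is saved as feature_tracker_motion.py (a hypothetical file name) and using a namedtuple as a stand-in for the dai.Point2f positions produced on-device:

#!/usr/bin/env python3

from collections import namedtuple

from feature_tracker_motion import CameraMotionEstimator  # hypothetical module name

Point = namedtuple("Point", ["x", "y"])  # stand-in for dai.Point2f

estimator = CameraMotionEstimator(filter_weight=0.5, motion_threshold=0.3,
                                  rotation_threshold=0.5)

# Ten synthetic features, each shifted 5 px to the right between two frames.
paths = {i: [Point(95.0 + i, 100.0), Point(100.0 + i, 100.0)] for i in range(10)}

motion, vanishing_point = estimator.estimate_motion(paths)
print(motion)  # rightward feature flow is classified as the camera moving 'Left'

Note that total_rotation accumulates the direction angle of each flow vector rather than an actual camera roll, so flows pointing near ±180° (leftward motion) can push the average past rotation_threshold and be reported as 'Rotating'; raise the threshold if that happens in practice.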
Need assistance?
Head over to Discussion Forum for technical support or any other questions you might have.