DepthAI Tutorials
DepthAI API References

ON THIS PAGE

  • Feature Tracker with Motion Estimation
  • Demo
  • Setup
  • Source code

Feature Tracker with Motion Estimation

This example demonstrates the capabilities of the FeatureTracker combined with motion estimation. It detects and tracks features between consecutive frames using optical flow. Each feature is assigned a unique ID. The motion of the camera is estimated based on the tracked features, and the estimated motion (e.g., Up, Down, Left, Right, Rotating) is displayed on screen.The Feature Detector example only detects features without estimating motion.

Demo

:width: 100% :align: center

Setup

Please run the install script to download all required dependencies. Please note that this script must be ran from git context, so you have to download the depthai-python repository first and then run the script
Command Line
1git clone https://github.com/luxonis/depthai-python.git
2cd depthai-python/examples
3python3 install_requirements.py
For additional information, please follow the installation guide.

Source code

Python
Python
GitHub
1import numpy as np
2import cv2
3from collections import deque
4import depthai as dai
5
6
7class CameraMotionEstimator:
8    def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
9        self.last_avg_flow = np.array([0.0, 0.0])
10        self.filter_weight = filter_weight
11        self.motion_threshold = motion_threshold
12        self.rotation_threshold = rotation_threshold
13
14    def estimate_motion(self, feature_paths):
15        most_prominent_motion = "Camera Staying Still"
16        max_magnitude = 0.0
17        avg_flow = np.array([0.0, 0.0])
18        total_rotation = 0.0
19        vanishing_point = np.array([0.0, 0.0])
20        num_features = len(feature_paths)
21
22        print(f"Number of features: {num_features}")
23
24        if num_features == 0:
25            return most_prominent_motion, vanishing_point
26
27        for path in feature_paths.values():
28            if len(path) >= 2:
29                src = np.array([path[-2].x, path[-2].y])
30                dst = np.array([path[-1].x, path[-1].y])
31                avg_flow += dst - src
32                motion_vector = dst + (dst - src)
33                vanishing_point += motion_vector
34                rotation = np.arctan2(dst[1] - src[1], dst[0] - src[0])
35                total_rotation += rotation
36
37        avg_flow /= num_features
38        avg_rotation = total_rotation / num_features
39        vanishing_point /= num_features
40
41        print(f"Average Flow: {avg_flow}")
42        print(f"Average Rotation: {avg_rotation}")
43
44        avg_flow = (self.filter_weight * self.last_avg_flow +
45                    (1 - self.filter_weight) * avg_flow)
46        self.last_avg_flow = avg_flow
47
48        flow_magnitude = np.linalg.norm(avg_flow)
49        rotation_magnitude = abs(avg_rotation)
50
51        if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
52            if abs(avg_flow[0]) > abs(avg_flow[1]):
53                most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
54            else:
55                most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
56            max_magnitude = flow_magnitude
57
58        if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
59            most_prominent_motion = 'Rotating'
60
61        return most_prominent_motion, vanishing_point
62
63
64class FeatureTrackerDrawer:
65
66    lineColor = (200, 0, 200)
67    pointColor = (0, 0, 255)
68    vanishingPointColor = (255, 0, 255)  # Violet color for vanishing point
69    circleRadius = 2
70    maxTrackedFeaturesPathLength = 30
71    trackedFeaturesPathLength = 10
72
73    trackedIDs = None
74    trackedFeaturesPath = None
75
76    def trackFeaturePath(self, features):
77
78        newTrackedIDs = set()
79        for currentFeature in features:
80            currentID = currentFeature.id
81            newTrackedIDs.add(currentID)
82
83            if currentID not in self.trackedFeaturesPath:
84                self.trackedFeaturesPath[currentID] = deque()
85
86            path = self.trackedFeaturesPath[currentID]
87            path.append(currentFeature.position)
88
89            while(len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength)):
90                path.popleft()
91
92            self.trackedFeaturesPath[currentID] = path
93
94        featuresToRemove = set()
95        for oldId in self.trackedIDs:
96            if oldId not in newTrackedIDs:
97                featuresToRemove.add(oldId)
98
99        for id in featuresToRemove:
100            self.trackedFeaturesPath.pop(id)
101
102        self.trackedIDs = newTrackedIDs
103
104    def drawVanishingPoint(self, img, vanishing_point):
105        cv2.circle(img, (int(vanishing_point[0]), int(vanishing_point[1])), self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)
106
107    # Define color mapping for directions
108    direction_colors = {
109        "Up": (0, 255, 255),  # Yellow
110        "Down": (0, 255, 0),  # Green
111        "Left": (255, 0, 0),  # Blue
112        "Right": (0, 0, 255), # Red
113    }
114
115    def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):
116
117        # Get the appropriate point color based on the prominent motion
118        if prominent_motion in self.direction_colors:
119            point_color = self.direction_colors[prominent_motion]
120        else:
121            point_color = self.pointColor
122
123        for featurePath in self.trackedFeaturesPath.values():
124            path = featurePath
125
126            for j in range(len(path) - 1):
127                src = (int(path[j].x), int(path[j].y))
128                dst = (int(path[j + 1].x), int(path[j + 1].y))
129                cv2.line(img, src, dst, point_color, 1, cv2.LINE_AA, 0)
130
131            j = len(path) - 1
132            cv2.circle(img, (int(path[j].x), int(path[j].y)), self.circleRadius, point_color, -1, cv2.LINE_AA, 0)
133
134        # Draw the direction text on the image
135        if prominent_motion:
136            font = cv2.FONT_HERSHEY_SIMPLEX
137            font_scale = 1
138            font_thickness = 2
139            text_size = cv2.getTextSize(prominent_motion, font, font_scale, font_thickness)[0]
140            text_x = (img.shape[1] - text_size[0]) // 2
141            text_y = text_size[1] + 20  # 20 pixels from the top
142
143            # Get the appropriate color based on the prominent motion
144            text_color = self.direction_colors.get(prominent_motion, (255, 255, 255))  # Default to white
145
146            # Draw the text
147            cv2.putText(img, prominent_motion, (text_x, text_y), font, font_scale, text_color, font_thickness, cv2.LINE_AA)
148
149
150        # Draw vanishing point if provided
151        if vanishing_point is not None:
152            self.drawVanishingPoint(img, vanishing_point)
153
154    def __init__(self, windowName):
155        self.windowName = windowName
156        cv2.namedWindow(windowName)
157        self.trackedIDs = set()
158        self.trackedFeaturesPath = dict()
159
160def create_pipeline():
161    pipeline = dai.Pipeline()
162
163    # Create a MonoCamera node and set its properties
164    mono_left = pipeline.create(dai.node.MonoCamera)
165    mono_left.setCamera("left")
166    mono_left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
167    mono_left.setFps(15)
168
169    # Create a FeatureTracker node
170    feature_tracker_left = pipeline.create(dai.node.FeatureTracker)
171
172    # Create XLinkOut nodes for output streams
173    xout_tracked_features_left = pipeline.create(dai.node.XLinkOut)
174    xout_passthrough_left = pipeline.create(dai.node.XLinkOut)
175
176    # Set stream names
177    xout_tracked_features_left.setStreamName("trackedFeaturesLeft")
178    xout_passthrough_left.setStreamName("passthroughLeft")
179
180    # Allocate resources for improved performance
181    num_shaves = 2
182    num_memory_slices = 2
183    feature_tracker_left.setHardwareResources(num_shaves, num_memory_slices)
184
185    # Link the nodes
186    mono_left.out.link(feature_tracker_left.inputImage)
187    feature_tracker_left.passthroughInputImage.link(xout_passthrough_left.input)
188    feature_tracker_left.outputFeatures.link(xout_tracked_features_left.input)
189
190    return pipeline
191
192
193if __name__ == '__main__':
194    pipeline = create_pipeline()
195    with dai.Device(pipeline) as device:
196        output_features_left_queue = device.getOutputQueue(
197            "trackedFeaturesLeft", maxSize=4, blocking=False)
198        passthrough_image_left_queue = device.getOutputQueue(
199            "passthroughLeft", maxSize=4, blocking=False)
200
201        left_window_name = "Left"
202        left_feature_drawer = FeatureTrackerDrawer(left_window_name)
203        camera_estimator_left = CameraMotionEstimator(
204            filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)
205
206        while True:
207            in_passthrough_frame_left = passthrough_image_left_queue.get()
208            passthrough_frame_left = in_passthrough_frame_left.getFrame()
209            left_frame = cv2.cvtColor(passthrough_frame_left, cv2.COLOR_GRAY2BGR)
210
211            tracked_features_left = output_features_left_queue.get().trackedFeatures
212            motions_left, vanishing_pt_left = camera_estimator_left.estimate_motion(
213                left_feature_drawer.trackedFeaturesPath)
214
215            left_feature_drawer.trackFeaturePath(tracked_features_left)
216            left_feature_drawer.drawFeatures(left_frame, vanishing_pt_left, motions_left)
217
218            print("Motions:", motions_left)
219            cv2.imshow(left_window_name, left_frame)
220
221            if cv2.waitKey(1) == ord('q'):
222                break

Need assistance?

Head over to Discussion Forum for technical support or any other questions you might have.