DepthAI Tutorials
DepthAI API References

ON THIS PAGE

  • Feature Tracker with Motion Estimation
  • Demo
  • Setup
  • Source code
  • Pipeline

Feature Tracker with Motion Estimation

This example demonstrates the capabilities of the FeatureTracker combined with motion estimation. It detects and tracks features between consecutive frames using optical flow. Each feature is assigned a unique ID. The motion of the camera is estimated based on the tracked features, and the estimated motion (e.g., Up, Down, Left, Right, Rotating) is displayed on screen. The Feature Detector example, by contrast, only detects features without estimating motion.

Demo

Setup

Please run the install script to download all required dependencies. Please note that this script must be run from within the git repository, so you have to clone the depthai-python repository first and then run the script.
Command Line
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
For additional information, please follow the installation guide.

Source code

Python

Python

Python
GitHub
1#!/usr/bin/env python3
2
3import numpy as np
4import cv2
5from collections import deque
6import depthai as dai
7
8
class CameraMotionEstimator:
    """Classify the dominant camera motion from per-feature position histories.

    The estimate is based on the displacement between the two most recent
    positions of every tracked feature. The mean displacement is smoothed
    with an exponential filter that is carried over between calls.
    """

    def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
        """Store the tuning parameters and reset the filter state.

        Args:
            filter_weight: Weight given to the previous filtered flow; the new
                measurement receives ``1 - filter_weight``.
            motion_threshold: Minimum filtered flow magnitude that is reported
                as a translation.
            rotation_threshold: Minimum absolute mean flow angle (radians, as
                returned by ``np.arctan2``) that is reported as 'Rotating'.
        """
        self.last_avg_flow = np.array([0.0, 0.0])
        self.filter_weight = filter_weight
        self.motion_threshold = motion_threshold
        self.rotation_threshold = rotation_threshold

    def estimate_motion(self, feature_paths):
        """Estimate the most prominent motion for the current set of paths.

        Args:
            feature_paths: Mapping of feature ID to a sequence of positions.
                Each position must expose ``x`` and ``y`` attributes; only the
                last two entries of every sequence are used.

        Returns:
            A ``(label, vanishing_point)`` tuple. ``label`` is one of
            "Camera Staying Still", 'Left', 'Right', 'Up', 'Down' or
            'Rotating'. ``vanishing_point`` is a 2-element array holding the
            mean of every feature's latest position extrapolated by one more
            step; it is ``[0, 0]`` when no feature has two positions yet.
        """
        most_prominent_motion = "Camera Staying Still"
        vanishing_point = np.array([0.0, 0.0])
        num_features = len(feature_paths)

        print(f"Number of features: {num_features}")

        if num_features == 0:
            return most_prominent_motion, vanishing_point

        avg_flow = np.array([0.0, 0.0])
        total_rotation = 0.0
        # Only paths with two or more points carry a displacement. Count them
        # separately so that freshly detected features (a single point) do not
        # dilute the averages.
        num_contributing = 0

        for path in feature_paths.values():
            if len(path) < 2:
                continue
            src = np.array([path[-2].x, path[-2].y])
            dst = np.array([path[-1].x, path[-1].y])
            step = dst - src
            avg_flow += step
            # Latest position pushed one further step along its displacement.
            vanishing_point += dst + step
            # Direction angle of the displacement, used as a rotation heuristic.
            total_rotation += np.arctan2(step[1], step[0])
            num_contributing += 1

        if num_contributing:
            avg_flow /= num_contributing
            vanishing_point /= num_contributing
            avg_rotation = total_rotation / num_contributing
        else:
            avg_rotation = 0.0

        print(f"Average Flow: {avg_flow}")
        print(f"Average Rotation: {avg_rotation}")

        # Exponential smoothing against the previous call's filtered flow.
        avg_flow = (self.filter_weight * self.last_avg_flow +
                    (1 - self.filter_weight) * avg_flow)
        self.last_avg_flow = avg_flow

        flow_magnitude = np.linalg.norm(avg_flow)
        rotation_magnitude = abs(avg_rotation)
        max_magnitude = 0.0

        if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
            # The label names the camera motion: negative mean x flow maps to
            # 'Right' and negative mean y flow maps to 'Down'.
            if abs(avg_flow[0]) > abs(avg_flow[1]):
                most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
            else:
                most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
            max_magnitude = flow_magnitude

        # NOTE(review): this compares an angle in radians against a flow
        # magnitude in position units; kept as in the original heuristic.
        if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
            most_prominent_motion = 'Rotating'

        return most_prominent_motion, vanishing_point
64
65
class FeatureTrackerDrawer:
    """Maintain a short position history per tracked feature and draw it.

    Histories are stored in ``trackedFeaturesPath`` (feature ID -> deque of
    positions) and rendered onto an image with OpenCV drawing calls.
    """

    # Drawing parameters (colour tuples are passed straight to OpenCV)
    lineColor = (200, 0, 200)
    pointColor = (0, 0, 255)
    vanishingPointColor = (255, 0, 255)  # Violet color for vanishing point
    circleRadius = 2
    maxTrackedFeaturesPathLength = 30
    trackedFeaturesPathLength = 10

    # Per-instance state; replaced with fresh containers in __init__
    trackedIDs = None
    trackedFeaturesPath = None

    # Define color mapping for directions
    direction_colors = {
        "Up": (0, 255, 255),  # Yellow
        "Down": (0, 255, 0),  # Green
        "Left": (255, 0, 0),  # Blue
        "Right": (0, 0, 255), # Red
    }

    def __init__(self, windowName):
        """Create the named OpenCV window and start with no tracked features."""
        self.windowName = windowName
        cv2.namedWindow(windowName)
        self.trackedIDs = set()
        self.trackedFeaturesPath = dict()

    def trackFeaturePath(self, features):
        """Append the current position of each feature and drop vanished ones.

        Args:
            features: Iterable of objects exposing ``id`` and ``position``.
        """
        seenIDs = set()
        for feature in features:
            featureID = feature.id
            seenIDs.add(featureID)

            history = self.trackedFeaturesPath.get(featureID)
            if history is None:
                history = self.trackedFeaturesPath[featureID] = deque()
            history.append(feature.position)

            # Keep at most trackedFeaturesPathLength entries (never fewer than one).
            while len(history) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength):
                history.popleft()

        # Forget every feature known from the previous call that is absent now.
        vanishedIDs = {knownID for knownID in self.trackedIDs if knownID not in seenIDs}
        for vanishedID in vanishedIDs:
            self.trackedFeaturesPath.pop(vanishedID)

        self.trackedIDs = seenIDs

    def drawVanishingPoint(self, img, vanishing_point):
        """Draw a filled circle at the integer-truncated vanishing point."""
        center = (int(vanishing_point[0]), int(vanishing_point[1]))
        cv2.circle(img, center, self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)

    def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):
        """Draw all feature trails, the motion label and the vanishing point.

        Args:
            img: Image to draw on (modified in place by the OpenCV calls).
            vanishing_point: Optional 2-element point; drawn when not None.
            prominent_motion: Optional label; it selects the trail colour and
                is written centred near the top of the image when truthy.
        """
        # Direction-specific colour when one is defined, default point colour otherwise
        point_color = self.direction_colors.get(prominent_motion, self.pointColor)

        for history in self.trackedFeaturesPath.values():
            pixels = [(int(p.x), int(p.y)) for p in history]

            # One line segment per pair of consecutive positions
            for src, dst in zip(pixels, pixels[1:]):
                cv2.line(img, src, dst, point_color, 1, cv2.LINE_AA, 0)

            # Mark the most recent position
            cv2.circle(img, pixels[-1], self.circleRadius, point_color, -1, cv2.LINE_AA, 0)

        # Draw the direction text on the image
        if prominent_motion:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 1
            font_thickness = 2
            text_width, text_height = cv2.getTextSize(
                prominent_motion, font, font_scale, font_thickness)[0]
            origin = ((img.shape[1] - text_width) // 2,  # horizontally centred
                      text_height + 20)                   # 20 pixels from the top

            # Same colour mapping as the trails, but defaulting to white
            text_color = self.direction_colors.get(prominent_motion, (255, 255, 255))

            cv2.putText(img, prominent_motion, origin, font, font_scale,
                        text_color, font_thickness, cv2.LINE_AA)

        # Draw vanishing point if provided
        if vanishing_point is not None:
            self.drawVanishingPoint(img, vanishing_point)
161
def create_pipeline():
    """Build the pipeline: left mono camera -> FeatureTracker -> two XLink outputs.

    Returns:
        The configured ``dai.Pipeline`` with the output streams
        "trackedFeaturesLeft" and "passthroughLeft".
    """
    pipeline = dai.Pipeline()

    # Image source: the left mono camera at 720p and 15 FPS
    camera = pipeline.create(dai.node.MonoCamera)
    camera.setCamera("left")
    camera.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
    camera.setFps(15)

    # Feature tracker fed by the camera
    tracker = pipeline.create(dai.node.FeatureTracker)

    # Host-facing outputs: the tracked features and the frame they belong to
    features_out = pipeline.create(dai.node.XLinkOut)
    passthrough_out = pipeline.create(dai.node.XLinkOut)
    features_out.setStreamName("trackedFeaturesLeft")
    passthrough_out.setStreamName("passthroughLeft")

    # Allocate resources for improved performance (2 shaves, 2 memory slices)
    tracker.setHardwareResources(2, 2)

    # Wire the nodes together
    camera.out.link(tracker.inputImage)
    tracker.passthroughInputImage.link(passthrough_out.input)
    tracker.outputFeatures.link(features_out.input)

    return pipeline
193
194
# Entry point: stream frames and tracked features from the device, estimate
# the camera motion for every frame and show the annotated image until 'q'
# is pressed.
if __name__ == '__main__':
    pipeline = create_pipeline()
    with dai.Device(pipeline) as device:
        output_features_left_queue = device.getOutputQueue(
            "trackedFeaturesLeft", maxSize=4, blocking=False)
        passthrough_image_left_queue = device.getOutputQueue(
            "passthroughLeft", maxSize=4, blocking=False)

        left_window_name = "Left"
        left_feature_drawer = FeatureTrackerDrawer(left_window_name)
        camera_estimator_left = CameraMotionEstimator(
            filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)

        while True:
            in_passthrough_frame_left = passthrough_image_left_queue.get()
            passthrough_frame_left = in_passthrough_frame_left.getFrame()
            # Convert to BGR so the coloured overlays can be drawn on the mono frame.
            left_frame = cv2.cvtColor(passthrough_frame_left, cv2.COLOR_GRAY2BGR)

            tracked_features_left = output_features_left_queue.get().trackedFeatures

            # Fold the current detections into the paths before estimating, so
            # the label and vanishing point describe this frame rather than
            # the previous one.
            left_feature_drawer.trackFeaturePath(tracked_features_left)
            motions_left, vanishing_pt_left = camera_estimator_left.estimate_motion(
                left_feature_drawer.trackedFeaturesPath)

            left_feature_drawer.drawFeatures(left_frame, vanishing_pt_left, motions_left)

            print("Motions:", motions_left)
            cv2.imshow(left_window_name, left_frame)

            if cv2.waitKey(1) == ord('q'):
                break

    # Close the display window once the loop has been left.
    cv2.destroyAllWindows()

Pipeline

Need assistance?

Head over to Discussion Forum for technical support or any other questions you might have.