DepthAI v2 has been superseded by DepthAI v3. You are viewing legacy documentation.
此页面由 AI 自动翻译。查看英文原版
DepthAI 教程
DepthAI API 参考

本页目录

  • 演示
  • 设置
  • 源代码
  • 管道

具有运动估计的特征跟踪器

本示例演示了 FeatureTracker 结合运动估计的功能。它使用光流在连续帧之间检测和跟踪特征。 每个特征都被分配一个唯一的 ID。相机的运动是根据跟踪的特征来估计的,估计的运动(例如,上、下、左、右、旋转)会显示在屏幕上。Feature Detector 示例仅检测特征而不估计运动。

演示

特征运动估计的特征跟踪器

设置

请运行 安装脚本 以下载所有必需的依赖项。请注意,此脚本必须在 git 上下文中运行,因此您必须先下载 depthai-python 存储库,然后运行脚本
Command Line
1git clone https://github.com/luxonis/depthai-python.git
2cd depthai-python/examples
3python3 install_requirements.py
有关更多信息,请遵循 安装指南

源代码

Python

Python
GitHub
1#!/usr/bin/env python3
2
3import numpy as np
4import cv2
5from collections import deque
6import depthai as dai
7
8
9class CameraMotionEstimator:
10    def __init__(self, filter_weight=0.5, motion_threshold=0.01, rotation_threshold=0.05):
11        self.last_avg_flow = np.array([0.0, 0.0])
12        self.filter_weight = filter_weight
13        self.motion_threshold = motion_threshold
14        self.rotation_threshold = rotation_threshold
15
16    def estimate_motion(self, feature_paths):
17        most_prominent_motion = "Camera Staying Still"
18        max_magnitude = 0.0
19        avg_flow = np.array([0.0, 0.0])
20        total_rotation = 0.0
21        vanishing_point = np.array([0.0, 0.0])
22        num_features = len(feature_paths)
23
24        print(f"Number of features: {num_features}")
25
26        if num_features == 0:
27            return most_prominent_motion, vanishing_point
28
29        for path in feature_paths.values():
30            if len(path) >= 2:
31                src = np.array([path[-2].x, path[-2].y])
32                dst = np.array([path[-1].x, path[-1].y])
33                avg_flow += dst - src
34                motion_vector = dst + (dst - src)
35                vanishing_point += motion_vector
36                rotation = np.arctan2(dst[1] - src[1], dst[0] - src[0])
37                total_rotation += rotation
38
39        avg_flow /= num_features
40        avg_rotation = total_rotation / num_features
41        vanishing_point /= num_features
42
43        print(f"Average Flow: {avg_flow}")
44        print(f"Average Rotation: {avg_rotation}")
45
46        avg_flow = (self.filter_weight * self.last_avg_flow +
47                    (1 - self.filter_weight) * avg_flow)
48        self.last_avg_flow = avg_flow
49
50        flow_magnitude = np.linalg.norm(avg_flow)
51        rotation_magnitude = abs(avg_rotation)
52
53        if flow_magnitude > max_magnitude and flow_magnitude > self.motion_threshold:
54            if abs(avg_flow[0]) > abs(avg_flow[1]):
55                most_prominent_motion = 'Right' if avg_flow[0] < 0 else 'Left'
56            else:
57                most_prominent_motion = 'Down' if avg_flow[1] < 0 else 'Up'
58            max_magnitude = flow_magnitude
59
60        if rotation_magnitude > max_magnitude and rotation_magnitude > self.rotation_threshold:
61            most_prominent_motion = 'Rotating'
62
63        return most_prominent_motion, vanishing_point
64
65
66class FeatureTrackerDrawer:
67
68    lineColor = (200, 0, 200)
69    pointColor = (0, 0, 255)
70    vanishingPointColor = (255, 0, 255)  # Violet color for vanishing point
71    circleRadius = 2
72    maxTrackedFeaturesPathLength = 30
73    trackedFeaturesPathLength = 10
74
75    trackedIDs = None
76    trackedFeaturesPath = None
77
78    def trackFeaturePath(self, features):
79
80        newTrackedIDs = set()
81        for currentFeature in features:
82            currentID = currentFeature.id
83            newTrackedIDs.add(currentID)
84
85            if currentID not in self.trackedFeaturesPath:
86                self.trackedFeaturesPath[currentID] = deque()
87
88            path = self.trackedFeaturesPath[currentID]
89            path.append(currentFeature.position)
90
91            while(len(path) > max(1, FeatureTrackerDrawer.trackedFeaturesPathLength)):
92                path.popleft()
93
94            self.trackedFeaturesPath[currentID] = path
95
96        featuresToRemove = set()
97        for oldId in self.trackedIDs:
98            if oldId not in newTrackedIDs:
99                featuresToRemove.add(oldId)
100
101        for id in featuresToRemove:
102            self.trackedFeaturesPath.pop(id)
103
104        self.trackedIDs = newTrackedIDs
105
106    def drawVanishingPoint(self, img, vanishing_point):
107        cv2.circle(img, (int(vanishing_point[0]), int(vanishing_point[1])), self.circleRadius, self.vanishingPointColor, -1, cv2.LINE_AA, 0)
108
109    # Define color mapping for directions
110    direction_colors = {
111        "Up": (0, 255, 255),  # Yellow
112        "Down": (0, 255, 0),  # Green
113        "Left": (255, 0, 0),  # Blue
114        "Right": (0, 0, 255), # Red
115    }
116
117    def drawFeatures(self, img, vanishing_point=None, prominent_motion=None):
118
119        # Get the appropriate point color based on the prominent motion
120        if prominent_motion in self.direction_colors:
121            point_color = self.direction_colors[prominent_motion]
122        else:
123            point_color = self.pointColor
124
125        for featurePath in self.trackedFeaturesPath.values():
126            path = featurePath
127
128            for j in range(len(path) - 1):
129                src = (int(path[j].x), int(path[j].y))
130                dst = (int(path[j + 1].x), int(path[j + 1].y))
131                cv2.line(img, src, dst, point_color, 1, cv2.LINE_AA, 0)
132
133            j = len(path) - 1
134            cv2.circle(img, (int(path[j].x), int(path[j].y)), self.circleRadius, point_color, -1, cv2.LINE_AA, 0)
135
136        # Draw the direction text on the image
137        if prominent_motion:
138            font = cv2.FONT_HERSHEY_SIMPLEX
139            font_scale = 1
140            font_thickness = 2
141            text_size = cv2.getTextSize(prominent_motion, font, font_scale, font_thickness)[0]
142            text_x = (img.shape[1] - text_size[0]) // 2
143            text_y = text_size[1] + 20  # 20 pixels from the top
144
145            # Get the appropriate color based on the prominent motion
146            text_color = self.direction_colors.get(prominent_motion, (255, 255, 255))  # Default to white
147
148            # Draw the text
149            cv2.putText(img, prominent_motion, (text_x, text_y), font, font_scale, text_color, font_thickness, cv2.LINE_AA)
150
151
152        # Draw vanishing point if provided
153        if vanishing_point is not None:
154            self.drawVanishingPoint(img, vanishing_point)
155
156    def __init__(self, windowName):
157        self.windowName = windowName
158        cv2.namedWindow(windowName)
159        self.trackedIDs = set()
160        self.trackedFeaturesPath = dict()
161
162def create_pipeline():
163    pipeline = dai.Pipeline()
164
165    # Create a MonoCamera node and set its properties
166    mono_left = pipeline.create(dai.node.MonoCamera)
167    mono_left.setCamera("left")
168    mono_left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_720_P)
169    mono_left.setFps(15)
170
171    # Create a FeatureTracker node
172    feature_tracker_left = pipeline.create(dai.node.FeatureTracker)
173
174    # Create XLinkOut nodes for output streams
175    xout_tracked_features_left = pipeline.create(dai.node.XLinkOut)
176    xout_passthrough_left = pipeline.create(dai.node.XLinkOut)
177
178    # Set stream names
179    xout_tracked_features_left.setStreamName("trackedFeaturesLeft")
180    xout_passthrough_left.setStreamName("passthroughLeft")
181
182    # Allocate resources for improved performance
183    num_shaves = 2
184    num_memory_slices = 2
185    feature_tracker_left.setHardwareResources(num_shaves, num_memory_slices)
186
187    # Link the nodes
188    mono_left.out.link(feature_tracker_left.inputImage)
189    feature_tracker_left.passthroughInputImage.link(xout_passthrough_left.input)
190    feature_tracker_left.outputFeatures.link(xout_tracked_features_left.input)
191
192    return pipeline
193
194
195if __name__ == '__main__':
196    pipeline = create_pipeline()
197    with dai.Device(pipeline) as device:
198        output_features_left_queue = device.getOutputQueue(
199            "trackedFeaturesLeft", maxSize=4, blocking=False)
200        passthrough_image_left_queue = device.getOutputQueue(
201            "passthroughLeft", maxSize=4, blocking=False)
202
203        left_window_name = "Left"
204        left_feature_drawer = FeatureTrackerDrawer(left_window_name)
205        camera_estimator_left = CameraMotionEstimator(
206            filter_weight=0.5, motion_threshold=0.3, rotation_threshold=0.5)
207
208        while True:
209            in_passthrough_frame_left = passthrough_image_left_queue.get()
210            passthrough_frame_left = in_passthrough_frame_left.getFrame()
211            left_frame = cv2.cvtColor(passthrough_frame_left, cv2.COLOR_GRAY2BGR)
212
213            tracked_features_left = output_features_left_queue.get().trackedFeatures
214            motions_left, vanishing_pt_left = camera_estimator_left.estimate_motion(
215                left_feature_drawer.trackedFeaturesPath)
216
217            left_feature_drawer.trackFeaturePath(tracked_features_left)
218            left_feature_drawer.drawFeatures(left_frame, vanishing_pt_left, motions_left)
219
220            print("Motions:", motions_left)
221            cv2.imshow(left_window_name, left_frame)
222
223            if cv2.waitKey(1) == ord('q'):
224                break

管道

需要帮助?

请前往 Discussion Forum 获取技术支持或提出您可能有的任何其他问题。