此页面由 AI 自动翻译。查看英文原版

模板应用

让我们通过一个示例来深入了解基础知识。我们将创建一个简单的应用程序,该应用程序运行一个对象检测神经网络, 并流式传输带有可视化神经网络检测结果的彩色视频。此外,当出现有趣的情况时,它会将图像和视频上传到 Luxonis Hub。
1

代码组织

业务逻辑

在较小的应用程序中,标准方法是将所有管道输出路由到一个统一的位置来处理业务逻辑。在此示例中,我们的操作包括:
  • 将带有叠加数据的 h264 编码帧传输到 Luxonis Hub,使我们能够通过“实时视图”选项卡查看实时流。
  • 当检测到人员并且已过 image_event_upload_interval_minutes 设置的时间间隔后,将图像事件发送到 Luxonis Hub。
  • 在检测到人员并且已过 video_event_upload_interval_minutes 设置的时间间隔后,将视频事件发送到 Luxonis Hub。
这三个操作构成了 robothub 框架提供的核心报告功能。

应用程序

派生自 robothub.BaseDepthAIApplication 的类充当应用程序的入口点。
此类要求用户定义两个基本方法:setup_pipeline 和 manage_device。

管道

这是创建管道的位置。
管道充当 OAK 设备的蓝图,定义其设备上的数据处理。
使用 DepthAI 库来定义适合您应用程序需求的管道。

主循环

当执行到达 manage_device 方法时,设备已在执行管道。
为了将数据传入我们的应用程序,我们必须创建输出队列并从中检索数据。
管道中定义的每个 XLinkOut 都对应一个输出队列。

配置更改

这是一个可选的实现方法,当 Luxonis Hub 中的应用程序配置更改时会调用它。
默认行为是重新启动应用程序。
2

管道定义

RGB 传感器

我们正在创建一个 RGB 传感器的对象。此传感器将提供 1080p 分辨率并以配置的
每秒帧数 (FPS) 运行。
通常,我们的设备有三个传感器:中间一个 RGB 传感器,两侧各一个单色传感器。

编码器

对于几乎所有的感知应用程序,您将从两个编码器开始。MJPEG 编码器为您提供 .jpeg 帧以用于图像报告。
同时,H264 编码器提供将实时视频流式传输到 Luxonis Hub 所需的帧。

神经网络

接下来,我们正在设置一个处理图像的神经网络。我们正在使用预训练模型集合中的一个模型, 该模型与 Myriad X 加速器兼容。 预期输入是形状为 640x352x3 [w,h,ch] 的图像。这就是我们将预览尺寸设置为此确切形状的原因。
Python
1preview_resolution = (640, 352)
2node.setPreviewSize(*preview_resolution)

RGB 传感器

将任何 管道节点 连接到 XLinkOut 节点,并将节点数据传入您的应用程序。
1import logging as log
2import time
3
4import blobconverter
5import depthai as dai
6import object_detector_config as nn_config
7import robothub as rh
8
9
10class BusinessLogic:
11 def __init__(self, frame_buffer: rh.FrameBuffer, live_view: rh.DepthaiLiveView):
12 self.live_view: rh.DepthaiLiveView = live_view
13 self.frame_buffer: rh.FrameBuffer = frame_buffer
14
15 self.last_image_event_upload_seconds = time.time()
16 self.last_video_event_upload_seconds = time.time()
17
18 def process_pipeline_outputs(self, h264_frame: dai.ImgFrame, mjpeg_frame: dai.ImgFrame, object_detections: dai.ImgDetections):
19 self.frame_buffer.add_frame(h264_frame) # make sure to store every h264 frame
20 for detection in object_detections.detections:
21 # visualize bounding box in the live view
22 bbox = (detection.xmin, detection.ymin, detection.xmax, detection.ymax)
23 self.live_view.add_rectangle(bbox, label=nn_config.labels[detection.label])
24
25 current_time_seconds = time.time()
26 # arbitrary condition for sending image events to RobotHub
27 if current_time_seconds - self.last_image_event_upload_seconds > rh.CONFIGURATION["image_event_upload_interval_minutes"] * 60:
28 if nn_config.labels[detection.label] == 'person':
29 self.last_image_event_upload_seconds = current_time_seconds
30 rh.send_image_event(image=mjpeg_frame.getCvFrame(), title='Person detected')
31 # arbitrary condition for sending video events to RobotHub
32 if current_time_seconds - self.last_video_event_upload_seconds > rh.CONFIGURATION["video_event_upload_interval_minutes"] * 60:
33 if nn_config.labels[detection.label] == 'person':
34 self.last_video_event_upload_seconds = current_time_seconds
35 self.frame_buffer.save_video_event(before_seconds=60, after_seconds=60, title="Interesting video",
36 fps=rh.CONFIGURATION["fps"], frame_width=self.live_view.frame_width,
37 frame_height=self.live_view.frame_height)
38 self.live_view.publish(h264_frame=h264_frame.getCvFrame())
39
40
41
42class Application(rh.BaseDepthAIApplication):
43
44 def __init__(self):
45 super().__init__()
46 self.live_view = rh.DepthaiLiveView(name="live_view", unique_key="rgb",
47 width=1920, height=1080)
48 frame_buffer = rh.FrameBuffer(maxlen=rh.CONFIGURATION["fps"] * 60 * 2) # buffer last 2 minutes
49 self.business_logic = BusinessLogic(frame_buffer=frame_buffer, live_view=self.live_view)
50
51 def setup_pipeline(self) -> dai.Pipeline:
52 """Define the pipeline using DepthAI."""
53
54 log.info(f"App config: {rh.CONFIGURATION}")
55 pipeline = dai.Pipeline()
56 rgb_sensor = create_rgb_sensor(pipeline=pipeline, preview_resolution=(640, 352))
57 rgb_h264_encoder = create_h264_encoder(node_input=rgb_sensor.video, pipeline=pipeline)
58 rgb_mjpeg_encoder = create_mjpeg_encoder(node_input=rgb_sensor.video, pipeline=pipeline)
59 object_detection_nn = create_yolov7tiny_coco_nn(node_input=rgb_sensor.preview, pipeline=pipeline)
60
61 create_output(pipeline=pipeline, node_input=rgb_h264_encoder.bitstream, stream_name="h264_frames")
62 create_output(pipeline=pipeline, node_input=rgb_mjpeg_encoder.bitstream, stream_name="mjpeg_frames")
63 create_output(pipeline=pipeline, node_input=object_detection_nn.out, stream_name="object_detections")
64 return pipeline
65
66
67 def manage_device(self, device: dai.Device):
68 log.info(f"{device.getMxId()} creating output queues...")
69 h264_frames_queue = device.getOutputQueue(name="h264_frames", maxSize=10, blocking=True)
70 mjpeg_frames_queue = device.getOutputQueue(name="mjpeg_frames", maxSize=10, blocking=True)
71 object_detections_queue = device.getOutputQueue(name="object_detections", maxSize=10, blocking=True)
72
73 log.info(f"{device.getMxId()} Application started")
74 while rh.app_is_running() and self.device_is_running:
75 h264_frame = h264_frames_queue.get()
76 mjpeg_frame = mjpeg_frames_queue.get()
77 object_detections = object_detections_queue.get()
78 self.business_logic.process_pipeline_outputs(h264_frame=h264_frame, mjpeg_frame=mjpeg_frame, object_detections=object_detections)
79 time.sleep(0.001)
80
81
82 def on_configuration_changed(self, configuration_changes: dict) -> None:
83 log.info(f"CONFIGURATION CHANGES: {configuration_changes}")
84 if "fps" in configuration_changes:
85 log.info(f"FPS change needs a new pipeline. Restarting OAK device...")
86 self.restart_device()
87
88
89
90def create_rgb_sensor(pipeline: dai.Pipeline,
91 fps: int = 30,
92 resolution: dai.ColorCameraProperties.SensorResolution = dai.ColorCameraProperties.SensorResolution.THE_1080_P,
93 preview_resolution: tuple = (1280, 720),
94 ) -> dai.node.ColorCamera:
95 node = pipeline.createColorCamera()
96 node.setBoardSocket(dai.CameraBoardSocket.CAM_A)
97 node.setInterleaved(False)
98 node.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
99 node.setPreviewNumFramesPool(4)
100 node.setPreviewSize(*preview_resolution)
101 node.setVideoSize(1920, 1080)
102 node.setResolution(resolution)
103 node.setFps(fps)
104 return node
105
106
107def create_h264_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30):
108 rh_encoder = pipeline.createVideoEncoder()
109 rh_encoder_profile = dai.VideoEncoderProperties.Profile.H264_MAIN
110 rh_encoder.setDefaultProfilePreset(fps, rh_encoder_profile)
111 rh_encoder.input.setQueueSize(2)
112 rh_encoder.input.setBlocking(False)
113 rh_encoder.setKeyframeFrequency(fps)
114 rh_encoder.setRateControlMode(dai.VideoEncoderProperties.RateControlMode.CBR)
115 rh_encoder.setNumFramesPool(3)
116 node_input.link(rh_encoder.input)
117 return rh_encoder
118
119
120def create_mjpeg_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30, quality: int = 100):
121 encoder = pipeline.createVideoEncoder()
122 encoder_profile = dai.VideoEncoderProperties.Profile.MJPEG
123 encoder.setDefaultProfilePreset(fps, encoder_profile)
124 encoder.setQuality(quality)
125 node_input.link(encoder.input)
126 return encoder
127
128
129def create_yolov7tiny_coco_nn(node_input: dai.Node.Output, pipeline: dai.Pipeline) -> dai.node.YoloDetectionNetwork:
130 model = "yolov7tiny_coco_640x352"
131 node = pipeline.createYoloDetectionNetwork()
132 blob = dai.OpenVINO.Blob(blobconverter.from_zoo(name=model, zoo_type="depthai", shaves=6))
133 node.setBlob(blob)
134 node_input.link(node.input)
135 node.input.setBlocking(False)
136 # Yolo specific parameters
137 node.setConfidenceThreshold(0.5)
138 node.setNumClasses(80)
139 node.setCoordinateSize(4)
140 node.setAnchors([12.0, 16.0, 19.0, 36.0, 40.0, 28.0, 36.0, 75.0, 76.0, 55.0, 72.0, 146.0, 142.0, 110.0, 192.0, 243.0, 459.0, 401.0])
141 node.setAnchorMasks({
142 "side80": [0, 1, 2],
143 "side40": [3, 4, 5],
144 "side20": [6, 7, 8]
145 })
146 node.setIouThreshold(0.5)
147 return node
148
149
150def create_output(pipeline, node_input: dai.Node.Output, stream_name: str):
151 xout = pipeline.createXLinkOut()
152 xout.setStreamName(stream_name)
153 node_input.link(xout.input)
154
155
156if __name__ == "__main__":
157 app = Application()
158 app.run()
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174