Template app

Let's dive into the basics using an example. We'll create a simple application that runs an object detection neural network and streams color video with visualized neural network detections. Additionally, it uploads images and videos to Luxonis Hub when an interesting situation occurs.

Code organization

Business logic

In smaller applications, the standard approach is to route all pipeline outputs to a single place where the business logic is handled. In this example, our actions include:
  • Transmitting h264 encoded frames with overlay data to the Luxonis Hub, enabling us to view the live stream via the Live View tab.
  • Sending an image event to the Luxonis Hub when a person is detected and the interval set by image_event_upload_interval_minutes has passed.
  • Sending a video event to the Luxonis Hub upon person detection, once the video_event_upload_interval_minutes interval has passed.
These three actions constitute the core reporting functionalities provided by the robothub framework.
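
As a quick reference, the calls below are condensed from process_pipeline_outputs in the full listing at the end of this page; the frame objects, frame_buffer and live_view instances come from the pipeline outputs and the application setup, and the rate-limiting conditions around these calls are up to you.
Python
# Condensed sketch of the three reporting calls (see the full listing below).
live_view.publish(h264_frame=h264_frame.getCvFrame())              # live stream
rh.send_image_event(image=mjpeg_frame.getCvFrame(),
                    title='Person detected')                       # image event
frame_buffer.save_video_event(before_seconds=60, after_seconds=60,
                              title="Interesting video",
                              fps=rh.CONFIGURATION["fps"],
                              frame_width=live_view.frame_width,
                              frame_height=live_view.frame_height)  # video event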

Application

A class derived from robothub.BaseDepthAIApplication acts as the entry point for the application.
This class requires the user to define two essential methods: setup_pipeline and manage_device.
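
A stripped-down skeleton, condensed from the full listing at the end of this page, looks like this:
Python
import depthai as dai
import robothub as rh


class Application(rh.BaseDepthAIApplication):
    def setup_pipeline(self) -> dai.Pipeline:
        # build and return the dai.Pipeline the OAK device should run
        ...

    def manage_device(self, device: dai.Device):
        # read the output queues and run the business logic while the app is running
        ...


if __name__ == "__main__":
    Application().run()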

Pipeline

This is where the pipeline is created.
The pipeline acts as a blueprint for the OAK device, defining its on-device data processing.
Use the DepthAI library to define the pipeline that fits the needs of your application.
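
As a minimal sketch, a pipeline that only streams the color camera to the host could look like this; the template's pipeline in the full listing below additionally adds the encoders and the detection network:
Python
import depthai as dai

pipeline = dai.Pipeline()

# color camera node running on the device
camera = pipeline.createColorCamera()
camera.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)

# XLinkOut node that sends the camera frames to the host application
xout = pipeline.createXLinkOut()
xout.setStreamName("video")
camera.video.link(xout.input)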

Main loop

By the time execution reaches the manage_device method, the device is already executing the pipeline.
To get the data into our application, we must create output queues and retrieve data from them.
Each XLinkOut defined in the pipeline corresponds to one output queue.
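
In this template that boils down to the pattern below, condensed from manage_device in the full listing:
Python
# Inside manage_device: one queue per XLinkOut stream defined in the pipeline.
h264_frames_queue = device.getOutputQueue(name="h264_frames", maxSize=10, blocking=True)

while rh.app_is_running() and self.device_is_running:
    h264_frame = h264_frames_queue.get()  # blocks until a frame arrives
    # ... hand the data to the business logic ...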

Config change

This method is optional to implement; it is called when the application's configuration is changed in Luxonis Hub.
The default behavior is to restart the application.
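
This template overrides it so that only the OAK device is restarted when the fps value changes (excerpt from the full listing below):
Python
def on_configuration_changed(self, configuration_changes: dict) -> None:
    log.info(f"CONFIGURATION CHANGES: {configuration_changes}")
    if "fps" in configuration_changes:
        # a new FPS value requires a new pipeline, so restart the device only
        log.info("FPS change needs a new pipeline. Restarting OAK device...")
        self.restart_device()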

Pipeline definition

RGB sensor

We create a node for the RGB sensor. The sensor outputs 1080p video and operates at the configured frames per second (FPS) rate.
Typically, our devices have three sensors: one RGB sensor in the middle and two monochrome sensors on the sides.
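
The relevant calls, condensed from create_rgb_sensor in the full listing below (pipeline and fps are defined as in that listing):
Python
node = pipeline.createColorCamera()
node.setBoardSocket(dai.CameraBoardSocket.CAM_A)  # the center RGB sensor
node.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
node.setVideoSize(1920, 1080)
node.setPreviewSize(640, 352)  # preview output is fed to the detection network
node.setFps(fps)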

Encoders

For nearly all Perception apps, you'll start with two encoders. The MJPEG encoder gives you .jpeg frames for image reports.
Meanwhile, the H264 encoder provides frames necessary for streaming live video to Luxonis Hub.
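
Both are VideoEncoder nodes that differ mainly in the profile preset; condensed from create_h264_encoder and create_mjpeg_encoder in the full listing below (pipeline, fps and rgb_sensor as in that listing):
Python
h264_encoder = pipeline.createVideoEncoder()
h264_encoder.setDefaultProfilePreset(fps, dai.VideoEncoderProperties.Profile.H264_MAIN)
rgb_sensor.video.link(h264_encoder.input)   # H264 frames for the live stream

mjpeg_encoder = pipeline.createVideoEncoder()
mjpeg_encoder.setDefaultProfilePreset(fps, dai.VideoEncoderProperties.Profile.MJPEG)
rgb_sensor.video.link(mjpeg_encoder.input)  # MJPEG frames for image events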

Neural network

Next, we set up a neural network that processes images. We're using a model from our collection of pre-trained models, compatible with the Myriad X accelerator. The expected input is an image with shape 640x352x3 [w, h, ch], which is why we set the preview size to exactly this resolution.
Python
preview_resolution = (640, 352)
node.setPreviewSize(*preview_resolution)
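
The network node itself is created like this, condensed from create_yolov7tiny_coco_nn in the full listing below (pipeline and rgb_sensor as in that listing):
Python
import blobconverter

node = pipeline.createYoloDetectionNetwork()
# download the pre-compiled blob from the DepthAI model zoo
blob = dai.OpenVINO.Blob(blobconverter.from_zoo(name="yolov7tiny_coco_640x352",
                                                zoo_type="depthai", shaves=6))
node.setBlob(blob)
rgb_sensor.preview.link(node.input)  # 640x352 preview matches the model input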

Outputs

Connect any pipeline node to an XLinkOut node to get its data into your application.
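The template wraps this in a small helper, excerpted from the listing:
Python
def create_output(pipeline, node_input: dai.Node.Output, stream_name: str):
    xout = pipeline.createXLinkOut()
    xout.setStreamName(stream_name)
    node_input.link(xout.input)

The complete app.py, which ties all of the sections above together, follows.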
Python
import logging as log
import time

import blobconverter
import depthai as dai
import object_detector_config as nn_config
import robothub as rh


class BusinessLogic:
    def __init__(self, frame_buffer: rh.FrameBuffer, live_view: rh.DepthaiLiveView):
        self.live_view: rh.DepthaiLiveView = live_view
        self.frame_buffer: rh.FrameBuffer = frame_buffer

        self.last_image_event_upload_seconds = time.time()
        self.last_video_event_upload_seconds = time.time()

    def process_pipeline_outputs(self, h264_frame: dai.ImgFrame, mjpeg_frame: dai.ImgFrame, object_detections: dai.ImgDetections):
        self.frame_buffer.add_frame(h264_frame)  # make sure to store every h264 frame
        for detection in object_detections.detections:
            # visualize bounding box in the live view
            bbox = (detection.xmin, detection.ymin, detection.xmax, detection.ymax)
            self.live_view.add_rectangle(bbox, label=nn_config.labels[detection.label])

            current_time_seconds = time.time()
            # arbitrary condition for sending image events to RobotHub
            if current_time_seconds - self.last_image_event_upload_seconds > rh.CONFIGURATION["image_event_upload_interval_minutes"] * 60:
                if nn_config.labels[detection.label] == 'person':
                    self.last_image_event_upload_seconds = current_time_seconds
                    rh.send_image_event(image=mjpeg_frame.getCvFrame(), title='Person detected')
            # arbitrary condition for sending video events to RobotHub
            if current_time_seconds - self.last_video_event_upload_seconds > rh.CONFIGURATION["video_event_upload_interval_minutes"] * 60:
                if nn_config.labels[detection.label] == 'person':
                    self.last_video_event_upload_seconds = current_time_seconds
                    self.frame_buffer.save_video_event(before_seconds=60, after_seconds=60, title="Interesting video",
                                                       fps=rh.CONFIGURATION["fps"], frame_width=self.live_view.frame_width,
                                                       frame_height=self.live_view.frame_height)
        self.live_view.publish(h264_frame=h264_frame.getCvFrame())


class Application(rh.BaseDepthAIApplication):

    def __init__(self):
        super().__init__()
        self.live_view = rh.DepthaiLiveView(name="live_view", unique_key="rgb",
                                            width=1920, height=1080)
        frame_buffer = rh.FrameBuffer(maxlen=rh.CONFIGURATION["fps"] * 60 * 2)  # buffer last 2 minutes
        self.business_logic = BusinessLogic(frame_buffer=frame_buffer, live_view=self.live_view)

    def setup_pipeline(self) -> dai.Pipeline:
        """Define the pipeline using DepthAI."""
        log.info(f"App config: {rh.CONFIGURATION}")
        pipeline = dai.Pipeline()
        rgb_sensor = create_rgb_sensor(pipeline=pipeline, preview_resolution=(640, 352))
        rgb_h264_encoder = create_h264_encoder(node_input=rgb_sensor.video, pipeline=pipeline)
        rgb_mjpeg_encoder = create_mjpeg_encoder(node_input=rgb_sensor.video, pipeline=pipeline)
        object_detection_nn = create_yolov7tiny_coco_nn(node_input=rgb_sensor.preview, pipeline=pipeline)

        create_output(pipeline=pipeline, node_input=rgb_h264_encoder.bitstream, stream_name="h264_frames")
        create_output(pipeline=pipeline, node_input=rgb_mjpeg_encoder.bitstream, stream_name="mjpeg_frames")
        create_output(pipeline=pipeline, node_input=object_detection_nn.out, stream_name="object_detections")
        return pipeline

    def manage_device(self, device: dai.Device):
        log.info(f"{device.getMxId()} creating output queues...")
        h264_frames_queue = device.getOutputQueue(name="h264_frames", maxSize=10, blocking=True)
        mjpeg_frames_queue = device.getOutputQueue(name="mjpeg_frames", maxSize=10, blocking=True)
        object_detections_queue = device.getOutputQueue(name="object_detections", maxSize=10, blocking=True)

        log.info(f"{device.getMxId()} Application started")
        while rh.app_is_running() and self.device_is_running:
            h264_frame = h264_frames_queue.get()
            mjpeg_frame = mjpeg_frames_queue.get()
            object_detections = object_detections_queue.get()
            self.business_logic.process_pipeline_outputs(h264_frame=h264_frame, mjpeg_frame=mjpeg_frame,
                                                         object_detections=object_detections)
            time.sleep(0.001)

    def on_configuration_changed(self, configuration_changes: dict) -> None:
        log.info(f"CONFIGURATION CHANGES: {configuration_changes}")
        if "fps" in configuration_changes:
            log.info("FPS change needs a new pipeline. Restarting OAK device...")
            self.restart_device()


def create_rgb_sensor(pipeline: dai.Pipeline,
                      fps: int = 30,
                      resolution: dai.ColorCameraProperties.SensorResolution = dai.ColorCameraProperties.SensorResolution.THE_1080_P,
                      preview_resolution: tuple = (1280, 720),
                      ) -> dai.node.ColorCamera:
    node = pipeline.createColorCamera()
    node.setBoardSocket(dai.CameraBoardSocket.CAM_A)
    node.setInterleaved(False)
    node.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
    node.setPreviewNumFramesPool(4)
    node.setPreviewSize(*preview_resolution)
    node.setVideoSize(1920, 1080)
    node.setResolution(resolution)
    node.setFps(fps)
    return node


def create_h264_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30):
    rh_encoder = pipeline.createVideoEncoder()
    rh_encoder_profile = dai.VideoEncoderProperties.Profile.H264_MAIN
    rh_encoder.setDefaultProfilePreset(fps, rh_encoder_profile)
    rh_encoder.input.setQueueSize(2)
    rh_encoder.input.setBlocking(False)
    rh_encoder.setKeyframeFrequency(fps)
    rh_encoder.setRateControlMode(dai.VideoEncoderProperties.RateControlMode.CBR)
    rh_encoder.setNumFramesPool(3)
    node_input.link(rh_encoder.input)
    return rh_encoder


def create_mjpeg_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30, quality: int = 100):
    encoder = pipeline.createVideoEncoder()
    encoder_profile = dai.VideoEncoderProperties.Profile.MJPEG
    encoder.setDefaultProfilePreset(fps, encoder_profile)
    encoder.setQuality(quality)
    node_input.link(encoder.input)
    return encoder


def create_yolov7tiny_coco_nn(node_input: dai.Node.Output, pipeline: dai.Pipeline) -> dai.node.YoloDetectionNetwork:
    model = "yolov7tiny_coco_640x352"
    node = pipeline.createYoloDetectionNetwork()
    blob = dai.OpenVINO.Blob(blobconverter.from_zoo(name=model, zoo_type="depthai", shaves=6))
    node.setBlob(blob)
    node_input.link(node.input)
    node.input.setBlocking(False)
    # Yolo specific parameters
    node.setConfidenceThreshold(0.5)
    node.setNumClasses(80)
    node.setCoordinateSize(4)
    node.setAnchors([12.0, 16.0, 19.0, 36.0, 40.0, 28.0, 36.0, 75.0, 76.0, 55.0, 72.0, 146.0, 142.0, 110.0, 192.0, 243.0, 459.0, 401.0])
    node.setAnchorMasks({
        "side80": [0, 1, 2],
        "side40": [3, 4, 5],
        "side20": [6, 7, 8]
    })
    node.setIouThreshold(0.5)
    return node


def create_output(pipeline, node_input: dai.Node.Output, stream_name: str):
    xout = pipeline.createXLinkOut()
    xout.setStreamName(stream_name)
    node_input.link(xout.input)


if __name__ == "__main__":
    app = Application()
    app.run()