Template app
Let's dive into the basics using an example. We'll create a simple application that runs an object detection neural network and streams color video with the detections visualized. Additionally, it uploads images and videos to Luxonis Hub when an interesting situation occurs.
Code organization
Business logic
In smaller applications, the standard approach is to route all pipeline outputs to a unified location for processing business logic. In this example, our actions include (a minimal sketch of this logic follows the list):
- Transmitting H264-encoded frames with overlay data to Luxonis Hub, enabling us to view the live stream via the Live View tab.
- Sending an image event to Luxonis Hub when a person is detected and the interval set by image_event_upload_interval_minutes has passed.
- Sending a video event to Luxonis Hub upon person detection and after the video_event_upload_interval_minutes interval.
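To make this concrete, here is a minimal, simplified sketch of such a unified handler. It mirrors BusinessLogic.process_pipeline_outputs from the complete listing at the end of this page; the `state` dict used for rate limiting is only an illustration.

```python
import time

import depthai as dai
import robothub as rh


def process_outputs(live_view: rh.DepthaiLiveView, frame_buffer: rh.FrameBuffer,
                    h264_frame: dai.ImgFrame, mjpeg_frame: dai.ImgFrame,
                    detections: dai.ImgDetections, state: dict) -> None:
    frame_buffer.add_frame(h264_frame)  # keep every h264 frame so video events can be cut later
    for detection in detections.detections:
        bbox = (detection.xmin, detection.ymin, detection.xmax, detection.ymax)
        live_view.add_rectangle(bbox, label=str(detection.label))  # overlay for the live stream
        # rate-limited image event; the full version also checks that the label is 'person'
        if time.time() - state["last_image_event"] > rh.CONFIGURATION["image_event_upload_interval_minutes"] * 60:
            state["last_image_event"] = time.time()
            rh.send_image_event(image=mjpeg_frame.getCvFrame(), title="Person detected")
    live_view.publish(h264_frame=h264_frame.getCvFrame())  # stream to the Live View tab
```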
Application
A class derived from robothub.BaseDepthAIApplication acts as the entry point for the application. This class requires the user to define two essential methods: setup_pipeline and manage_device.
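A bare skeleton of such a class could look like the sketch below; the full implementation is shown at the end of this page.

```python
import time

import depthai as dai
import robothub as rh


class Application(rh.BaseDepthAIApplication):
    def setup_pipeline(self) -> dai.Pipeline:
        # Build and return the pipeline the OAK device will run
        pipeline = dai.Pipeline()
        # ... add camera, encoder and neural network nodes here ...
        return pipeline

    def manage_device(self, device: dai.Device):
        # The device is already running the pipeline at this point;
        # read its output queues here
        while rh.app_is_running():
            time.sleep(0.1)


if __name__ == "__main__":
    app = Application()
    app.run()
```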
Pipeline
This is where the pipeline is created. The pipeline acts as a blueprint for the OAK device, defining its on-device data processing.
Use the DepthAI library to define the pipeline that fits the needs of your application.
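As an illustration, a minimal pipeline with one color camera, an H264 encoder, and an XLinkOut output could be sketched like this (it uses the same DepthAI calls as the full listing below):

```python
import depthai as dai


def setup_minimal_pipeline() -> dai.Pipeline:
    pipeline = dai.Pipeline()

    # Color camera node
    camera = pipeline.createColorCamera()
    camera.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    camera.setFps(30)

    # H264 encoder fed by the camera's full-resolution video output
    encoder = pipeline.createVideoEncoder()
    encoder.setDefaultProfilePreset(30, dai.VideoEncoderProperties.Profile.H264_MAIN)
    camera.video.link(encoder.input)

    # XLinkOut node exposes the encoded stream to the host application
    xout = pipeline.createXLinkOut()
    xout.setStreamName("h264_frames")
    encoder.bitstream.link(xout.input)

    return pipeline
```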
Main loop
By the time execution reaches the manage_device method, the device is already executing the pipeline. To get the data into our application, we must create output queues and retrieve data from them.
Each XLinkOut defined in the pipeline corresponds to one output queue.
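A minimal sketch of such a loop, assuming the stream names from the full listing below:

```python
import depthai as dai
import robothub as rh


def read_outputs(device: dai.Device) -> None:
    # One output queue per XLinkOut node defined in the pipeline
    h264_queue = device.getOutputQueue(name="h264_frames", maxSize=10, blocking=True)
    detections_queue = device.getOutputQueue(name="object_detections", maxSize=10, blocking=True)

    while rh.app_is_running():
        h264_frame = h264_queue.get()        # blocks until data is available
        detections = detections_queue.get()
        # ... hand the data over to the business logic here ...
```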
Config change
This is an optional method to implement; it is called when the configuration of the application is changed in Luxonis Hub. The default behavior is to restart the application.
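Overriding it might look like the following sketch (mirroring on_configuration_changed in the full listing; setup_pipeline and manage_device are omitted for brevity):

```python
import logging as log

import robothub as rh


class Application(rh.BaseDepthAIApplication):
    def on_configuration_changed(self, configuration_changes: dict) -> None:
        log.info(f"Configuration changes: {configuration_changes}")
        if "fps" in configuration_changes:
            # A new FPS requires a new pipeline, so restart the OAK device
            self.restart_device()
```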
Pipeline definition
RGB sensor
We're creating a node for the RGB sensor. This sensor will provide 1080p resolution and operate at the configured frames per second (FPS) rate.
Typically, our devices have three sensors: one RGB sensor in the middle and two monochrome sensors on the sides.
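In code, the helper that builds this node boils down to the following sketch (a trimmed version of create_rgb_sensor from the full listing):

```python
import depthai as dai


def create_rgb_sensor(pipeline: dai.Pipeline, fps: int = 30,
                      preview_resolution: tuple = (640, 352)) -> dai.node.ColorCamera:
    node = pipeline.createColorCamera()
    node.setBoardSocket(dai.CameraBoardSocket.CAM_A)  # the middle RGB sensor
    node.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
    node.setVideoSize(1920, 1080)                     # full-resolution stream for the encoders
    node.setPreviewSize(*preview_resolution)          # small preview for the neural network
    node.setFps(fps)                                  # pass rh.CONFIGURATION["fps"] to honor the app config
    return node
```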
Encoders
For nearly all Perception apps, you'll start with two encoders. The MJPEG encoder gives you .jpeg frames for image reports. Meanwhile, the H264 encoder provides frames necessary for streaming live video to Luxonis Hub.
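Both are ordinary VideoEncoder nodes; a trimmed-down sketch of the two helpers from the full listing:

```python
import depthai as dai


def create_h264_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30) -> dai.node.VideoEncoder:
    encoder = pipeline.createVideoEncoder()
    encoder.setDefaultProfilePreset(fps, dai.VideoEncoderProperties.Profile.H264_MAIN)
    node_input.link(encoder.input)  # fed by the camera's video output
    return encoder


def create_mjpeg_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30, quality: int = 100) -> dai.node.VideoEncoder:
    encoder = pipeline.createVideoEncoder()
    encoder.setDefaultProfilePreset(fps, dai.VideoEncoderProperties.Profile.MJPEG)
    encoder.setQuality(quality)     # maximum-quality .jpeg frames for image events
    node_input.link(encoder.input)
    return encoder
```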
Neural network
Next, we're setting up a neural network that processes images. We're using a model from our collection of pre-trained models, compatible with the Myriad X accelerator. The expected input is an image with shape 640x352x3 [w, h, ch], which is why we set the preview size to this exact shape.

```python
preview_resolution = (640, 352)
node.setPreviewSize(*preview_resolution)
```
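The network node itself is created roughly as follows, a condensed sketch of create_yolov7tiny_coco_nn from the full listing; note that the full version also sets the coordinate size, anchors, anchor masks, and IoU threshold that the YOLO decoder needs.

```python
import blobconverter
import depthai as dai


def create_yolov7tiny_coco_nn(node_input: dai.Node.Output, pipeline: dai.Pipeline) -> dai.node.YoloDetectionNetwork:
    node = pipeline.createYoloDetectionNetwork()
    # download a pre-trained, Myriad X compatible blob from the DepthAI model zoo
    blob = dai.OpenVINO.Blob(blobconverter.from_zoo(name="yolov7tiny_coco_640x352", zoo_type="depthai", shaves=6))
    node.setBlob(blob)
    node.setConfidenceThreshold(0.5)
    node.setNumClasses(80)           # COCO classes
    node_input.link(node.input)      # fed by the 640x352 camera preview
    return node
```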
Output
Connect any pipeline node to an XLinkOut node to get that node's data into your application.

The complete application code:

```python
import logging as log
import time

import blobconverter
import depthai as dai
import object_detector_config as nn_config
import robothub as rh


class BusinessLogic:
    def __init__(self, frame_buffer: rh.FrameBuffer, live_view: rh.DepthaiLiveView):
        self.live_view: rh.DepthaiLiveView = live_view
        self.frame_buffer: rh.FrameBuffer = frame_buffer

        self.last_image_event_upload_seconds = time.time()
        self.last_video_event_upload_seconds = time.time()

    def process_pipeline_outputs(self, h264_frame: dai.ImgFrame, mjpeg_frame: dai.ImgFrame, object_detections: dai.ImgDetections):
        self.frame_buffer.add_frame(h264_frame)  # make sure to store every h264 frame
        for detection in object_detections.detections:
            # visualize bounding box in the live view
            bbox = (detection.xmin, detection.ymin, detection.xmax, detection.ymax)
            self.live_view.add_rectangle(bbox, label=nn_config.labels[detection.label])

            current_time_seconds = time.time()
            # arbitrary condition for sending image events to RobotHub
            if current_time_seconds - self.last_image_event_upload_seconds > rh.CONFIGURATION["image_event_upload_interval_minutes"] * 60:
                if nn_config.labels[detection.label] == 'person':
                    self.last_image_event_upload_seconds = current_time_seconds
                    rh.send_image_event(image=mjpeg_frame.getCvFrame(), title='Person detected')
            # arbitrary condition for sending video events to RobotHub
            if current_time_seconds - self.last_video_event_upload_seconds > rh.CONFIGURATION["video_event_upload_interval_minutes"] * 60:
                if nn_config.labels[detection.label] == 'person':
                    self.last_video_event_upload_seconds = current_time_seconds
                    self.frame_buffer.save_video_event(before_seconds=60, after_seconds=60, title="Interesting video",
                                                       fps=rh.CONFIGURATION["fps"], frame_width=self.live_view.frame_width,
                                                       frame_height=self.live_view.frame_height)
        self.live_view.publish(h264_frame=h264_frame.getCvFrame())


class Application(rh.BaseDepthAIApplication):

    def __init__(self):
        super().__init__()
        self.live_view = rh.DepthaiLiveView(name="live_view", unique_key="rgb",
                                            width=1920, height=1080)
        frame_buffer = rh.FrameBuffer(maxlen=rh.CONFIGURATION["fps"] * 60 * 2)  # buffer last 2 minutes
        self.business_logic = BusinessLogic(frame_buffer=frame_buffer, live_view=self.live_view)

    def setup_pipeline(self) -> dai.Pipeline:
        """Define the pipeline using DepthAI."""

        log.info(f"App config: {rh.CONFIGURATION}")
        pipeline = dai.Pipeline()
        rgb_sensor = create_rgb_sensor(pipeline=pipeline, preview_resolution=(640, 352))
        rgb_h264_encoder = create_h264_encoder(node_input=rgb_sensor.video, pipeline=pipeline)
        rgb_mjpeg_encoder = create_mjpeg_encoder(node_input=rgb_sensor.video, pipeline=pipeline)
        object_detection_nn = create_yolov7tiny_coco_nn(node_input=rgb_sensor.preview, pipeline=pipeline)

        create_output(pipeline=pipeline, node_input=rgb_h264_encoder.bitstream, stream_name="h264_frames")
        create_output(pipeline=pipeline, node_input=rgb_mjpeg_encoder.bitstream, stream_name="mjpeg_frames")
        create_output(pipeline=pipeline, node_input=object_detection_nn.out, stream_name="object_detections")
        return pipeline

    def manage_device(self, device: dai.Device):
        log.info(f"{device.getMxId()} creating output queues...")
        h264_frames_queue = device.getOutputQueue(name="h264_frames", maxSize=10, blocking=True)
        mjpeg_frames_queue = device.getOutputQueue(name="mjpeg_frames", maxSize=10, blocking=True)
        object_detections_queue = device.getOutputQueue(name="object_detections", maxSize=10, blocking=True)

        log.info(f"{device.getMxId()} Application started")
        while rh.app_is_running() and self.device_is_running:
            h264_frame = h264_frames_queue.get()
            mjpeg_frame = mjpeg_frames_queue.get()
            object_detections = object_detections_queue.get()
            self.business_logic.process_pipeline_outputs(h264_frame=h264_frame, mjpeg_frame=mjpeg_frame, object_detections=object_detections)
            time.sleep(0.001)

    def on_configuration_changed(self, configuration_changes: dict) -> None:
        log.info(f"CONFIGURATION CHANGES: {configuration_changes}")
        if "fps" in configuration_changes:
            log.info("FPS change needs a new pipeline. Restarting OAK device...")
            self.restart_device()


def create_rgb_sensor(pipeline: dai.Pipeline,
                      fps: int = 30,
                      resolution: dai.ColorCameraProperties.SensorResolution = dai.ColorCameraProperties.SensorResolution.THE_1080_P,
                      preview_resolution: tuple = (1280, 720),
                      ) -> dai.node.ColorCamera:
    node = pipeline.createColorCamera()
    node.setBoardSocket(dai.CameraBoardSocket.CAM_A)
    node.setInterleaved(False)
    node.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
    node.setPreviewNumFramesPool(4)
    node.setPreviewSize(*preview_resolution)
    node.setVideoSize(1920, 1080)
    node.setResolution(resolution)
    node.setFps(fps)
    return node


def create_h264_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30):
    rh_encoder = pipeline.createVideoEncoder()
    rh_encoder_profile = dai.VideoEncoderProperties.Profile.H264_MAIN
    rh_encoder.setDefaultProfilePreset(fps, rh_encoder_profile)
    rh_encoder.input.setQueueSize(2)
    rh_encoder.input.setBlocking(False)
    rh_encoder.setKeyframeFrequency(fps)
    rh_encoder.setRateControlMode(dai.VideoEncoderProperties.RateControlMode.CBR)
    rh_encoder.setNumFramesPool(3)
    node_input.link(rh_encoder.input)
    return rh_encoder


def create_mjpeg_encoder(node_input: dai.Node.Output, pipeline: dai.Pipeline, fps: int = 30, quality: int = 100):
    encoder = pipeline.createVideoEncoder()
    encoder_profile = dai.VideoEncoderProperties.Profile.MJPEG
    encoder.setDefaultProfilePreset(fps, encoder_profile)
    encoder.setQuality(quality)
    node_input.link(encoder.input)
    return encoder


def create_yolov7tiny_coco_nn(node_input: dai.Node.Output, pipeline: dai.Pipeline) -> dai.node.YoloDetectionNetwork:
    model = "yolov7tiny_coco_640x352"
    node = pipeline.createYoloDetectionNetwork()
    blob = dai.OpenVINO.Blob(blobconverter.from_zoo(name=model, zoo_type="depthai", shaves=6))
    node.setBlob(blob)
    node_input.link(node.input)
    node.input.setBlocking(False)
    # Yolo specific parameters
    node.setConfidenceThreshold(0.5)
    node.setNumClasses(80)
    node.setCoordinateSize(4)
    node.setAnchors([12.0, 16.0, 19.0, 36.0, 40.0, 28.0, 36.0, 75.0, 76.0, 55.0, 72.0, 146.0, 142.0, 110.0, 192.0, 243.0, 459.0, 401.0])
    node.setAnchorMasks({
        "side80": [0, 1, 2],
        "side40": [3, 4, 5],
        "side20": [6, 7, 8]
    })
    node.setIouThreshold(0.5)
    return node


def create_output(pipeline, node_input: dai.Node.Output, stream_name: str):
    xout = pipeline.createXLinkOut()
    xout.setStreamName(stream_name)
    node_input.link(xout.input)


if __name__ == "__main__":
    app = Application()
    app.run()
```