Software Stack
DepthAI

ON THIS PAGE

  • Visualizer Encoded
  • Setup
  • Pipeline
  • Source code

Visualizer Encoded

This example uses DepthAI to build a pipeline that runs YOLOv6-Nano object detection, generates custom image annotations (bounding boxes and labels), encodes high-resolution (1920x1440) camera frames as MJPEG, and streams the results over a remote connection for visualization.

Setup

This example requires the DepthAI v3 API; see the installation instructions.

Pipeline

Source code

Python
C++

Python

Python
GitHub
1#!/usr/bin/env python3
2
3import depthai as dai
4from argparse import ArgumentParser
5
# Command-line options: the two ports handed to the remote connection.
parser = ArgumentParser()
parser.add_argument("--webSocketPort", default=8765, type=int)
parser.add_argument("--httpPort", default=8080, type=int)

args = parser.parse_args()

# Encoder profile used when the video encoder is configured in the pipeline.
ENCODER_PROFILE = dai.VideoEncoderProperties.Profile.MJPEG

# Remote connection that the pipeline topics are published through.
remoteConnector = dai.RemoteConnection(
    httpPort=args.httpPort,
    webSocketPort=args.webSocketPort,
)
class ImgAnnotationsGenerator(dai.node.ThreadedHostNode):
    """Host node that converts detection messages into image annotations.

    Every message read from ``inputDet`` yields one ``dai.ImgAnnotations``
    message on ``output``. It carries the input message's timestamp and a
    single ``dai.ImgAnnotation`` holding an outline and a text label for
    each detection.
    """

    def __init__(self):
        super().__init__()
        self.inputDet = self.createInput()
        self.output = self.createOutput()
        # Defined up front so run() does not fail with AttributeError when
        # setLabelMap() was never called; see _labelName() for the fallback.
        self.labelMap = None

    def setLabelMap(self, labelMap):
        """Set the container that maps a detection's ``label`` to a name.

        ``None`` is accepted; labels are then rendered as their numeric value.
        """
        self.labelMap = labelMap

    def _labelName(self, label):
        """Return the name for ``label``, or ``str(label)`` if unavailable."""
        try:
            return self.labelMap[label]
        except (TypeError, IndexError, KeyError):
            # No label map was set, or it has no entry for this label.
            return str(label)

    def run(self):
        """Read detection messages and send annotations while running."""
        while self.isRunning():
            nnData = self.inputDet.get()
            detections = nnData.detections
            imgAnnt = dai.ImgAnnotations()
            imgAnnt.setTimestamp(nnData.getTimestamp())
            # One annotation collects the shapes and texts of all detections.
            annotation = dai.ImgAnnotation()
            for detection in detections:
                # Outline through the four corners of the bounding box.
                # NOTE(review): LINE_STRIP may leave the last edge (back to
                # the first corner) open; LINE_LOOP might be intended - confirm.
                pointsAnnotation = dai.PointsAnnotation()
                pointsAnnotation.type = dai.PointsAnnotationType.LINE_STRIP
                pointsAnnotation.points = dai.VectorPoint2f([
                    dai.Point2f(detection.xmin, detection.ymin),
                    dai.Point2f(detection.xmax, detection.ymin),
                    dai.Point2f(detection.xmax, detection.ymax),
                    dai.Point2f(detection.xmin, detection.ymax),
                ])
                pointsAnnotation.outlineColor = dai.Color(1.0, 0.5, 0.5, 1.0)
                pointsAnnotation.fillColor = dai.Color(0.5, 1.0, 0.5, 0.5)
                pointsAnnotation.thickness = 2.0

                # Text "<label> <confidence>%" placed at the (xmin, ymin) corner.
                text = dai.TextAnnotation()
                text.position = dai.Point2f(detection.xmin, detection.ymin)
                text.text = f"{self._labelName(detection.label)} {int(detection.confidence * 100)}%"
                text.fontSize = 50.5
                text.textColor = dai.Color(0.5, 0.5, 1.0, 1.0)
                text.backgroundColor = dai.Color(1.0, 1.0, 0.5, 1.0)

                annotation.points.append(pointsAnnotation)
                annotation.texts.append(text)

            imgAnnt.annotations.append(annotation)
            self.output.send(imgAnnt)
56
# Build the pipeline, publish its streams, and run until "q" is pressed.
with dai.Pipeline() as pipeline:
    # Camera on socket CAM_A feeds both the detector and the encoder.
    camera = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A)

    # Detection network built from the camera and the "yolov6-nano" model.
    detectorNode = pipeline.create(dai.node.DetectionNetwork)
    detector = detectorNode.build(camera, dai.NNModelDescription("yolov6-nano"))

    # Host node that converts detections into annotations.
    annotator = pipeline.create(ImgAnnotationsGenerator)

    # 1920x1440 NV12 camera output routed into the video encoder.
    nv12Stream = camera.requestOutput((1920, 1440), type=dai.ImgFrame.Type.NV12)
    videoEncoder = pipeline.create(dai.node.VideoEncoder)
    videoEncoder.setDefaultProfilePreset(30, ENCODER_PROFILE)
    nv12Stream.link(videoEncoder.input)

    # Detections flow into the annotator, which also receives the class names.
    detector.out.link(annotator.inputDet)
    annotator.setLabelMap(detector.getClasses())

    # Add the remote connector topics; all of them use the "images" group.
    topics = (
        ("encoded", videoEncoder.out),
        ("detections", detector.out),
        ("annotations", annotator.output),
    )
    for topicName, topicOutput in topics:
        remoteConnector.addTopic(topicName, topicOutput, "images")

    pipeline.start()

    # Register the pipeline with the remote connector
    remoteConnector.registerPipeline(pipeline)

    # Poll for key presses; "q" stops the pipeline and leaves the loop.
    quitKey = ord("q")
    while pipeline.isRunning():
        if remoteConnector.waitKey(1) != quitKey:
            continue
        pipeline.stop()
        break

Need assistance?

Head over to the Discussion Forum for technical support or any other questions you might have.