Pipeline
Source code
Python
C++
Python
PythonGitHub
1#!/usr/bin/env python3
2
3import depthai as dai
4from argparse import ArgumentParser
5
6parser = ArgumentParser()
7parser.add_argument("--webSocketPort", type=int, default=8765)
8parser.add_argument("--httpPort", type=int, default=8082)
9
10args = parser.parse_args()
11
12remoteConnector = dai.RemoteConnection(webSocketPort=args.webSocketPort, httpPort=args.httpPort)
13ENCODER_PROFILE = dai.VideoEncoderProperties.Profile.MJPEG
14class ImgAnnotationsGenerator(dai.node.ThreadedHostNode):
15 def __init__(self):
16 super().__init__()
17 self.inputDet = self.createInput()
18 self.output = self.createOutput()
19
20 def setLabelMap(self, labelMap):
21 self.labelMap = labelMap
22 def run(self):
23 while self.mainLoop():
24 nnData = self.inputDet.get()
25 detections = nnData.detections
26 imgAnnt = dai.ImgAnnotations()
27 imgAnnt.setTimestamp(nnData.getTimestamp())
28 imgAnnt.setTransformation(nnData.getTransformation())
29 annotation = dai.ImgAnnotation()
30 for detection in detections:
31 pointsAnnotation = dai.PointsAnnotation()
32 pointsAnnotation.type = dai.PointsAnnotationType.LINE_STRIP
33 pointsAnnotation.points = dai.VectorPoint2f([
34 dai.Point2f(detection.xmin, detection.ymin),
35 dai.Point2f(detection.xmax, detection.ymin),
36 dai.Point2f(detection.xmax, detection.ymax),
37 dai.Point2f(detection.xmin, detection.ymax),
38 ])
39 outlineColor = dai.Color(1.0, 0.5, 0.5, 1.0)
40 pointsAnnotation.outlineColor = outlineColor
41 fillColor = dai.Color(0.5, 1.0, 0.5, 0.5)
42 pointsAnnotation.fillColor = fillColor
43 pointsAnnotation.thickness = 2.0
44 text = dai.TextAnnotation()
45 text.position = dai.Point2f(detection.xmin, detection.ymin)
46 text.text = f"{self.labelMap[detection.label]} {int(detection.confidence * 100)}%"
47 text.fontSize = 50.5
48 textColor = dai.Color(0.5, 0.5, 1.0, 1.0)
49 text.textColor = textColor
50 backgroundColor = dai.Color(1.0, 1.0, 0.5, 1.0)
51 text.backgroundColor = backgroundColor
52 annotation.points.append(pointsAnnotation)
53 annotation.texts.append(text)
54
55 imgAnnt.annotations.append(annotation)
56 self.output.send(imgAnnt)
57
58# Create pipeline
59with dai.Pipeline() as pipeline:
60 cameraNode = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A)
61 detectionNetwork = pipeline.create(dai.node.DetectionNetwork).build(
62 cameraNode, dai.NNModelDescription("yolov6-nano")
63 )
64 imageAnnotationsGenerator = pipeline.create(ImgAnnotationsGenerator)
65 outputToEncode = cameraNode.requestOutput((1920, 1440), type=dai.ImgFrame.Type.NV12)
66 encoder = pipeline.create(dai.node.VideoEncoder)
67 encoder.setDefaultProfilePreset(30, ENCODER_PROFILE)
68 outputToEncode.link(encoder.input)
69
70
71 detectionNetwork.out.link(imageAnnotationsGenerator.inputDet)
72 labelMap = detectionNetwork.getClasses()
73 imageAnnotationsGenerator.setLabelMap(labelMap)
74
75 # Add the remote connector topics
76 remoteConnector.addTopic("encoded", encoder.out, "images")
77 remoteConnector.addTopic("detections", detectionNetwork.out, "images")
78 remoteConnector.addTopic("annotations", imageAnnotationsGenerator.output, "images")
79
80 pipeline.start()
81
82 # Register the pipeline with the remote connector
83 remoteConnector.registerPipeline(pipeline)
84
85 while pipeline.isRunning():
86 if remoteConnector.waitKey(1) == ord("q"):
87 pipeline.stop()
88 breakNeed assistance?
Head over to Discussion Forum for technical support or any other questions you might have.