Script change pipeline flow
This example shows how you can change the flow of data inside your pipeline at runtime using the Script node. In this example, we send a message from the host to choose whether we want the Script node to forward color frames to the MobileNetDetectionNetwork.
Demo
Pipeline Graph
Setup
Please run the install script to download all required dependencies. Please note that this script must be run from a git context, so you have to download the depthai-python repository first and then run the script.
Command Line
# Download the depthai-python repository (the install script must be run from inside it)
git clone https://github.com/luxonis/depthai-python.git
# Move into the examples directory
cd depthai-python/examples
# Install the dependencies required by the examples
python3 install_requirements.py
Source code
Python
C++
Python
PythonGitHub
#!/usr/bin/env python3
"""Toggle neural-network inference at runtime with a Script node.

The host displays the camera's video stream. Pressing 't' sends a Buffer
message carrying the new toggle value to a Script node running on the device;
the script forwards camera preview frames to a MobileNetDetectionNetwork only
while the toggle is set. Any detections that arrive are drawn on the displayed
frame. Press 'q' to quit.
"""
import depthai as dai
import cv2
from pathlib import Path
import numpy as np

parentDir = Path(__file__).parent
# Location of the MobileNet-SSD blob, resolved relative to this script
nnPath = str((parentDir / Path('../models/mobilenet-ssd_openvino_2021.4_5shave.blob')).resolve().absolute())

pipeline = dai.Pipeline()

# Color camera: 'video' is streamed to the host, 'preview' feeds the Script node
cam = pipeline.create(dai.node.ColorCamera)
cam.setBoardSocket(dai.CameraBoardSocket.CAM_A)
cam.setInterleaved(False)
cam.setIspScale(2, 3)
cam.setVideoSize(720, 720)
cam.setPreviewSize(300, 300)

# Stream the video output to the host under the name 'rgb'
xoutRgb = pipeline.create(dai.node.XLinkOut)
xoutRgb.setStreamName('rgb')
cam.video.link(xoutRgb.input)

script = pipeline.create(dai.node.Script)

# Host -> device channel used to switch inference on and off
xin = pipeline.create(dai.node.XLinkIn)
xin.setStreamName('in')
xin.out.link(script.inputs['toggle'])

cam.preview.link(script.inputs['rgb'])
# On-device script: poll 'toggle' without blocking, then block for the next
# preview frame and forward it to the 'nn' output only while the toggle is set.
script.setScript("""
    toggle = False
    while True:
        msg = node.io['toggle'].tryGet()
        if msg is not None:
            toggle = msg.getData()[0]
            node.warn('Toggle! Perform NN inferencing: ' + str(toggle))

        frame = node.io['rgb'].get()

        if toggle:
            node.io['nn'].send(frame)
""")

# Detection network fed exclusively by the Script node's 'nn' output
nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
nn.setBlobPath(nnPath)
script.outputs['nn'].link(nn.input)

# Stream detection results to the host under the name 'nn'
xoutNn = pipeline.create(dai.node.XLinkOut)
xoutNn.setStreamName('nn')
nn.out.link(xoutNn.input)

# Color used for both the overlay text and the bounding boxes
color = (255, 127, 0)


def frameNorm(frame, bbox):
    """Scale normalized bounding-box values to pixel coordinates of ``frame``.

    Each value in ``bbox`` is clipped to [0, 1]. Even-indexed values are
    multiplied by ``frame.shape[1]`` and odd-indexed values by
    ``frame.shape[0]``. Returns a NumPy integer array of the same length.
    """
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)


def drawDetections(frame, detections):
    """Draw a confidence label and a rectangle on ``frame`` for each detection.

    Each detection must expose ``xmin``, ``ymin``, ``xmax``, ``ymax`` and
    ``confidence``; ``frame`` is modified in place.
    """
    for detection in detections:
        bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
        cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)


# Connect to device with pipeline
with dai.Device(pipeline) as device:
    inQ = device.getInputQueue("in")
    qRgb = device.getOutputQueue("rgb")
    qNn = device.getOutputQueue("nn")

    # Host-side mirror of the toggle state held by the device script
    runNn = False

    while True:
        frame = qRgb.get().getCvFrame()

        # Only read detections when some are queued, so the display loop
        # keeps running while inference is disabled
        if qNn.has():
            detections = qNn.get().detections
            drawDetections(frame, detections)

        cv2.putText(frame, f"NN inferencing: {runNn}", (20,20), cv2.FONT_HERSHEY_TRIPLEX, 0.7, color)
        cv2.imshow('Color frame', frame)

        key = cv2.waitKey(1)
        if key == ord('q'):
            break
        elif key == ord('t'):
            # Flip the state and notify the device script through the 'in' stream
            runNn = not runNn
            print(f"{'Enabling' if runNn else 'Disabling'} NN inferencing")
            buf = dai.Buffer()
            buf.setData(runNn)
            inQ.send(buf)
Pipeline
Need assistance?
Head over to Discussion Forum for technical support or any other questions you might have.