Threaded Host Nodes
This guide covers the implementation of a custom DepthAI pipeline using three custom nodes:TestSource
, TestPassthrough
, and TestSink
. These nodes demonstrate basic data flow in a DepthAI pipeline, where buffers are generated by the source, passed through a passthrough node, and then received by a sink node.Overview
The pipeline consists of three custom nodes:- TestSource: Generates and sends buffers at regular intervals.
- TestPassthrough: Receives buffers from the source and forwards them to the sink.
- TestSink: Receives buffers from the passthrough node and processes them.
Demo output
Command Line
1Hello, this is passthrough
2Hello, this is passthroughSubnodes_subnode1
3Hello, this is passthroughSubnodes_subnode2
4Hello, this is sink1
5Hello, this is sink2
6source node is sending a buffer!
7The passthrough node received a buffer!
8The passthrough node received a buffer!
9The passthrough node received a buffer!
10sink2 node received a buffer!
11sink1 node received a buffer!
12Pipeline is running...
13source node is sending a buffer!
14The passthrough node received a buffer!
15The passthrough node received a buffer!
16sink1 node received a buffer!
17The passthrough node received a buffer!
18sink2 node received a buffer!
19...
Pipeline
Source Code
Python
C++
Python
PythonGitHub
1import depthai as dai
2import time
3
4class TestPassthrough(dai.node.ThreadedHostNode):
5 def __init__(self, name: str):
6 super().__init__()
7 self.name = name
8 self.input = self.createInput()
9 self.output = self.createOutput()
10
11 # Possible API 1:
12 self.input.setPossibleDatatypes([dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgFrame, True)])
13 self.output.setPossibleDatatypes([dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgFrame, True)])
14
15 # Possible API 2:
16 self.input.setPossibleDatatypes([
17 (dai.DatatypeEnum.ImgFrame, True),
18 (dai.DatatypeEnum.Buffer, True)
19 ])
20 self.output.setPossibleDatatypes([
21 (dai.DatatypeEnum.ImgFrame, True),
22 (dai.DatatypeEnum.Buffer, True)
23 ])
24
25
26
27 def onStart(self):
28 print("Hello, this is", self.name)
29
30 def onStop(self):
31 print("Goodbye from", self.name)
32
33 def run(self):
34 while self.isRunning():
35 buffer = self.input.get()
36 print("The passthrough node received a buffer!")
37 self.output.send(buffer)
38
39class TestSink(dai.node.ThreadedHostNode):
40 def __init__(self, name: str):
41 super().__init__()
42 self.input = self.createInput()
43
44 self.name = name
45
46 def onStart(self):
47 print("Hello, this is", self.name)
48
49 def run(self):
50 while self.isRunning():
51 buffer = self.input.get()
52 del buffer
53 print(f"{self.name} node received a buffer!")
54
55class TestSource(dai.node.ThreadedHostNode):
56 def __init__(self, name: str):
57 super().__init__()
58 self.name = name
59 self.output = self.createOutput()
60
61 def run(self):
62 while self.isRunning():
63 buffer = dai.Buffer()
64 print(f"{self.name} node is sending a buffer!")
65 self.output.send(buffer)
66 time.sleep(1)
67
68class TestPassthroughSubnodes(dai.node.ThreadedHostNode):
69 def __init__(self, name: str):
70 super().__init__()
71 self.passthrough1 = self.createSubnode(TestPassthrough, name + "_subnode1")
72 self.passthrough2 = self.createSubnode(TestPassthrough, name + "_subnode2")
73 self.input = self.passthrough1.input
74 self.output = self.passthrough2.output
75
76 # Link the two subnodes together
77 self.passthrough1.output.link(self.passthrough2.input)
78
79 def run(self):
80 while self.isRunning():
81 buffer = self.input.get()
82 self.output.send(buffer)
83
84with dai.Pipeline(False) as p:
85 """
86 Create the following pipeline:
87 source -> passthrough -> sink1
88 |
89 -> passthroughSubnodes -> sink2
90 """
91
92 # Create nodes
93 source = TestSource("source")
94 passthrough = TestPassthrough("passthrough")
95 passthroughSubnodes = TestPassthroughSubnodes("passthroughSubnodes")
96 sink1 = TestSink("sink1")
97 sink2 = TestSink("sink2")
98
99 # Link nodes
100 source.output.link(passthrough.input)
101 source.output.link(passthroughSubnodes.input)
102 passthrough.output.link(sink1.input)
103 passthroughSubnodes.output.link(sink2.input)
104
105 p.start()
106 while p.isRunning():
107 time.sleep(1)
108 print("Pipeline is running...")
Need assistance?
Head over to Discussion Forum for technical support or any other questions you might have.