Threaded Host Nodes

This guide builds a DepthAI pipeline entirely from custom threaded host nodes: TestSource, TestPassthrough, TestSink, and TestPassthroughSubnodes, a composite node that chains two TestPassthrough subnodes. Together they demonstrate basic data flow in a DepthAI pipeline: the source generates buffers, the passthrough nodes forward them, and the sink nodes receive them.

Overview

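The pipeline is built from the following custom node classes:
  • TestSource: Creates an empty buffer and sends it once per second.
  • TestPassthrough: Receives buffers on its input and forwards them unchanged on its output.
  • TestPassthroughSubnodes: A composite node that creates two TestPassthrough subnodes, links them in series, and exposes the first subnode's input and the second subnode's output as its own.
  • TestSink: Receives buffers, reports that one arrived, and discards it.
These nodes show the basic structure of a threaded host node in DepthAI: create inputs and outputs in the constructor, then loop in run() while the node is running. The same structure can be used to implement custom host-side logic within a pipeline.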
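The source, passthrough, sink flow described above can be illustrated without DepthAI at all. The following is a minimal sketch using only Python's standard `threading` and `queue` modules, assuming each node runs its own blocking loop on a separate thread. The function names, the `STOP` sentinel, and `run_chain` are hypothetical and are not part of the DepthAI API.

```python
import queue
import threading

# Sentinel used only in this sketch to shut the chain down; not a DepthAI concept.
STOP = object()


def source(out_q: queue.Queue, count: int) -> None:
    """Emit `count` integer "buffers", then the stop sentinel."""
    for i in range(count):
        out_q.put(i)
    out_q.put(STOP)


def passthrough(in_q: queue.Queue, out_q: queue.Queue) -> None:
    """Block on the input queue and forward every item unchanged."""
    while True:
        item = in_q.get()
        out_q.put(item)
        if item is STOP:
            break


def sink(in_q: queue.Queue, received: list) -> None:
    """Collect items until the stop sentinel arrives."""
    while True:
        item = in_q.get()
        if item is STOP:
            break
        received.append(item)


def run_chain(count: int = 3) -> list:
    """Run source -> passthrough -> sink, one thread per stage."""
    q1, q2 = queue.Queue(), queue.Queue()
    received: list = []
    threads = [
        threading.Thread(target=source, args=(q1, count)),
        threading.Thread(target=passthrough, args=(q1, q2)),
        threading.Thread(target=sink, args=(q2, received)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    print(run_chain())
```

Each stage blocks on `get()` until the upstream stage produces something, which mirrors the `self.input.get()` loop used by the nodes in this example.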

Demo output

Command Line
Hello, this is passthrough
Hello, this is passthroughSubnodes_subnode1
Hello, this is passthroughSubnodes_subnode2
Hello, this is sink1
Hello, this is sink2
source node is sending a buffer!
The passthrough node received a buffer!
The passthrough node received a buffer!
The passthrough node received a buffer!
sink2 node received a buffer!
sink1 node received a buffer!
Pipeline is running...
source node is sending a buffer!
The passthrough node received a buffer!
The passthrough node received a buffer!
sink1 node received a buffer!
The passthrough node received a buffer!
sink2 node received a buffer!
...

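Pipeline

The source output is linked to two branches. One branch goes through passthrough to sink1. The other goes through passthroughSubnodes, which contains two passthrough subnodes in series, to sink2.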
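The number of messages printed per cycle in the demo output follows from the pipeline topology. The sketch below encodes the links as a plain adjacency dictionary and counts deliveries for a single source buffer. It assumes the composite node behaves as its two subnodes in series, which matches the three "passthrough node received" lines per cycle in the demo output. `EDGES`, `deliveries`, and `summarize` are hypothetical names used only for illustration.

```python
from collections import deque

# Links from the example, with the composite node expanded into its
# two subnodes (an assumption of this sketch).
EDGES = {
    "source": ["passthrough", "passthroughSubnodes_subnode1"],
    "passthrough": ["sink1"],
    "passthroughSubnodes_subnode1": ["passthroughSubnodes_subnode2"],
    "passthroughSubnodes_subnode2": ["sink2"],
    "sink1": [],
    "sink2": [],
}


def deliveries(edges: dict, start: str) -> list:
    """Return every node that receives the message, in breadth-first order."""
    received = []
    pending = deque(edges[start])
    while pending:
        node = pending.popleft()
        received.append(node)
        pending.extend(edges[node])
    return received


def summarize(edges: dict, start: str = "source") -> tuple:
    """Count passthrough deliveries and list the sinks reached by one buffer."""
    hits = deliveries(edges, start)
    passthrough_count = sum(1 for n in hits if n.startswith("passthrough"))
    sinks = sorted(n for n in hits if n.startswith("sink"))
    return passthrough_count, sinks


if __name__ == "__main__":
    print(summarize(EDGES))
```

The breadth-first order here is only one possible ordering. In the real pipeline each node runs on its own thread, so the relative order of the sink1 and sink2 lines can differ between cycles, as the demo output shows.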

Source Code

Python

import depthai as dai
import time


class TestPassthrough(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.name = name
        self.input = self.createInput()
        self.output = self.createOutput()

        # Possible API 1: explicit DatatypeHierarchy objects
        self.input.setPossibleDatatypes([dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgFrame, True)])
        self.output.setPossibleDatatypes([dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgFrame, True)])

        # Possible API 2: plain (datatype, flag) tuples
        self.input.setPossibleDatatypes([
            (dai.DatatypeEnum.ImgFrame, True),
            (dai.DatatypeEnum.Buffer, True)
        ])
        self.output.setPossibleDatatypes([
            (dai.DatatypeEnum.ImgFrame, True),
            (dai.DatatypeEnum.Buffer, True)
        ])

    def onStart(self):
        print("Hello, this is", self.name)

    def onStop(self):
        print("Goodbye from", self.name)

    def run(self):
        # Forward every received buffer unchanged
        while self.isRunning():
            buffer = self.input.get()
            print("The passthrough node received a buffer!")
            self.output.send(buffer)


class TestSink(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.input = self.createInput()

        self.name = name

    def onStart(self):
        print("Hello, this is", self.name)

    def run(self):
        # Receive buffers and discard them
        while self.isRunning():
            buffer = self.input.get()
            del buffer
            print(f"{self.name} node received a buffer!")


class TestSource(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.name = name
        self.output = self.createOutput()

    def run(self):
        # Send one empty buffer per second
        while self.isRunning():
            buffer = dai.Buffer()
            print(f"{self.name} node is sending a buffer!")
            self.output.send(buffer)
            time.sleep(1)


class TestPassthroughSubnodes(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.passthrough1 = self.createSubnode(TestPassthrough, name + "_subnode1")
        self.passthrough2 = self.createSubnode(TestPassthrough, name + "_subnode2")
        # Expose the first subnode's input and the second subnode's output
        self.input = self.passthrough1.input
        self.output = self.passthrough2.output

        # Link the two subnodes together
        self.passthrough1.output.link(self.passthrough2.input)

    def run(self):
        while self.isRunning():
            buffer = self.input.get()
            self.output.send(buffer)


with dai.Pipeline(False) as p:
    # Create the following pipeline:
    # source -> passthrough -> sink1
    #        |
    #        -> passthroughSubnodes -> sink2

    # Create nodes
    source = TestSource("source")
    passthrough = TestPassthrough("passthrough")
    passthroughSubnodes = TestPassthroughSubnodes("passthroughSubnodes")
    sink1 = TestSink("sink1")
    sink2 = TestSink("sink2")

    # Link nodes
    source.output.link(passthrough.input)
    source.output.link(passthroughSubnodes.input)
    passthrough.output.link(sink1.input)
    passthroughSubnodes.output.link(sink2.input)

    p.start()
    while p.isRunning():
        time.sleep(1)
        print("Pipeline is running...")