Host Nodes

Host nodes are custom nodes, introduced in DepthAI V3, that run on the host machine and interact with the DepthAI pipeline. They can be used for tasks such as data processing, synchronization, and display. Host nodes are implemented using the HostNode and ThreadedHostNode classes, which provide a framework for creating custom host-side logic within the pipeline.

DepthAI comes with several predefined host nodes, such as BasaltVIO, RTABMapSlam, and Record/Replay, which offer out-of-the-box functionality. Developers can also create their own custom host nodes, for example to capture frames, manipulate data, or decode neural network outputs.

Custom host nodes can be packaged as Python packages and distributed via pip, which makes them easier to share and reuse across projects and within the community. For example, a custom node like geaxgx's hand tracker could be packaged and published to PyPI, allowing others to install it with a single pip install and integrate it into their DepthAI pipelines.

Running on host

  • On RVC2 devices (OAK-D, OAK-1), the host is the machine connected to the DepthAI device via USB (usually a PC), and the host nodes run on that machine.
  • On RVC4 devices (OAK4-S, OAK4-D), the host depends on the mode the device is running in. In peripheral mode, the host is the machine connected to the DepthAI device via USB, the same as for RVC2 devices. In standalone mode, the OAK4 device itself is the host (running Linux), and the host nodes run on the device CPU.
For more information, see the Standalone vs peripheral mode article.

Types of host nodes

  • Threaded Host Nodes - Standard threaded host nodes
  • Host Nodes - Threaded host nodes with additional mechanisms for processing and synchronizing data on the host

Threaded Host Nodes

ThreadedHostNode is the base class for host nodes that require multithreading. This class allows the node to run its logic asynchronously in its own dedicated thread, making it suitable for operations that should not block the main execution flow of the pipeline.

Boilerplate

To create a custom threaded host node, derive a new class from ThreadedHostNode and implement the following methods:
  • __init__(): the constructor, which initializes the node. This is where you create the node's inputs and outputs.
  • run(): the main execution loop of the node, where the custom logic is implemented.
  • onStart() and onStop(): optional methods that are called when the node starts and stops, respectively. Use them for any initialization or cleanup tasks.
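The order in which these methods fire can be pictured with a small standard-library model. This is only a sketch of the lifecycle described above, not DepthAI's implementation: `ThreadedNodeModel`, its `start()` and `stop()` methods, and the `events` list are hypothetical names introduced for illustration.

```python
import threading
import time


class ThreadedNodeModel:
    """Toy stand-in for a threaded host node (hypothetical, not a DepthAI class)."""

    def __init__(self):
        self._running = threading.Event()
        self._thread = None
        self.events = []  # Records the order in which the hooks fire

    def isRunning(self):
        return self._running.is_set()

    def onStart(self):
        self.events.append("onStart")

    def onStop(self):
        self.events.append("onStop")

    def run(self):
        self.events.append("run")
        while self.isRunning():
            time.sleep(0.01)  # Placeholder for the node's real work

    def start(self):
        # Assumed order for this model: onStart first, then run() in its own thread
        self.onStart()
        self._running.set()
        self._thread = threading.Thread(target=self.run)
        self._thread.start()

    def stop(self):
        # Ask the loop to exit, wait for the thread, then run the cleanup hook
        self._running.clear()
        self._thread.join()
        self.onStop()


node = ThreadedNodeModel()
node.start()
time.sleep(0.05)
node.stop()
print(node.events)
```

The point of the model is that `run()` only needs to poll `isRunning()`; starting and stopping the thread is handled outside the node's own logic.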

Code example

A simple threaded host node, taken from the host camera example:
Python
import time

import cv2
import depthai as dai


class HostCamera(dai.node.ThreadedHostNode):
    def __init__(self):
        super().__init__() # Call the base class constructor
        self.output = self.createOutput() # Create an output - this will send ImgFrame messages

    def run(self): # The main execution loop of the node
        cap = cv2.VideoCapture(0) # Create a VideoCapture object for the host camera
        if not cap.isOpened():
            p.stop() # `p` is the module-level pipeline created with `with dai.Pipeline() as p:`
            raise RuntimeError("Error: Couldn't open host camera")
        while self.isRunning(): # Loop until the node is stopped
            ret, frame = cap.read() # Read the frame from the camera
            if not ret:
                break
            # Create an ImgFrame message and set its data, width, height, and type
            imgFrame = dai.ImgFrame()
            imgFrame.setData(frame)
            imgFrame.setWidth(frame.shape[1])
            imgFrame.setHeight(frame.shape[0])
            imgFrame.setType(dai.ImgFrame.Type.BGR888i)
            # Send the message through the output
            self.output.send(imgFrame)
            # Wait for the next frame
            time.sleep(0.1)
        cap.release() # Release the host camera once the loop exits
Once the node is defined, it can be added to the pipeline using pipeline.create().
Python
with dai.Pipeline() as p:
    hostCamera = p.create(HostCamera) # Create an instance of the custom host camera node inside the pipeline
    camQueue = hostCamera.output.createOutputQueue() # Create an output queue to receive the frames

    p.start() # Start the pipeline, which will also start the host camera, implicitly running the `onStart()`, as well as the `run()` method
    while p.isRunning():
        image : dai.ImgFrame = camQueue.get() # Get the frame from the output queue
        cv2.imshow("HostCamera", image.getCvFrame())
        key = cv2.waitKey(1)
        if key == ord('q'):
            p.stop() # Stop the pipeline and the host camera, implicitly running the `onStop()` method
            break

Linking multiple host nodes together

Just like regular old device nodes, host nodes can be linked together in a pipeline. Here's an example of a pipeline with two custom host nodes:

Code example

The code shown is a simplified version of the threaded host nodes example.
Python
import time

import depthai as dai


class TestSink(dai.node.ThreadedHostNode): # Custom threaded host node for receiving data
    def __init__(self):
        super().__init__()
        self.input = self.createInput() # Only an input is needed as this node receives data

    def onStart(self):
        print("Hello, this is", __class__.__name__) # Print a message when the node starts

    def run(self):
        while self.isRunning():
            buffer = self.input.get() # Get a buffer from the input queue
            print("The sink node received a buffer!")


class TestSource(dai.node.ThreadedHostNode): # Custom threaded host node for sending data
    def __init__(self):
        super().__init__()
        self.output = self.createOutput() # Only an output is needed as this node sends data

    def run(self):
        while self.isRunning():
            buffer = dai.Buffer() # Create a buffer
            print("The source node is sending a buffer!")
            self.output.send(buffer) # Send the buffer through the output
            time.sleep(1)
With the custom nodes defined, they can be added to the pipeline and linked together:
Python
with dai.Pipeline() as p:
    source = TestSource() # Create an instance of the source node
    sink = TestSink() # Create an instance of the sink node
    source.output.link(sink.input) # Link the output of the source node to the input of the sink node
    p.start()
    while p.isRunning():
        time.sleep(1)
        print("Pipeline is running...")
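As a rough mental model of what linking does, the sketch below uses only the standard library: an output keeps a list of linked inputs, and every message it sends is placed on each linked input's queue. `OutputModel` and `InputModel` are hypothetical classes written for this illustration; they are not DepthAI types, and the real implementation may differ.

```python
import queue


class InputModel:
    """Toy input: a queue that a node's run() loop would read from."""

    def __init__(self):
        self._queue = queue.Queue()

    def push(self, message):
        self._queue.put(message)

    def get(self, timeout=1.0):
        return self._queue.get(timeout=timeout)


class OutputModel:
    """Toy output: forwards every sent message to all linked inputs."""

    def __init__(self):
        self._links = []

    def link(self, target):
        self._links.append(target)

    def send(self, message):
        for target in self._links:
            target.push(message)


source_output = OutputModel()
sink_a = InputModel()
sink_b = InputModel()

# One output linked to two inputs: both sinks receive the same message
source_output.link(sink_a)
source_output.link(sink_b)
source_output.send("buffer-0")

received = [sink_a.get(), sink_b.get()]
print(received)
```

In this model the source never needs to know how many sinks exist; adding a consumer is just one more `link()` call.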

Host Nodes

HostNode is a more specialized class built on top of ThreadedHostNode. It provides additional mechanisms for processing and synchronizing data on the host. This class is designed to handle more complex scenarios where host-side data processing needs to be tightly integrated with the DepthAI device pipeline, such as synchronizing multiple data streams or displaying processed data.

Boilerplate

To create a custom host node, derive a new class from HostNode and implement the following methods:
  • process(): called whenever messages are received on the node's inputs. This is where the custom logic is implemented.
  • onStart() and onStop(): optional methods, available because HostNode inherits from ThreadedHostNode. Use them for any initialization or cleanup tasks.

Code example

A simple host node, taken from the host display example:
Python
import cv2
import depthai as dai


class HostDisplay(dai.node.HostNode):
    def build(self, frameOutput: dai.Node.Output):
        self.link_args(frameOutput) # Has to match the inputs to the `process` method

        # This sends all the processing to the pipeline where it's executed by the `pipeline.runTasks()` or implicitly by `pipeline.run()` method.
        # It's needed as the GUI window needs to be updated in the main thread, and the `process` method is by default called in a separate thread.
        self.sendProcessingToPipeline(True)
        return self

    def onStart(self) -> None: # Optional method
        print("HostDisplay started")

    def process(self, message: dai.ImgFrame):
        cv2.imshow("HostDisplay", message.getCvFrame())
        key = cv2.waitKey(1)
        if key == ord('q'):
            print("Detected 'q' - stopping the pipeline...")
            self.stopPipeline()
Once the node is defined, it can be added to the pipeline using pipeline.create().
Python
p = dai.Pipeline()
with p:
    camera = p.create(dai.node.Camera).build()
    hostDisplay = p.create(HostDisplay).build(camera.requestOutput((300, 300)))

    p.run() # Will block until the pipeline is stopped by someone else (in this case it's the display node)

Syncing

HostNode implicitly synchronizes the messages arriving on its inputs: when several outputs are linked to the node, their messages are synced based on their timestamps before being passed to the process method. The example above has been modified to demonstrate this feature:

Code example

Python
class HostDisplay(dai.node.HostNode):

    def build(self, *args):
        rgb_frame, mono_frame = args # Unpack the input frames from the pipeline. In this case we expect two frames - RGB and mono
        self.link_args(rgb_frame, mono_frame) # Has to match the inputs to the `process` method

        # This sends all the processing to the pipeline where it's executed by the `pipeline.runTasks()` or implicitly by `pipeline.run()` method.
        # It's needed as the GUI window needs to be updated in the main thread, and the `process` method is by default called in a separate thread.
        self.sendProcessingToPipeline(True)
        return self

    def process(self, rgb_frame, mono_frame):
        # Display each frame from the input nodes
        cv2.imshow('rgb', rgb_frame.getCvFrame())
        cv2.imshow('mono', mono_frame.getCvFrame())

        key = cv2.waitKey(1)
        if key == ord('q'):
            print("Detected 'q' - stopping the pipeline...")
            self.stopPipeline()

p = dai.Pipeline()
with p:
    rgb_camera = p.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A) # Create a camera node for RGB camera
    mono_camera = p.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B) # Create a camera node for left mono camera

    # Create an instance of the custom host display node and link it to both camera nodes
    display = p.create(HostDisplay).build(rgb_camera.requestOutput((500, 500)), mono_camera.requestOutput((500, 500)))

    p.run()  # Will block until the pipeline is stopped by someone else (in this case it's the display node)
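To build intuition for timestamp-based syncing, the following standard-library sketch pairs messages from two streams when their timestamps are close enough. It is an illustration only: the page does not describe DepthAI's actual matching algorithm or threshold, so the greedy strategy, the `Msg` class, the `pair_by_timestamp` function, and the 10 ms threshold are all assumptions made for this example.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    name: str
    timestamp: float  # Seconds


def pair_by_timestamp(stream_a, stream_b, threshold):
    """Greedily pair two timestamp-sorted streams; unmatched messages are dropped."""
    pairs = []
    i = j = 0
    while i < len(stream_a) and j < len(stream_b):
        delta = stream_a[i].timestamp - stream_b[j].timestamp
        if abs(delta) <= threshold:
            pairs.append((stream_a[i], stream_b[j]))
            i += 1
            j += 1
        elif delta < 0:
            i += 1  # Message in stream_a is too old to match, skip it
        else:
            j += 1  # Message in stream_b is too old to match, skip it
    return pairs


rgb = [Msg("rgb0", 0.000), Msg("rgb1", 0.033), Msg("rgb2", 0.066)]
mono = [Msg("mono0", 0.002), Msg("mono1", 0.070)]

pairs = pair_by_timestamp(rgb, mono, threshold=0.010)
print([(a.name, b.name) for a, b in pairs])  # [('rgb0', 'mono0'), ('rgb2', 'mono1')]
```

Here `rgb1` has no mono frame within the threshold, so it is skipped; each matched pair is what a `process(rgb_frame, mono_frame)` call would conceptually receive.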

Additional methods

  • runSyncingOnDevice() - Runs the synchronization of the input messages on the device instead of on the host. The process method itself is still executed on the host.
  • sendProcessingToPipeline() - Sends the processing to the pipeline, so that the process method is executed by pipeline.runTasks() (or implicitly by pipeline.run()) instead of in a separate thread, which is the default. This is required when the processing involves a GUI (such as displaying frames with OpenCV), since GUI operations must be done in the main thread.
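The idea of handing work from a node's thread over to the thread that runs the pipeline can be sketched with a plain task queue. This is a conceptual model built on the standard library, not DepthAI code: `tasks`, `worker`, `run_tasks`, and the `None` sentinel are hypothetical names chosen for this example.

```python
import queue
import threading

tasks = queue.Queue()  # Work items handed over to the consumer thread
executed_on = []       # (message, thread id) for every processed message
worker_ident = []      # Thread id of the producer, recorded for comparison


def process(message):
    # Stand-in for GUI work that must not run on the worker thread
    executed_on.append((message, threading.get_ident()))


def worker():
    # The node's own thread only schedules the work, it does not execute it
    worker_ident.append(threading.get_ident())
    for message in ("frame-0", "frame-1"):
        tasks.put(lambda m=message: process(m))
    tasks.put(None)  # Sentinel: no more work


def run_tasks():
    # Executed by the calling thread, which plays the role of the main thread here
    while True:
        task = tasks.get()
        if task is None:
            break
        task()


consumer_ident = threading.get_ident()
thread = threading.Thread(target=worker)
thread.start()
run_tasks()
thread.join()

print([message for message, _ in executed_on])
```

Every `process` call ends up on the thread that called `run_tasks()`, which is the property GUI toolkits rely on.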
