# Host Nodes

Host nodes are custom nodes, introduced in DepthAI V3, that run on the host machine and interact with the DepthAI pipeline. They
can be used to perform various tasks such as data processing, synchronization, and displaying. Host nodes are implemented using
the HostNode and ThreadedHostNode classes, which provide a framework for creating custom host-side logic within the pipeline.

DepthAI comes with several predefined host nodes, such as
[BasaltVIO](https://docs.luxonis.com/software-v3/depthai/depthai-components/host_nodes/basalt_vio.md),
[RTABMapSLAM](https://docs.luxonis.com/software-v3/depthai/depthai-components/host_nodes/rtabmap_slam.md), and
[Record](https://docs.luxonis.com/software-v3/depthai/depthai-components/host_nodes/record_video.md)/
[Replay](https://docs.luxonis.com/software-v3/depthai/depthai-components/host_nodes/replay_video.md). These nodes offer powerful
out-of-the-box functionality. However, one of the most exciting aspects of host nodes is the ability for developers to create
their own custom nodes. These custom nodes can be designed to capture frames, manipulate data, decode neural network outputs, and
more.

Moreover, developers can package their custom host nodes as Python packages and distribute them via pip. This makes it easier to
share and reuse custom nodes across different projects and among the community. For example, a custom node like [geaxgx's hand
tracker](https://github.com/geaxgx/depthai_hand_tracker) could be packaged and published to PyPI, allowing others to easily
install and integrate it into their DepthAI pipelines with a simple pip install.

## Running on host

 * on RVC2 devices (OAK-D, OAK-1) - the host is the machine connected to the DepthAI device via USB (usually a PC), so the host
   nodes run there.
 * on RVC4 devices (OAK4-S, OAK4-D) - the host side depends on the mode the device is running in. In peripheral mode, the host
   is the machine connected to the DepthAI device via USB, the same as on RVC2 devices. In standalone mode, the OAK4 device
   itself is the host (running Linux) and the host nodes run on the device CPU.

For more info, check out the [Standalone vs peripheral
mode](https://docs.luxonis.com/hardware/platform/deploy/oak4-deployment-guide/oak4-getting-started.md) article.

### Types of host nodes

 * [Threaded Host Node](#threaded-host-node) - Standard threaded host nodes

 * [Host Node](#host-node) - Threaded host nodes with additional mechanisms for processing and synchronizing data on the host

## Threaded Host Node

ThreadedHostNode is the base class for host nodes that require multithreading. This class allows the node to run its logic
asynchronously in its own dedicated thread, making it suitable for operations that should not block the main execution flow of the
pipeline.
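The threading model can be pictured with plain Python, with no DepthAI involved: each node owns a dedicated thread that loops until it is asked to stop, which is roughly what ThreadedHostNode manages for you. All names below (WorkerNode, isRunning, and so on) are illustrative stand-ins, not DepthAI API:

```python
import queue
import threading
import time

class WorkerNode:
    """Plain-Python analogy of a threaded host node: a run() loop in its own thread."""
    def __init__(self):
        self.output = queue.Queue()  # stands in for a node output
        self._running = threading.Event()
        self._thread = threading.Thread(target=self.run, daemon=True)

    def isRunning(self):
        return self._running.is_set()

    def start(self):
        self._running.set()
        self._thread.start()  # run() now executes asynchronously

    def stop(self):
        self._running.clear()  # the run() loop exits on its next check
        self._thread.join()

    def run(self):
        counter = 0
        while self.isRunning():  # same shape as ThreadedHostNode.run()
            self.output.put(counter)  # "send" a message downstream
            counter += 1
            time.sleep(0.001)

node = WorkerNode()
node.start()
first = node.output.get()  # the main thread consumes messages asynchronously
node.stop()
print(first)  # 0
```

In the real class, DepthAI supplies the thread, the isRunning() flag, and the input/output plumbing, so a custom node only has to implement run().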

### Boilerplate

To create a custom threaded host node, derive a new class from ThreadedHostNode and implement the following methods:

 * __init__() - the constructor, used to initialize the node. This is where you create the node's inputs and outputs.

 * run() - the main execution loop of the node, where the custom logic is implemented.

 * onStart() and onStop() - called when the node starts and stops, respectively. These methods are optional and can be used to
   perform any initialization or cleanup tasks.

### Code example

Example of a simple threaded host node from the [host
camera](https://docs.luxonis.com/software-v3/depthai/examples/host_nodes/host_camera.md) example.

```python
import cv2
import depthai as dai
import time

class HostCamera(dai.node.ThreadedHostNode):
    def __init__(self):
        super().__init__() # Call the base class constructor
        self.output = self.createOutput() # Create an output - this will send ImgFrame messages

    def run(self): # The main execution loop of the node
        cap = cv2.VideoCapture(0) # Create a VideoCapture object for the host camera
        if not cap.isOpened():
            p.stop() # Stop the pipeline (`p` is the module-level pipeline created below)
            raise RuntimeError("Error: Couldn't open host camera")
        while self.isRunning(): # Loop until the node is stopped
            ret, frame = cap.read() # Read a frame from the camera
            if not ret:
                break
            # Create an ImgFrame message and set its data, width, height, and type
            imgFrame = dai.ImgFrame()
            imgFrame.setData(frame)
            imgFrame.setWidth(frame.shape[1])
            imgFrame.setHeight(frame.shape[0])
            imgFrame.setType(dai.ImgFrame.Type.BGR888i)
            # Send the message to the output
            self.output.send(imgFrame)
            # Wait before reading the next frame
            time.sleep(0.1)
```

Once the node is defined, it can be added to the pipeline using pipeline.create().

```python
with dai.Pipeline() as p:
    hostCamera = p.create(HostCamera) # Create an instance of the custom host camera node inside the pipeline
    camQueue = hostCamera.output.createOutputQueue() # Create an output queue to receive the frames

    p.start() # Start the pipeline, which will also start the host camera, implicitly running the `onStart()`, as well as the `run()` method
    while p.isRunning():
        image: dai.ImgFrame = camQueue.get() # Get the frame from the output queue
        cv2.imshow("HostCamera", image.getCvFrame())
        key = cv2.waitKey(1)
        if key == ord('q'):
            p.stop() # Stop the pipeline and the host camera, implicitly running the `onStop()` method
            break
```

### Linking multiple host nodes together

Just like regular device nodes, host nodes can be linked together in a pipeline. Here's an example of a pipeline with two custom
host nodes:

### Code example

The code shown is a simplified version of the [threaded host
nodes](https://docs.luxonis.com/software-v3/depthai/examples/host_nodes/threaded_host_nodes.md) example.

```python
import depthai as dai
import time

class TestSink(dai.node.ThreadedHostNode): # Custom threaded host node for receiving data
    def __init__(self):
        super().__init__()
        self.input = self.createInput() # Only an input queue is needed as this node receives data

    def onStart(self):
        print("Hello, this is", __class__.__name__) # Print a message when the node starts

    def run(self):
        while self.isRunning():
            buffer = self.input.get() # Get a buffer from the input queue
            print("The sink node received a buffer!")

class TestSource(dai.node.ThreadedHostNode): # Custom threaded host node for sending data
    def __init__(self):
        super().__init__()
        self.output = self.createOutput() # Only an output queue is needed as this node sends data

    def run(self):
        while self.isRunning():
            buffer = dai.Buffer() # Create a buffer
            print("The source node is sending a buffer!")
            self.output.send(buffer) # Send the buffer to the output queue
            time.sleep(1)
```

With the custom nodes defined, they can be added to the pipeline and linked together:

```python
with dai.Pipeline() as p:
    source = TestSource() # Create an instance of the source node
    sink = TestSink() # Create an instance of the sink node
    source.output.link(sink.input) # Link the output of the source node to the input of the sink node
    p.start()
    while p.isRunning():
        time.sleep(1)
        print("Pipeline is running...")
```

## Host Node

HostNode is a more specialized class built on top of ThreadedHostNode. It provides additional mechanisms for processing and
synchronizing data on the host. This class is designed to handle more complex scenarios where host-side data processing needs to
be tightly integrated with the DepthAI device pipeline, such as synchronizing multiple data streams or displaying processed data.

### Boilerplate

To create a custom host node, derive a new class from HostNode and implement the following methods:

 * build() - where the node's inputs are linked via link_args(). The linked outputs have to match the arguments of the process
   method.
 * process() - called whenever a (synced) set of messages is received on the inputs. This is where the custom logic is
   implemented.
 * onStart() and onStop() - both are available (as HostNode inherits from ThreadedHostNode) and can be used to perform any
   initialization or cleanup tasks.

### Code example

Example of a simple host node from the [host display](https://docs.luxonis.com/software-v3/depthai/examples/host_nodes/display.md) example.

```python
import cv2
import depthai as dai

class HostDisplay(dai.node.HostNode):
    def build(self, frameOutput: dai.Node.Output):
        self.link_args(frameOutput) # Has to match the inputs to the `process` method

        # This sends all the processing to the pipeline where it's executed by the `pipeline.runTasks()` or implicitly by `pipeline.run()` method.
        # It's needed as the GUI window needs to be updated in the main thread, and the `process` method is by default called in a separate thread.
        self.sendProcessingToPipeline(True)
        return self

    def onStart(self) -> None: # Optional method
        print("HostDisplay started")

    def process(self, message: dai.ImgFrame):
        cv2.imshow("HostDisplay", message.getCvFrame())
        key = cv2.waitKey(1)
        if key == ord('q'):
            print("Detected 'q' - stopping the pipeline...")
            self.stopPipeline()
```

Once the node is defined, it can be added to the pipeline using pipeline.create().

```python
p = dai.Pipeline()
with p:
    camera = p.create(dai.node.Camera).build()
    hostDisplay = p.create(HostDisplay).build(camera.requestOutput((300, 300)))

    p.run() # Will block until the pipeline is stopped by someone else (in this case it's the display node)
```

### Syncing

HostNode provides implicit synchronization of the messages arriving at its inputs. If the node is linked to multiple outputs, the
incoming messages are synced based on their timestamps before being passed to the process method.
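The idea behind timestamp-based syncing can be sketched in plain Python. This is a simplified illustration, not DepthAI's actual algorithm; the tolerance value and function name are made up for the example:

```python
def sync_by_timestamp(stream_a, stream_b, tolerance=0.010):
    """Pair messages from two streams whose timestamps differ by less than
    `tolerance` seconds. Each message is a (timestamp, payload) tuple and
    both streams are assumed to be ordered by timestamp."""
    pairs = []
    i = j = 0
    while i < len(stream_a) and j < len(stream_b):
        ts_a, ts_b = stream_a[i][0], stream_b[j][0]
        if abs(ts_a - ts_b) < tolerance:
            pairs.append((stream_a[i], stream_b[j]))  # close enough: emit as a pair
            i += 1
            j += 1
        elif ts_a < ts_b:
            i += 1  # drop the older unmatched message
        else:
            j += 1
    return pairs

rgb  = [(0.000, "rgb0"), (0.033, "rgb1"), (0.066, "rgb2")]
mono = [(0.001, "mono0"), (0.034, "mono1"), (0.100, "mono2")]
print(sync_by_timestamp(rgb, mono))
# [((0.0, 'rgb0'), (0.001, 'mono0')), ((0.033, 'rgb1'), (0.034, 'mono1'))]
```

Messages whose timestamps have no close-enough counterpart on the other stream are dropped rather than paired, so the process method only ever sees matched groups.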

The example above has been modified to demonstrate this feature:

### Code example

```python
import cv2
import depthai as dai

class HostDisplay(dai.node.HostNode):

    def build(self, *args):
        rgb_frame, mono_frame = args # Unpack the input frames from the pipeline. In this case we expect two frames - RGB and mono
        self.link_args(rgb_frame, mono_frame) # Has to match the inputs to the `process` method

        # This sends all the processing to the pipeline where it's executed by the `pipeline.runTasks()` or implicitly by `pipeline.run()` method.
        # It's needed as the GUI window needs to be updated in the main thread, and the `process` method is by default called in a separate thread.
        self.sendProcessingToPipeline(True)
        return self

    def process(self, rgb_frame, mono_frame):
        # Display each frame from the input nodes
        cv2.imshow('rgb', rgb_frame.getCvFrame())
        cv2.imshow('mono', mono_frame.getCvFrame())

        key = cv2.waitKey(1)
        if key == ord('q'):
            print("Detected 'q' - stopping the pipeline...")
            self.stopPipeline()

p = dai.Pipeline()
with p:
    rgb_camera = p.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A) # Create a camera node for RGB camera
    mono_camera = p.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B) # Create a camera node for left mono camera

    # Create an instance of the custom host display node and link it to both camera nodes
    display = p.create(HostDisplay).build(rgb_camera.requestOutput((500, 500)), mono_camera.requestOutput((500, 500)))

    p.run()  # Will block until the pipeline is stopped by someone else (in this case it's the display node)
```

### Additional methods

 * runSyncingOnDevice() - runs the message syncing on the device instead of the host; the process method itself still executes
   on the host.
 * sendProcessingToPipeline() - moves the execution of the process method from the node's own thread into the pipeline's main
   loop (pipeline.runTasks() or pipeline.run()). By default, the process method is executed in a separate thread. Sending the
   processing to the pipeline is mandatory when it involves a GUI (like displaying frames using OpenCV), since GUI operations
   must be done in the main thread.
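Why a GUI forces this hand-off can be illustrated with a plain-Python pattern: the worker thread posts work items to a queue that the main thread drains, which is roughly what sendProcessingToPipeline(True) together with pipeline.run() arrange. Everything below is an illustrative sketch, not DepthAI API:

```python
import queue
import threading

tasks = queue.Queue()  # work the main thread must execute (e.g. GUI updates)
results = []

def worker():
    for frame in ("frame0", "frame1", "frame2"):
        # Instead of touching the GUI from this thread, hand the work over
        tasks.put(lambda f=frame: results.append(f"displayed {f}"))
    tasks.put(None)  # sentinel: no more work

t = threading.Thread(target=worker)
t.start()
while True:  # main-thread loop, analogous to pipeline.run()
    task = tasks.get()
    if task is None:
        break
    task()  # safe for GUI calls: executes in the main thread
t.join()
print(results)  # ['displayed frame0', 'displayed frame1', 'displayed frame2']
```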

### Need assistance?

Head over to the [Discussion Forum](https://discuss.luxonis.com/) for technical support or any other questions you might have.
