# Threaded Host Nodes

This guide covers the implementation of a custom DepthAI pipeline using three custom nodes: TestSource, TestPassthrough, and
TestSink. These nodes demonstrate basic data flow in a DepthAI pipeline, where buffers are generated by the source, passed through
a passthrough node, and then received by a sink node.

## Overview

The pipeline consists of three custom nodes:

 * TestSource: Generates and sends buffers at regular intervals.
 * TestPassthrough: Receives buffers from the source and forwards them to the sink.
 * TestSink: Receives buffers from the passthrough node and processes them.

These nodes demonstrate the basic structure of threaded host nodes in DepthAI, which can be used to implement custom logic within
a pipeline.

## Demo output

```bash
Hello, this is passthrough
Hello, this is passthroughSubnodes_subnode1
Hello, this is passthroughSubnodes_subnode2
Hello, this is sink1
Hello, this is sink2
source node is sending a buffer!
The passthrough node received a buffer!
The passthrough node received a buffer!
The passthrough node received a buffer!
sink2 node received a buffer!
sink1 node received a buffer!
Pipeline is running...
source node is sending a buffer!
The passthrough node received a buffer!
The passthrough node received a buffer!
sink1 node received a buffer!
The passthrough node received a buffer!
sink2 node received a buffer!
...
```

## Source Code

#### Python

```python
import depthai as dai
import time

class TestPassthrough(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.name = name
        self.input = self.createInput()
        self.output = self.createOutput()

        # Possible API 1:
        self.input.setPossibleDatatypes([dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgFrame, True)])
        self.output.setPossibleDatatypes([dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgFrame, True)])

        # Possible API 2:
        self.input.setPossibleDatatypes([
            (dai.DatatypeEnum.ImgFrame, True),
            (dai.DatatypeEnum.Buffer, True)
        ])
        self.output.setPossibleDatatypes([
            (dai.DatatypeEnum.ImgFrame, True),
            (dai.DatatypeEnum.Buffer, True)
        ])

    def onStart(self):
        print("Hello, this is", self.name)

    def onStop(self):
        print("Goodbye from", self.name)

    def run(self):
        while self.mainLoop():
            buffer = self.input.get()
            print("The passthrough node received a buffer!")
            self.output.send(buffer)

class TestSink(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.input = self.createInput()

        self.name = name

    def onStart(self):
        print("Hello, this is", self.name)

    def run(self):
        while self.mainLoop():
            buffer = self.input.get()
            del buffer
            print(f"{self.name} node received a buffer!")

class TestSource(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.name = name
        self.output = self.createOutput()

    def run(self):
        while self.mainLoop():
            buffer = dai.Buffer()
            print(f"{self.name} node is sending a buffer!")
            self.output.send(buffer)
            time.sleep(1)

class TestPassthroughSubnodes(dai.node.ThreadedHostNode):
    def __init__(self, name: str):
        super().__init__()
        self.passthrough1 = self.createSubnode(TestPassthrough, name + "_subnode1")
        self.passthrough2 = self.createSubnode(TestPassthrough, name + "_subnode2")
        self.input = self.passthrough1.input
        self.output = self.passthrough2.output

        # Link the two subnodes together
        self.passthrough1.output.link(self.passthrough2.input)

    def run(self):
        while self.mainLoop():
            buffer = self.input.get()
            self.output.send(buffer)

with dai.Pipeline(False) as p:
    """
    Create the following pipeline:
    source -> passthrough -> sink1
           |
           -> passthroughSubnodes -> sink2
    """

    # Create nodes
    source = p.create(TestSource, "source")
    passthrough = p.create(TestPassthrough, "passthrough")
    passthroughSubnodes = p.create(TestPassthroughSubnodes, "passthroughSubnodes")
    sink1 = p.create(TestSink, "sink1")
    sink2 = p.create(TestSink, "sink2")

    # Link nodes
    source.output.link(passthrough.input)
    source.output.link(passthroughSubnodes.input)
    passthrough.output.link(sink1.input)
    passthroughSubnodes.output.link(sink2.input)

    p.start()
    while p.isRunning():
        time.sleep(1)
        print("Pipeline is running...")
```

#### C++

```cpp
#include <chrono>
#include <iostream>
#include <thread>

#include "depthai/depthai.hpp"

class TestPassthrough : public dai::node::CustomThreadedNode<TestPassthrough> {
   public:
    Input input = dai::Node::Input{*this, {}};
    Output output = dai::Node::Output{*this, {}};

    void run() override {
        while(mainLoop()) {
            auto buffer = input.get<dai::Buffer>();
            if(buffer) {
                std::cout << "The passthrough node received a buffer!" << std::endl;
                output.send(buffer);
            }
        }
    }
};

class TestSink : public dai::node::CustomThreadedNode<TestSink> {
   public:
    Input input = dai::Node::Input{*this, {}};

    void run() override {
        while(mainLoop()) {
            auto buffer = input.get<dai::Buffer>();
            if(buffer) {
                std::cout << "The sink node received a buffer!" << std::endl;
            }
        }
    }
};

class TestSource : public dai::node::CustomThreadedNode<TestSource> {
   public:
    Output output = dai::Node::Output{*this, {}};

    void run() override {
        while(mainLoop()) {
            auto buffer = std::make_shared<dai::Buffer>();
            std::cout << "The source node is sending a buffer!" << std::endl;
            output.send(buffer);
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
};

int main() {
    dai::Pipeline pipeline(false);

    auto source = pipeline.create<TestSource>();
    auto passthrough = pipeline.create<TestPassthrough>();
    auto sink = pipeline.create<TestSink>();

    source->output.link(passthrough->input);
    passthrough->output.link(sink->input);

    pipeline.start();

    while(true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "Pipeline is running..." << std::endl;
    }

    return 0;
}
```

### Need assistance?

Head over to [Discussion Forum](https://discuss.luxonis.com/) for technical support or any other questions you might have.
