此页面由 AI 自动翻译。查看英文原版

本页目录

  • 在主机上运行
  • 线程主机节点
  • 样板代码
  • 链接多个主机节点
  • 主机节点
  • 样板代码
  • 同步
  • 附加方法

主机节点

主机节点是 DepthAI V3 中引入的自定义节点,它们运行在主机上并与 DepthAI 管道进行交互。它们可用于执行各种任务,例如数据处理、同步和显示。主机节点使用 HostNodeThreadedHostNode 类实现,这些类提供了一个框架,用于在管道内创建自定义主机端逻辑。DepthAI 附带了几个预定义的主机节点,例如 BasaltVIORTABMapSLAM 以及 Record/Replay。这些节点提供了强大的开箱即用功能。然而,主机节点最令人兴奋的方面之一是开发人员能够创建自己的自定义节点。这些自定义节点可以设计用于捕获帧、操作数据、解码神经网络输出等。此外,开发人员可以将自定义主机节点打包成 Python 包,并通过 pip 进行分发。这使得在不同项目和社区之间共享和重用自定义节点更加容易。例如,像 geaxgx 的手部跟踪器 这样的自定义节点可以打包并发布到 PyPI,允许其他人通过简单的 pip install 轻松地将其安装并集成到他们的 DepthAI 管道中。

在主机上运行

  • 在 RVC2 设备上(OAK-D、OAK-1) - 主机是通过 USB 连接到 DepthAI 设备的设备。这意味着主机节点将在连接到 DepthAI 设备的机器上运行(通常是 PC)。
  • 在 RVC4 设备上(OAK4-S、OAK4-D) - 主机端取决于设备运行的模式。在 外设模式 下,主机是通过 USB 连接到 DepthAI 设备的设备——与 RVC2 设备相同。在 独立模式 下,OAK4 设备本身就是主机(运行 Linux),主机节点在设备 CPU 上运行(见下图)。
有关更多信息,请参阅 独立模式与外设模式 文章。

主机节点类型

线程主机节点

ThreadedHostNode 是需要多线程的主机节点基类。此类允许节点在其自己的专用线程中异步运行其逻辑,使其适用于不应阻塞管道主执行流程的操作。

样板代码

要创建自定义线程主机节点,您需要从 ThreadedHostNode 派生一个新类并实现以下方法:
  • __init__:用于初始化节点的构造函数。您可以在此处创建输入和输出队列。
  • run() 方法是节点的主要执行循环,其中实现了自定义逻辑。
  • onStart()onStop() 方法分别在节点启动和停止时调用。这些方法是可选的,可用于执行任何初始化或清理任务。

代码示例

来自 主机摄像头 示例的简单线程主机节点示例。
Python
1class HostCamera(dai.node.ThreadedHostNode):
2    def __init__(self):
3        super().__init__() # 调用基类构造函数
4        self.output = self.createOutput() # 创建一个输出队列 - 这将发送 ImgFrame 消息
5
6    def run(self): # 节点的主要执行循环
7        cap = cv2.VideoCapture(0) # 为主机摄像头创建 VideoCapture 对象
8        if not cap.isOpened():
9            p.stop()
10            raise RuntimeError("Error: Couldn't open host camera")
11        while self.isRunning(): # 循环直到节点停止
12            ret, frame = cap.read() # 从摄像头读取帧
13            if not ret:
14                break
15            # 创建一个 ImgFrame 消息并设置其数据、宽度、高度和类型
16            imgFrame = dai.ImgFrame()
17            imgFrame.setData(frame)
18            imgFrame.setWidth(frame.shape[1])
19            imgFrame.setHeight(frame.shape[0])
20            imgFrame.setType(dai.ImgFrame.Type.BGR888i)
21            # 将消息发送到输出队列
22            self.output.send(imgFrame)
23            # 等待下一帧
24            time.sleep(0.1)
定义节点后,可以使用 pipeline.create() 将其添加到管道中。
Python
1with dai.Pipeline() as p:
2    hostCamera = p.create(HostCamera) # 在管道内创建自定义主机摄像头节点的实例
3    camQueue = hostCamera.output.createOutputQueue() # 创建一个输出队列以接收帧
4
5    p.start() # 启动管道,这将同时启动主机摄像头,隐式运行 `onStart()` 以及 `run()` 方法
6    while p.isRunning():
7        image : dai.ImgFrame = camQueue.get() # 从输出队列获取帧
8        cv2.imshow("HostCamera", image.getCvFrame())
9        key = cv2.waitKey(1)
10        if key == ord('q'):
11            p.stop() # 停止管道和主机摄像头,隐式运行 `onStop()` 方法
12            break

链接多个主机节点

与常规设备节点一样,主机节点也可以在管道中链接在一起。以下是一个包含两个自定义主机节点的管道示例:

代码示例

显示的 এটা 是一个简化的版本 线程主机节点 示例。
Python
1class TestSink(dai.node.ThreadedHostNode): # 用于接收数据的自定义线程主机节点
2    def __init__(self):
3        super().__init__()
4        self.input = self.createInput() # 只需要一个输入队列,因为此节点接收数据
5
6    def onStart(self):
7        print("Hello, this is", __class__.__name__) # 节点启动时打印一条消息
8
9    def run(self):
10        while self.isRunning():
11            buffer = self.input.get() # 从输入队列获取一个缓冲区
12            print("The sink node received a buffer!")
13
14class TestSource(dai.node.ThreadedHostNode): # 用于发送数据的自定义线程主机节点
15    def __init__(self):
16        super().__init__()
17        self.output = self.createOutput() # 只需要一个输出队列,因为此节点发送数据
18
19    def run(self):
20        while self.isRunning():
21            buffer = dai.Buffer() # 创建一个缓冲区
22            print("The source node is sending a buffer!")
23            self.output.send(buffer) # 将缓冲区发送到输出队列
24            time.sleep(1)
定义了自定义节点后,可以将它们添加到管道并链接在一起:
Python
1with dai.Pipeline() as p:
2    source = TestSource() # 创建源节点的实例
3    sink = TestSink() # 创建接收器节点的实例
4    source.output.link(sink.input) # 将源节点的输出链接到接收器节点的输入
5    p.start()
6    while p.isRunning():
7        time.sleep(1)
8        print("Pipeline is running...")

主机节点

HostNode 是一个建立在 ThreadedHostNode 之上的更专业的类。它提供了在主机上处理和同步数据的附加机制。此类旨在处理需要与 DepthAI 设备管道紧密集成的更复杂场景,例如同步多个数据流或显示处理后的数据。

样板代码

要创建自定义主机节点,需要从 HostNode 派生一个新类并实现以下方法:
  • process 方法在每次从输入队列接收到消息时被调用。这是实现自定义逻辑的地方。
  • onStartonStop 方法都可用(因为 HostNode 继承自 ThreadedHostNode),可用于执行任何初始化或清理任务。

代码示例

来自 主机显示 的简单主机节点示例。
Python
1class HostDisplay(dai.node.HostNode):
2    def build(self, frameOutput: dai.Node.Output):
3        self.link_args(frameOutput) # 必须与 `process` 方法的输入匹配
4
5        # 这会将所有处理发送到管道,由 `pipeline.runTasks()` 或隐式地由 `pipeline.run()` 方法执行。
6        # 这是必需的,因为 GUI 窗口需要在主线程中更新,而 `process` 方法默认在单独的线程中调用。
7        self.sendProcessingToPipeline(True)
8        return self
9
10    def onStart(self) -> None: # 可选方法
11        print("HostDisplay started")
12
13    def process(self, message: dai.ImgFrame):
14        cv2.imshow("HostDisplay", message.getCvFrame())
15        key = cv2.waitKey(1)
16        if key == ord('q'):
17            print("Detected 'q' - stopping the pipeline...")
18            self.stopPipeline()
定义节点后,可以使用 pipeline.create() 将其添加到管道中。
Python
1p = dai.Pipeline()
2with p:
3    camera = p.create(dai.node.Camera).build()
4    hostDisplay = p.create(HostDisplay).build(camera.requestOutput((300, 300)))
5
6    p.run() # 将阻塞直到管道被其他人停止(在本例中是显示节点)

同步

HostNode 提供了通过输入发送的消息的隐式同步。这意味着如果向输入队列发送了多条消息,它们将根据其时间戳进行同步。我们修改了上面的示例来演示此功能:

代码示例

Python
1class HostDisplay(dai.node.HostNode):
2
3    def build(self, *args):
4        rgb_frame, mono_frame = args # 从管道解包输入帧。在这种情况下,我们期望两帧 - RGB 和单色
5        self.link_args(rgb_frame, mono_frame) # 必须与 `process` 方法的输入匹配
6
7        # 这会将所有处理发送到管道,由 `pipeline.runTasks()` 或隐式地由 `pipeline.run()` 方法执行。
8        # 这是必需的,因为 GUI 窗口需要在主线程中更新,而 `process` 方法默认在单独的线程中调用。
9        self.sendProcessingToPipeline(True)
10        return self
11
12    def process(self, rgb_frame, mono_frame):
13        # 显示来自输入节点的每一帧
14        cv2.imshow('rgb', rgb_frame.getCvFrame())
15        cv2.imshow('mono', mono_frame.getCvFrame())
16
17        key = cv2.waitKey(1)
18        if key == ord('q'):
19            print("Detected 'q' - stopping the pipeline...")
20            self.stopPipeline()
21
22p = dai.Pipeline()
23with p:
24    rgb_camera = p.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A) # 为 RGB 摄像头创建摄像头节点
25    mono_camera = p.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B) # 为左侧单色摄像头创建摄像头节点
26
27    # 创建自定义主机显示节点的实例并将其链接到两个摄像头节点
28    display = p.create(HostDisplay).build(rgb_camera.requestOutput((500, 500)), mono_camera.requestOutput((500, 500)))
29
30    p.run()  # 将阻塞直到管道被其他人停止(在本例中是显示节点)

附加方法

  • runSyncingOnDevice() - 此方法可用于在设备而非主机上运行 process 方法。默认情况下,process 方法在主机上执行。
  • sendProcessingToPipeline() - 此方法可用于将处理发送到管道。默认情况下,process 方法在单独的线程中执行。当处理涉及 GUI(如使用 OpenCV 显示帧)时,此方法是强制性的,因为 GUI 操作必须在主线程中完成。

需要帮助?

请前往 Discussion Forum 获取技术支持或提出您可能有的任何其他问题。