DepthAI v2 has been superseded by DepthAI v3. You are viewing legacy documentation.
此页面由 AI 自动翻译。查看英文原版

本页目录

  • 支持
  • OAK PoE 独立模式
  • 转换为独立模式
  • 引导加载程序
  • 闪存管道
  • DepthAI 应用程序包(.dap)
  • 清除闪存
  • 工厂重置

独立模式

独立模式允许 RVC2 系列摄像头在通电时自动启动(已刷写应用程序),而无需连接到任何特定的主机计算机。这在多摄像头架构中尤其有用,因为每个摄像头都可以运行自己的应用程序,而无需依赖主机计算机。每个摄像头都可以是服务器或客户端,并可以通过网络协议(例如 HTTP、TCP、UDP、MQTT 等)与其他摄像头或服务器进行通信。相比之下,外设模式意味着摄像头直接连接到特定的主机计算机。主机计算机连接到空闲设备,上传管道 + 资产(如 NN 模型),然后与设备通信(例如,获取视频流)。与外设模式相比,独立模式:
  • 无需主机计算机即可启动应用程序,并且可以独立连接到不同的计算机/服务器
  • 对任何不稳定因素(例如,网络问题,摄像头和主机计算机之间的连接会中断)更具鲁棒性,因为它会自动重启应用程序
  • 启动速度更快,因为主机计算机无需传输管道 + 资产(需要几秒钟)

支持

对于基于 RVC2 的摄像头,独立模式的支持取决于闪存以及与外部世界通信的能力:
  • OAK PoERVC2 系列)具有板载闪存,并且可以通过网络协议(例如 HTTP、TCP、UDP、MQTT 等)与外部世界通信
  • OAK USBRVC2 系列)- 不支持,因为它们无法与外部世界通信
  • [已弃用] OAK IoT(RVC2 系列)具有板载内存和集成的 ESP32,能够通过 WiFi/蓝牙与外部世界通信。
集成 Linux 计算机的设备可以设置自己的应用程序以在启动时运行:
  • OAK-D CM4OAK-D CM4 PoE 集成了 Raspberry Pi Compute Module 4
  • RVC3RAE Robot)和 RVC4OAK4)芯片都运行 Linux 操作系统,因此用户可以直接 SSH 进入设备,并可以设置自己的应用程序以在启动时运行。

OAK PoE 独立模式

为了与(计算机)外部世界“通信”,OAK POE 摄像头可以使用 Script 节点 来发送/接收网络数据包(HTTP/TCP/UDP...)。以下是一些示例:

转换为独立模式

由于主机和设备之间将没有直接通信,因此您需要首先删除所有 XLinkOutXLinkIn 节点,这些节点在外设模式下用于主机和设备之间的通信。我们需要将 Script 节点添加到管道中,并将我们想要的所有数据(例如,编码视频、NN 元数据、IMU 结果等)首先流式传输到 Script 节点,然后通过网络发送。

示例

让我们将 YOLOv8 示例更新为独立模式。此示例使用 YOLOv8 模型检测视频流中的对象,并将视频流 + 元数据发送到主机计算机,在那里进行可视化(边界框)并显示给用户。首先,我们需要将代码分成两部分:
  • 一部分是管道定义(将刷写到设备上)
  • 另一部分是主机代码(接收、可视化和显示数据)
在示例的源代码(Python)中,管道定义代码位于第 41 至 75 行。我们可以将此定义复制到一个单独的文件中(例如 oak.py)。我还会删除 XLinkOut 节点,因为它们在独立模式下会被忽略,而是创建一个 Script 节点,用于通过网络发送数据。
Python
1pipeline = dai.Pipeline()
2
3# Define sources and outputs
4camRgb = pipeline.create(dai.node.ColorCamera)
5detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
6# Properties
7camRgb.setPreviewSize(640, 352)
8camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
9camRgb.setInterleaved(False)
10camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
11camRgb.setFps(40)
12
13# Network specific settings
14detectionNetwork.setConfidenceThreshold(0.5)
15detectionNetwork.setNumClasses(80)
16detectionNetwork.setCoordinateSize(4)
17detectionNetwork.setIouThreshold(0.5)
18detectionNetwork.setBlobPath(nnPath)
19detectionNetwork.setNumInferenceThreads(2)
20detectionNetwork.input.setBlocking(False)
21
22# Create Script node that will handle TCP communication
23script = pipeline.create(dai.node.Script)
24script.setProcessor(dai.ProcessorType.LEON_CSS)
25script.setScript("""
26while True:
27    detections = node.io["detection_in"].get().detections
28    img = node.io["frame_in"].get()
29""")
30# Link outputs (RGB stream, NN output) to the Script node
31detectionNetwork.passthrough.link(script.inputs['frame_in'])
32detectionNetwork.out.link(script.inputs['detection_in'])
现在,我们只需要关注 Script 节点部分,它将启动 TCP 服务器并将帧 + 检测结果发送给连接的客户端。我们已经有了 TCP 流式传输示例代码,因此我们可以以此为基础。我们还需要将检测结果(元数据)流式传输到客户端,因此我们将添加这部分代码。最终的 Script 节点代码将如下所示:
Python
1import socket
2import time
3import threading
4node.warn("Server up")
5server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
6server.bind(("0.0.0.0", 5000)) # Create TCP server on port 5000
7server.listen()
8
9while True:
10    conn, client = server.accept()
11    node.warn(f"Connected to client IP: {client}")
12    try:
13        while True:
14            detections = node.io["detection_in"].get().detections # Read ImgDetections message, only get detections
15            img = node.io["frame_in"].get() # Read ImgFrame message
16            node.warn('Received frame + dets')
17            img_data = img.getData()
18            ts = img.getTimestamp()
19
20            det_arr = []
21            for det in detections:
22                det_arr.append(f"{det.label};{(det.confidence*100):.1f};{det.xmin:.4f};{det.ymin:.4f};{det.xmax:.4f};{det.ymax:.4f}")
23            det_str = "|".join(det_arr) # Serialize detections to string, which will get sent to the client
24
25            header = f"IMG {ts.total_seconds()} {len(img_data)} {len(det_str)}".ljust(32)
26            node.warn(f'>{header}<')
27            conn.send(bytes(header, encoding='ascii')) # Send over the header
28            if 0 < len(det_arr): # Send over serialized detections (if there are any)
29                conn.send(bytes(det_str, encoding='ascii'))
30            conn.send(img_data) # Send over the actual image frame
31    except Exception as e:
32        node.warn("Client disconnected")
现在我们有了准备好的带有 Script 节点的 Pipeline 端,我们需要创建一个 host.py 脚本,该脚本将连接到摄像头的 TCP 服务器,接收视频流 + 元数据,并可视化 + 显示数据。 我们可以使用 host.py 脚本 作为基础,并对其进行修改以接收和可视化检测结果:
Python
1import socket
2import re
3import cv2
4import numpy as np
5
6# Enter your own IP! After you run oak.py script, it will print the IP in the terminal
7OAK_IP = "10.12.101.188"
8
9labels =  [ "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant", "bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush" ]
10
11def get_frame(socket, size):
12    bytes = socket.recv(4096)
13    while True:
14        read = 4096
15        if size-len(bytes) < read:
16            read = size-len(bytes)
17        bytes += socket.recv(read)
18        if size == len(bytes):
19            return bytes
20
21sock = socket.socket()
22sock.connect((OAK_IP, 5000))
23
24try:
25    COLOR = (127,255,0)
26    while True:
27        header = str(sock.recv(32), encoding="ascii")
28        chunks = re.split(' +', header)
29        if chunks[0] == "IMG":
30            print(f">{header}<")
31            ts = float(chunks[1])
32            imgSize = int(chunks[2])
33            det_len = int(chunks[3])
34
35            if 0 < det_len: # If there are detections, read them
36                det_str = str(sock.recv(det_len), encoding="ascii")
37
38            img = get_frame(sock, imgSize) # Get image frame
39            img_planar = np.frombuffer(img, dtype=np.uint8).reshape(3, 352, 640) # Reshape (it's in planar)
40            img_interleaved = img_planar.transpose(1, 2, 0).copy() # Convert to interleaved (cv2 requires this)
41            # Visualize detections:
42            if 0 < det_len:
43                dets = det_str.split("|") # Deserialize detections
44                for det in dets:
45                    det_section = det.split(";")
46                    class_id = int(det_section[0])
47                    confidence = float(det_section[1])
48                    bbox = [ # Convert from relative to absolute
49                        int(float(det_section[2]) * img_interleaved.shape[1]),
50                        int(float(det_section[3]) * img_interleaved.shape[0]),
51                        int(float(det_section[4]) * img_interleaved.shape[1]),
52                        int(float(det_section[5]) * img_interleaved.shape[0])
53                    ]
54                    cv2.putText(img_interleaved, labels[class_id], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
55                    cv2.putText(img_interleaved, f"{int(confidence)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
56                    cv2.rectangle(img_interleaved, (bbox[0], bbox[1]), (bbox[2], bbox[3]), COLOR, 2)
57
58            # Display the frame with visualized detections
59            cv2.imshow("Img", img_interleaved)
60
61        if cv2.waitKey(1) == ord('q'):
62            break
63except Exception as e:
64    print("Error:", e)
65
66sock.close()
完整的代码(oak.pyhost.py)可以在 此处 找到。请注意,对于独立模式,您可能需要闪存静态 IP,这样就不必每次都更改代码中的 IP。

引导加载程序

引导加载程序 负责启动已闪存的应用程序(独立模式),我们建议使用最新的引导加载程序,您可以使用 设备管理器(GUI 工具)进行闪存。要查看其背后的 API 代码,请参阅 闪存引导加载程序 示例代码(仅 API 脚本)。

闪存管道

在定义了(独立)管道 并已在设备上闪存了最新的 引导加载程序 后,您可以将管道及其资产(例如 NN 模型)闪存到设备上。您可以使用以下代码段闪存管道:
Python
1import depthai as dai
2
3pipeline = dai.Pipeline()
4
5# 定义独立管道;添加节点并链接它们
6# cam = pipeline.create(dai.node.ColorCamera)
7# script = pipeline.create(dai.node.Script)
8# ...
9
10# 闪存管道
11(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
12bootloader = dai.DeviceBootloader(bl)
13progress = lambda p : print(f'Flashing progress: {p*100:.1f}%')
14bootloader.flash(progress, pipeline)
成功闪存管道后,它将在您为设备供电时自动启动。如果您想更改已闪存的管道,只需再次闪存即可。

DepthAI 应用程序包(.dap)

或者,您也可以使用 设备管理器 闪存管道。对于这种方法,您将需要一个 Depthai 应用程序包(.dap),您可以使用以下脚本创建它:
Python
1import depthai as dai
2
3pipeline = dai.Pipeline()
4
5# 定义独立管道;添加节点并链接它们
6# cam = pipeline.create(dai.node.ColorCamera)
7# script = pipeline.create(dai.node.Script)
8# ...
9
10# 创建 Depthai 应用程序包(.dap)
11(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
12bootloader = dai.DeviceBootloader(bl)
13bootloader.saveDepthaiApplicationPackage(pipeline=pipeline, path=<path_of_new_dap>)

清除闪存

由于管道将在设备通电时启动,这可能会导致不必要的发热。如果您想清除已闪存的管道,请使用下面的代码片段。
Python
1import depthai as dai
2(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
3if not f:
4    print('No devices found, exiting...')
5    exit(-1)
6
7with dai.DeviceBootloader(bl) as bootloader:
8    bootloader.flashClear()
9    print('Successfully cleared bootloader flash')

工厂重置

如果您已将设备软砖化,或者只想清除所有内容(已闪存的管道/资产和引导加载程序配置),我们建议使用 设备管理器。工厂重置还将闪存最新的引导加载程序。