独立模式
独立模式
- 无需主机计算机即可启动应用程序,并且可以独立连接到不同的计算机/服务器
- 对任何不稳定因素(例如网络问题,摄像头和主机计算机之间的连接会断开)更具鲁棒性,因为它会自动重启应用程序
- 启动速度更快,因为主机计算机无需传输管道 + 资产(需要几秒钟)
支持
- OAK PoE(RVC2 型)具有板载闪存,并且可以通过网络协议(例如 HTTP、TCP、UDP、MQTT 等)与外部世界通信
- OAK USB(RVC2 型)- 不支持,因为它们无法与外部世界通信
- [已弃用] OAK IoT(RVC2 型)具有板载内存和集成的 ESP32,能够通过 WiFi/蓝牙与外部世界通信。
- OAK-D CM4 和 OAK-D CM4 PoE 集成了 Raspberry Pi Compute Module 4
- RVC3(RAE 机器人)和 RVC4(OAK4)芯片都运行 Linux 操作系统,因此用户可以直接 SSH 进入设备,并可以设置自己的应用程序以在启动时运行。
OAK PoE 独立模式
DNS 解析器
独立模式缺少 DNS 解析器,因此您需要使用 IP 地址而不是域名。
转换为独立模式
示例
- 一部分是管道定义(将刷写到设备)
- 另一部分是主机计算机代码(接收数据、可视化和显示它)
oak.py)。我还会移除 XLinkOut 节点,因为它们在独立模式下会被忽略,而是创建一个 Script 节点,用于通过网络发送数据。Python
1pipeline = dai.Pipeline()
2
3# Define sources and outputs
4camRgb = pipeline.create(dai.node.ColorCamera)
5detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
6# Properties
7camRgb.setPreviewSize(640, 352)
8camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
9camRgb.setInterleaved(False)
10camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
11camRgb.setFps(40)
12
13# Network specific settings
14detectionNetwork.setConfidenceThreshold(0.5)
15detectionNetwork.setNumClasses(80)
16detectionNetwork.setCoordinateSize(4)
17detectionNetwork.setIouThreshold(0.5)
18detectionNetwork.setBlobPath(nnPath)
19detectionNetwork.setNumInferenceThreads(2)
20detectionNetwork.input.setBlocking(False)
21
22# Create Script node that will handle TCP communication
23script = pipeline.create(dai.node.Script)
24script.setProcessor(dai.ProcessorType.LEON_CSS)
25script.setScript("""
26while True:
27 detections = node.io["detection_in"].get().detections
28 img = node.io["frame_in"].get()
29""")
30# Link outputs (RGB stream, NN output) to the Script node
31detectionNetwork.passthrough.link(script.inputs['frame_in'])
32detectionNetwork.out.link(script.inputs['detection_in'])Python
1import socket
2import time
3import threading
4node.warn("Server up")
5server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
6server.bind(("0.0.0.0", 5000)) # Create TCP server on port 5000
7server.listen()
8
9while True:
10 conn, client = server.accept()
11 node.warn(f"Connected to client IP: {client}")
12 try:
13 while True:
14 detections = node.io["detection_in"].get().detections # Read ImgDetections message, only get detections
15 img = node.io["frame_in"].get() # Read ImgFrame message
16 node.warn('Received frame + dets')
17 img_data = img.getData()
18 ts = img.getTimestamp()
19
20 det_arr = []
21 for det in detections:
22 det_arr.append(f"{det.label};{(det.confidence*100):.1f};{det.xmin:.4f};{det.ymin:.4f};{det.xmax:.4f};{det.ymax:.4f}")
23 det_str = "|".join(det_arr) # Serialize detections to string, which will get sent to the client
24
25 header = f"IMG {ts.total_seconds()} {len(img_data)} {len(det_str)}".ljust(32)
26 node.warn(f'>{header}<')
27 conn.send(bytes(header, encoding='ascii')) # Send over the header
28 if 0 < len(det_arr): # Send over serialized detections (if there are any)
29 conn.send(bytes(det_str, encoding='ascii'))
30 conn.send(img_data) # Send over the actual image frame
31 except Exception as e:
32 node.warn("Client disconnected")host.py 脚本,该脚本将连接到摄像头的 TCP 服务器,接收视频流+元数据,并可视化+显示数据。 我们可以使用 host.py 脚本 作为基础,并对其进行修改以接收和可视化检测结果:Python
1import socket
2import re
3import cv2
4import numpy as np
5
6# Enter your own IP! After you run oak.py script, it will print the IP in the terminal
7OAK_IP = "10.12.101.188"
8
9labels = [ "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant", "bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush" ]
10
11def get_frame(socket, size):
12 bytes = socket.recv(4096)
13 while True:
14 read = 4096
15 if size-len(bytes) < read:
16 read = size-len(bytes)
17 bytes += socket.recv(read)
18 if size == len(bytes):
19 return bytes
20
21sock = socket.socket()
22sock.connect((OAK_IP, 5000))
23
24try:
25 COLOR = (127,255,0)
26 while True:
27 header = str(sock.recv(32), encoding="ascii")
28 chunks = re.split(' +', header)
29 if chunks[0] == "IMG":
30 print(f">{header}<")
31 ts = float(chunks[1])
32 imgSize = int(chunks[2])
33 det_len = int(chunks[3])
34
35 if 0 < det_len: # If there are detections, read them
36 det_str = str(sock.recv(det_len), encoding="ascii")
37
38 img = get_frame(sock, imgSize) # Get image frame
39 img_planar = np.frombuffer(img, dtype=np.uint8).reshape(3, 352, 640) # Reshape (it's in planar)
40 img_interleaved = img_planar.transpose(1, 2, 0).copy() # Convert to interleaved (cv2 requires this)
41 # Visualize detections:
42 if 0 < det_len:
43 dets = det_str.split("|") # Deserialize detections
44 for det in dets:
45 det_section = det.split(";")
46 class_id = int(det_section[0])
47 confidence = float(det_section[1])
48 bbox = [ # Convert from relative to absolute
49 int(float(det_section[2]) * img_interleaved.shape[1]),
50 int(float(det_section[3]) * img_interleaved.shape[0]),
51 int(float(det_section[4]) * img_interleaved.shape[1]),
52 int(float(det_section[5]) * img_interleaved.shape[0])
53 ]
54 cv2.putText(img_interleaved, labels[class_id], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
55 cv2.putText(img_interleaved, f"{int(confidence)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
56 cv2.rectangle(img_interleaved, (bbox[0], bbox[1]), (bbox[2], bbox[3]), COLOR, 2)
57
58 # Display the frame with visualized detections
59 cv2.imshow("Img", img_interleaved)
60
61 if cv2.waitKey(1) == ord('q'):
62 break
63except Exception as e:
64 print("Error:", e)
65
66sock.close()oak.py 和 host.py)可以在 此处 找到。请注意,对于独立模式,您可能需要刷写静态 IP,这样就不必每次都更改代码中的 IP。引导加载程序
刷写管道
Python
1import depthai as dai
2
3pipeline = dai.Pipeline()
4
5# 定义独立管道;添加节点并链接它们
6# cam = pipeline.create(dai.node.ColorCamera)
7# script = pipeline.create(dai.node.Script)
8# ...
9
10# 刷写管道
11(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
12bootloader = dai.DeviceBootloader(bl)
13progress = lambda p : print(f'Flashing progress: {p*100:.1f}%')
14bootloader.flash(progress, pipeline)DepthAI 应用程序包 (.dap)
Python
1import depthai as dai
2
3pipeline = dai.Pipeline()
4
5# 定义独立管道;添加节点并链接它们
6# cam = pipeline.create(dai.node.ColorCamera)
7# script = pipeline.create(dai.node.Script)
8# ...
9
10# 创建 Depthai 应用程序包 (.dap)
11(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
12bootloader = dai.DeviceBootloader(bl)
13bootloader.saveDepthaiApplicationPackage(pipeline=pipeline, path=<path_of_new_dap>)清除闪存
Python
1import depthai as dai
2(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
3if not f:
4 print('No devices found, exiting...')
5 exit(-1)
6
7with dai.DeviceBootloader(bl) as bootloader:
8 bootloader.flashClear()
9 print('Successfully cleared bootloader flash')