Standalone mode
- Doesn't require a host computer to start the application, and can connect to different computers/servers independently
- Is more robust to instabilities (e.g. networking issues where the connection between the camera and the host computer drops), as it will auto-restart the application
- Is faster to start, as the host computer doesn't need to send the pipeline + assets to the device (which takes a few seconds)
Support
- OAK PoE cameras (RVC2-based) have on-board flash memory and can communicate with the outside world via network protocols (e.g. HTTP, TCP, UDP, MQTT...)
- OAK USB cameras (RVC2-based) are not supported, as they can't communicate with the outside world on their own
- [Deprecated] OAK IoT cameras (RVC2-based) have on-board memory and an integrated ESP32, which could communicate with the outside world via WiFi/Bluetooth
- OAK-D CM4 and OAK-D CM4 PoE have an integrated Raspberry Pi Compute Module 4
- Both RVC3 (RAE robot) and RVC4 (OAK4) chips run a Linux OS, so users can SSH directly into the device and set up their own applications to run on boot
OAK PoE standalone mode
- TCP streaming (camera being either the server or the client; see the client-side sketch after this list)
- HTTP server
- HTTP client
- MQTT client
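As a sketch of the TCP client option, a Script node can open an outbound connection itself and push frames to a remote server. Everything below is illustrative: the server address 192.168.1.100:9876 is a placeholder, and the 16-byte length header is just one possible framing scheme.
Python
import depthai as dai

pipeline = dai.Pipeline()

cam = pipeline.create(dai.node.ColorCamera)
cam.setPreviewSize(300, 300)
cam.setInterleaved(False)

script = pipeline.create(dai.node.Script)
script.setProcessor(dai.ProcessorType.LEON_CSS)
cam.preview.link(script.inputs['frame_in'])

script.setScript("""
import socket
# Connect out to a remote server (placeholder address)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("192.168.1.100", 9876))
while True:
    frame = node.io["frame_in"].get()
    data = frame.getData()
    # Simple framing: fixed 16-byte ASCII header carrying the payload size
    sock.send(bytes(f"{len(data)}".ljust(16), encoding='ascii'))
    sock.send(data)
""")
Once this pipeline is flashed (see the bottom of this page), the camera connects out to the server on boot.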
DNS resolver
Standalone mode is missing a DNS resolver, so you will need to use IP addresses instead of domain names. One workaround is to resolve the domain name on the host machine before flashing, as sketched below.
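A minimal sketch of that workaround, assuming a hypothetical server mqtt.example.com reachable on port 1883; the resolution happens at pipeline-definition time on the host, so the flashed Script only ever sees a raw IP:
Python
import socket
import depthai as dai

pipeline = dai.Pipeline()
script = pipeline.create(dai.node.Script)
script.setProcessor(dai.ProcessorType.LEON_CSS)

# Resolve on the host, where a DNS resolver is available (hypothetical hostname)
SERVER_IP = socket.gethostbyname("mqtt.example.com")

# Bake the resolved IP into the Script node's code before flashing
script.setScript(f"""
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("{SERVER_IP}", 1883))
# ... communicate with the server ...
""")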
Converting to standalone mode
Converting an application to standalone mode means moving any host-side logic into Script node(s) that run on the device, and removing the XLink nodes, as there is no host connection to stream to.
Example
We'll split the application code into two parts:
- One part will be the pipeline definition (which will be flashed to the device)
- The other part will be the host computer code (receiving data, visualizing and displaying it)
Let's start with the pipeline definition (oak.py). We'll also remove the XLinkOut nodes, as they're ignored in standalone mode, and instead create a Script node, which will be used to send the data over the network.
Python
import depthai as dai

nnPath = "path/to/yolo_model.blob"  # Placeholder: path to a compiled YOLO blob

pipeline = dai.Pipeline()

# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
# Properties
camRgb.setPreviewSize(640, 352)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camRgb.setFps(40)

# Network specific settings
detectionNetwork.setConfidenceThreshold(0.5)
detectionNetwork.setNumClasses(80)
detectionNetwork.setCoordinateSize(4)
detectionNetwork.setIouThreshold(0.5)
detectionNetwork.setBlobPath(nnPath)
detectionNetwork.setNumInferenceThreads(2)
detectionNetwork.input.setBlocking(False)

# Create Script node that will handle TCP communication
script = pipeline.create(dai.node.Script)
script.setProcessor(dai.ProcessorType.LEON_CSS)
# Abbreviated placeholder; the full TCP server logic is shown in the next snippet
script.setScript("""
while True:
    detections = node.io["detection_in"].get().detections
    img = node.io["frame_in"].get()
""")
# Linking
camRgb.preview.link(detectionNetwork.input) # Feed preview frames to the detection network
# Link outputs (RGB stream, NN output) to the Script node
detectionNetwork.passthrough.link(script.inputs['frame_in'])
detectionNetwork.out.link(script.inputs['detection_in'])
Inside the Script node we'll run a TCP server that waits for a client to connect, then streams each frame together with its serialized detections:
Python
import socket

node.warn("Server up")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 5000)) # Create TCP server on port 5000
server.listen()

while True:
    conn, client = server.accept()
    node.warn(f"Connected to client IP: {client}")
    try:
        while True:
            detections = node.io["detection_in"].get().detections # Read ImgDetections message, only get detections
            img = node.io["frame_in"].get() # Read ImgFrame message
            node.warn('Received frame + dets')
            img_data = img.getData()
            ts = img.getTimestamp()

            det_arr = []
            for det in detections:
                det_arr.append(f"{det.label};{(det.confidence*100):.1f};{det.xmin:.4f};{det.ymin:.4f};{det.xmax:.4f};{det.ymax:.4f}")
            det_str = "|".join(det_arr) # Serialize detections to a string, which will get sent to the client

            header = f"IMG {ts.total_seconds()} {len(img_data)} {len(det_str)}".ljust(32)
            node.warn(f'>{header}<')
            conn.send(bytes(header, encoding='ascii')) # Send over the header
            if 0 < len(det_arr): # Send over serialized detections (if there are any)
                conn.send(bytes(det_str, encoding='ascii'))
            conn.send(img_data) # Send over the actual image frame
    except Exception as e:
        node.warn("Client disconnected")
Next, let's create the host.py script that will connect to the camera's TCP server, receive the video stream + metadata, and visualize + display the data. We can use an existing TCP streaming host script as a base and modify it to also receive and visualize the detections:
Python
import socket
import re
import cv2
import numpy as np

# Enter your own IP! After you run the oak.py script, it will print the IP in the terminal
OAK_IP = "10.12.101.188"

labels = [ "person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant", "bed", "diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush" ]

def get_frame(conn, size):
    # recv() can return fewer bytes than requested, so keep reading
    # until we have exactly `size` bytes
    data = b""
    while len(data) < size:
        chunk = conn.recv(min(4096, size - len(data)))
        if not chunk:
            raise RuntimeError("Connection closed")
        data += chunk
    return data

sock = socket.socket()
sock.connect((OAK_IP, 5000))

try:
    COLOR = (127,255,0)
    while True:
        header = str(get_frame(sock, 32), encoding="ascii") # Read the fixed 32-byte header
        chunks = re.split(' +', header)
        if chunks[0] == "IMG":
            print(f">{header}<")
            ts = float(chunks[1])
            imgSize = int(chunks[2])
            det_len = int(chunks[3])

            if 0 < det_len: # If there are detections, read them
                det_str = str(get_frame(sock, det_len), encoding="ascii")

            img = get_frame(sock, imgSize) # Get image frame
            img_planar = np.frombuffer(img, dtype=np.uint8).reshape(3, 352, 640) # Reshape (it's in planar format)
            img_interleaved = img_planar.transpose(1, 2, 0).copy() # Convert to interleaved (cv2 requires this)
            # Visualize detections:
            if 0 < det_len:
                dets = det_str.split("|") # Deserialize detections
                for det in dets:
                    det_section = det.split(";")
                    class_id = int(det_section[0])
                    confidence = float(det_section[1])
                    bbox = [ # Convert from relative to absolute coordinates
                        int(float(det_section[2]) * img_interleaved.shape[1]),
                        int(float(det_section[3]) * img_interleaved.shape[0]),
                        int(float(det_section[4]) * img_interleaved.shape[1]),
                        int(float(det_section[5]) * img_interleaved.shape[0])
                    ]
                    cv2.putText(img_interleaved, labels[class_id], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
                    cv2.putText(img_interleaved, f"{int(confidence)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, COLOR)
                    cv2.rectangle(img_interleaved, (bbox[0], bbox[1]), (bbox[2], bbox[3]), COLOR, 2)

            # Display the frame with visualized detections
            cv2.imshow("Img", img_interleaved)

        if cv2.waitKey(1) == ord('q'):
            break
except Exception as e:
    print("Error:", e)

sock.close()
The complete code (oak.py and host.py) can be found here. Note that for standalone mode, you might want to flash a static IP, so you don't have to change the IP in the code every time; a sketch of how to do that follows.
Bootloader
Standalone mode relies on the device's bootloader, which starts the flashed application on boot.
Flash the pipeline
To flash a pipeline (together with its assets) to the device:
Python
import depthai as dai

pipeline = dai.Pipeline()

# Define standalone pipeline; add nodes and link them
# cam = pipeline.create(dai.node.ColorCamera)
# script = pipeline.create(dai.node.Script)
# ...

# Flash the pipeline
(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
bootloader = dai.DeviceBootloader(bl)
progress = lambda p : print(f'Flashing progress: {p*100:.1f}%')
bootloader.flash(progress, pipeline)
DepthAI Application Package (.dap)
Instead of flashing the pipeline directly, you can save it as a DepthAI Application Package (.dap) and flash it to a device later:
Python
import depthai as dai

pipeline = dai.Pipeline()

# Define standalone pipeline; add nodes and link them
# cam = pipeline.create(dai.node.ColorCamera)
# script = pipeline.create(dai.node.Script)
# ...

# Create DepthAI Application Package (.dap)
(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
bootloader = dai.DeviceBootloader(bl)
bootloader.saveDepthaiApplicationPackage(pipeline=pipeline, path=<path_of_new_dap>)
Clear flash
To remove a flashed application from the device, clear the bootloader flash:
Python
import depthai as dai
(f, bl) = dai.DeviceBootloader.getFirstAvailableDevice()
if not f:
    print('No devices found, exiting...')
    exit(-1)

with dai.DeviceBootloader(bl) as bootloader:
    bootloader.flashClear()
    print('Successfully cleared bootloader flash')