Source code
Python
PythonGitHub
1#!/usr/bin/env python3
2"""
3PointCloud Visualizer
4
5Live 3D visualization of stereo-depth point clouds using Open3D.
6Press Q in the Open3D window (or Ctrl-C in the terminal) to quit.
7"""
8
9import time
10
11import cv2
12import numpy as np
13
14try:
15 import open3d as o3d
16except ImportError:
17 raise SystemExit("Please install open3d: pip install open3d")
18
19import depthai as dai
20
21
22def colorizeDepth(frame: np.ndarray) -> np.ndarray:
23 """Normalize a uint16 depth frame and apply a colormap for display."""
24 downscaled = frame[::4, ::4]
25 nonZero = downscaled[downscaled != 0]
26 if nonZero.size == 0:
27 minD, maxD = 0, 1
28 else:
29 minD = np.percentile(nonZero, 1)
30 maxD = np.percentile(nonZero, 99)
31 colored = np.interp(frame, (minD, maxD), (0, 255)).astype(np.uint8)
32 return cv2.applyColorMap(colored, cv2.COLORMAP_HOT)
33
34
35def main() -> None:
36 print("PointCloud Visualizer")
37 print("=====================")
38 print("Connecting to device...")
39
40 device = dai.Device()
41 print(f"Device: {device.getDeviceName()} (ID: {device.getDeviceId()})\n")
42
43 with dai.Pipeline(device) as pipeline:
44 # ── Camera + StereoDepth ──────────────────────────────────────
45 left = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B)
46 right = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C)
47 color = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A)
48 stereo = pipeline.create(dai.node.StereoDepth)
49 left.requestOutput((640, 400)).link(stereo.left)
50 right.requestOutput((640, 400)).link(stereo.right)
51
52 # Align depth to color camera
53 platform = pipeline.getDefaultDevice().getPlatform()
54 colorOut = color.requestOutput(
55 (640, 400), type=dai.ImgFrame.Type.RGB888i,
56 resizeMode=dai.ImgResizeMode.CROP, enableUndistortion=True,
57 )
58 if platform == dai.Platform.RVC4:
59 align = pipeline.create(dai.node.ImageAlign)
60 stereo.depth.link(align.input)
61 colorOut.link(align.inputAlignTo)
62 alignedDepth = align.outputAligned
63 else:
64 colorOut.link(stereo.inputAlignTo)
65 alignedDepth = stereo.depth
66
67 # ── PointCloud node ───────────────────────────────────────────
68 pc = pipeline.create(dai.node.PointCloud)
69 pc.setRunOnHost(True)
70 pc.initialConfig.setLengthUnit(dai.LengthUnit.METER)
71 alignedDepth.link(pc.inputDepth)
72 colorOut.link(pc.inputColor)
73
74 queue = pc.outputPointCloud.createOutputQueue(maxSize=4, blocking=False)
75 qDepth = pc.passthroughDepth.createOutputQueue(maxSize=4, blocking=False)
76
77 # ── Open3D setup ──────────────────────────────────────────────
78 vis = o3d.visualization.Visualizer()
79 vis.create_window("PointCloud Visualizer")
80 pcd = o3d.geometry.PointCloud()
81 coord = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1)
82 vis.add_geometry(coord)
83 first = True
84
85 # ── Start pipeline & wait for auto-exposure ───────────────────
86 pipeline.start()
87 print("Waiting for auto-exposure to settle...")
88 time.sleep(1)
89 queue.tryGetAll() # drain stale frames
90 qDepth.tryGetAll()
91
92 print("Streaming... Press Q in the Open3D window to quit.")
93
94 try:
95 while True:
96 pclData = queue.tryGet()
97
98 if pclData is not None:
99 if pclData.isColor():
100 points, colors = pclData.getPointsRGB()
101 if len(points) > 0:
102 pcd.points = o3d.utility.Vector3dVector(points.astype(np.float64))
103 pcd.colors = o3d.utility.Vector3dVector(
104 np.delete(colors / 255.0, 3, 1).astype(np.float64)
105 )
106 else:
107 points = pclData.getPoints()
108 if len(points) > 0:
109 pcd.points = o3d.utility.Vector3dVector(points)
110 pcd.colors = o3d.utility.Vector3dVector()
111
112 if len(pcd.points) > 0:
113 if first:
114 vis.add_geometry(pcd)
115 ctr = vis.get_view_control()
116 ctr.set_front([0, 0, -1])
117 ctr.set_up([0, -1, 0])
118 ctr.set_lookat([0, 0, 1])
119 ctr.set_zoom(0.3)
120 first = False
121 else:
122 vis.update_geometry(pcd)
123
124 # Show colorized depth in an OpenCV window
125 depthMsg = qDepth.tryGet()
126 if depthMsg is not None:
127 cv2.imshow("Depth", colorizeDepth(depthMsg.getCvFrame()))
128
129 if cv2.waitKey(1) == ord("q"):
130 break
131 if not vis.poll_events():
132 break
133 vis.update_renderer()
134
135 except KeyboardInterrupt:
136 print("\nStopping...")
137
138 vis.destroy_window()
139 cv2.destroyAllWindows()
140
141 print("Done.")
142
143
144if __name__ == "__main__":
145 main()Need assistance?
Head over to Discussion Forum for technical support or any other questions you might have.