ImageReplay which reads images from the disk and sends them to the DepthAI device for processing. One could extend this example to read video instead of images.

Demo
This example requires the DepthAI v3 API; see the installation instructions.

Source code
Python
C++
Python
PythonGitHub
#!/usr/bin/env python3

import time
from pathlib import Path

import cv2
import depthai as dai

# Locate the example assets relative to this script, independent of the
# current working directory.
script_dir = Path(__file__).resolve().parent
# One level above the script's directory, with the '..' component resolved away.
examplesRoot = script_dir.joinpath('../').resolve()
models = examplesRoot.joinpath('models')
# Image that ImageReplay loads and replays.
tagImage = models.joinpath('april_tags.jpg')

class ImageReplay(dai.node.ThreadedHostNode):
    """Host node that repeatedly sends a single grayscale image read from disk.

    The image at ``tagImage`` is loaded once in the constructor, converted to
    grayscale and wrapped in a ``dai.ImgFrame`` of type GRAY8. ``run`` then
    sends that same frame on ``self.output`` for as long as the node's main
    loop is active.
    """

    def __init__(self):
        """Create the output and prepare the frame to replay.

        Raises:
            FileNotFoundError: If the image at ``tagImage`` cannot be read.
        """
        dai.node.ThreadedHostNode.__init__(self)
        self.output = self.createOutput()

        imagePath = tagImage.resolve()
        frame = cv2.imread(str(imagePath))
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with a clear message rather than inside cvtColor.
        if frame is None:
            raise FileNotFoundError(f"Could not read image: {imagePath}")
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        imgFrame = dai.ImgFrame()
        imgFrame.setData(frame)
        # numpy image shape is (rows, cols): height first, then width
        imgFrame.setWidth(frame.shape[1])
        imgFrame.setHeight(frame.shape[0])
        imgFrame.setType(dai.ImgFrame.Type.GRAY8)
        self.imgFrame = imgFrame

    def run(self):
        """Send the prepared frame, pausing 0.03 s between sends."""
        while self.mainLoop():
            self.output.send(self.imgFrame)
            time.sleep(0.03)

with dai.Pipeline() as pipeline:
    # Build the graph: replayed image -> AprilTag detector (16h5 family)
    imageReplay = pipeline.create(ImageReplay)
    aprilTagNode = pipeline.create(dai.node.AprilTag)
    imageReplay.output.link(aprilTagNode.inputImage)
    aprilTagNode.initialConfig.setFamily(dai.AprilTagConfig.Family.TAG_16H5)

    # Host-side queues: the image the detector received, and its detections
    passthroughOutputQueue = aprilTagNode.passthroughInputImage.createOutputQueue()
    outQueue = aprilTagNode.out.createOutputQueue()

    color = (0, 255, 0)
    startTime = time.monotonic()
    counter = 0
    fps = 0.0

    def to_int(point):
        """Return the point's x/y coordinates as a tuple of ints."""
        return (int(point.x), int(point.y))

    pipeline.start()
    while pipeline.isRunning():
        aprilTagMessage = outQueue.get()
        assert(isinstance(aprilTagMessage, dai.AprilTags))
        aprilTags = aprilTagMessage.aprilTags

        # Recompute the displayed FPS once more than a second has elapsed
        counter += 1
        currentTime = time.monotonic()
        elapsed = currentTime - startTime
        if elapsed > 1:
            fps = counter / elapsed
            counter = 0
            startTime = currentTime

        passthroughImage: dai.ImgFrame = passthroughOutputQueue.get()
        frame = passthroughImage.getCvFrame()
        # Convert to BGR so the overlay can be drawn in colour
        frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        for tag in aprilTags:
            topLeft = to_int(tag.topLeft)
            topRight = to_int(tag.topRight)
            bottomRight = to_int(tag.bottomRight)
            bottomLeft = to_int(tag.bottomLeft)

            # Midpoint of the topLeft-bottomRight diagonal anchors the ID label
            center = (int((topLeft[0] + bottomRight[0]) / 2), int((topLeft[1] + bottomRight[1]) / 2))

            # Outline the tag by joining consecutive corners, closing the loop
            corners = [topLeft, topRight, bottomRight, bottomLeft]
            for start, end in zip(corners, corners[1:] + corners[:1]):
                cv2.line(frame, start, end, color, 2, cv2.LINE_AA, 0)

            idStr = f"ID: {tag.id}"
            cv2.putText(frame, idStr, center, cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)

        cv2.putText(frame, f"fps: {fps:.1f}", (200, 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)

        cv2.imshow("detections", frame)
        if cv2.waitKey(1) == ord("q"):
            break

# Need assistance?
Head over to the Discussion Forum for technical support or any other questions you might have.