Software Stack
DepthAI

ON THIS PAGE

  • AprilTags on image replay
  • Demo
  • Setup
  • Pipeline
  • Source code

AprilTags on image replay

Uses the AprilTag node to detect AprilTag markers in an image stored on the host computer. This example uses a custom host node called ImageReplay, which reads an image from disk and repeatedly sends it to the DepthAI device for processing. One could extend this example to read video instead of images.

Demo

Setup

This example requires the DepthAI v3 API; see the installation instructions.

Pipeline

Source code

Python
C++

Python

Python
GitHub
1#!/usr/bin/env python3
2
3import cv2
4import depthai as dai
5import time
6from pathlib import Path
7
# Locate the example image relative to this script rather than the current
# working directory, so the example runs no matter where it is launched from.
script_dir = Path(__file__).resolve().parent
# script_dir is already fully resolved, so its parent is the examples root.
examplesRoot = script_dir.parent
models = examplesRoot.joinpath('models')
tagImage = models.joinpath('april_tags.jpg')
13
class ImageReplay(dai.node.ThreadedHostNode):
    """Host node that replays a single image from disk as a stream of frames.

    The image at ``tagImage`` is loaded once in the constructor, converted to
    single-channel grayscale and wrapped in a ``dai.ImgFrame``. ``run`` then
    sends that same frame on ``self.output`` in a loop.
    """

    def __init__(self):
        """Create the output and prepare the frame that will be replayed.

        Raises:
            FileNotFoundError: if the image file does not exist.
            ValueError: if the file exists but OpenCV cannot decode it.
        """
        dai.node.ThreadedHostNode.__init__(self)
        self.output = self.createOutput()

        imagePath = tagImage.resolve()
        if not imagePath.is_file():
            raise FileNotFoundError(f"Image not found: {imagePath}")
        frame = cv2.imread(str(imagePath))
        if frame is None:
            # cv2.imread reports failure by returning None instead of raising;
            # fail here with a clear message rather than later in cvtColor.
            raise ValueError(f"Could not decode image: {imagePath}")
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        imgFrame = dai.ImgFrame()
        imgFrame.setData(frame)
        # frame.shape is (rows, cols) for a grayscale image
        imgFrame.setWidth(frame.shape[1])
        imgFrame.setHeight(frame.shape[0])
        imgFrame.setType(dai.ImgFrame.Type.GRAY8)
        self.imgFrame = imgFrame

    def run(self):
        """Send the prepared frame repeatedly while the node is running."""
        while self.isRunning():
            self.output.send(self.imgFrame)
            # Pause 30 ms between sends to pace the replay
            time.sleep(0.03)
30
# Build a pipeline that feeds the replayed image into the AprilTag node, then
# draw the detected tags and the measured FPS on each passthrough frame.
with dai.Pipeline() as pipeline:
    imageReplay = pipeline.create(ImageReplay)
    aprilTagNode = pipeline.create(dai.node.AprilTag)
    imageReplay.output.link(aprilTagNode.inputImage)
    aprilTagNode.initialConfig.setFamily(dai.AprilTagConfig.Family.TAG_16H5)

    passthroughOutputQueue = aprilTagNode.passthroughInputImage.createOutputQueue()
    outQueue = aprilTagNode.out.createOutputQueue()

    color = (0, 255, 0)
    startTime = time.monotonic()
    counter = 0
    fps = 0.0

    def to_int(point):
        """Return the point's x/y coordinates as an integer pixel tuple."""
        return (int(point.x), int(point.y))

    pipeline.start()
    while pipeline.isRunning():
        aprilTagMessage = outQueue.get()
        assert isinstance(aprilTagMessage, dai.AprilTags)
        aprilTags = aprilTagMessage.aprilTags

        # Recompute the FPS estimate once more than a second has elapsed
        counter += 1
        currentTime = time.monotonic()
        elapsed = currentTime - startTime
        if elapsed > 1:
            fps = counter / elapsed
            counter = 0
            startTime = currentTime

        passthroughImage: dai.ImgFrame = passthroughOutputQueue.get()
        frame = passthroughImage.getCvFrame()
        # Convert to BGR so the overlay can be drawn in color
        frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

        for tag in aprilTags:
            topLeft = to_int(tag.topLeft)
            topRight = to_int(tag.topRight)
            bottomRight = to_int(tag.bottomRight)
            bottomLeft = to_int(tag.bottomLeft)

            # Midpoint of the topLeft-bottomRight diagonal, used as the label anchor
            center = (int((topLeft[0] + bottomRight[0]) / 2), int((topLeft[1] + bottomRight[1]) / 2))

            # Outline the tag by connecting its four corners
            cv2.line(frame, topLeft, topRight, color, 2, cv2.LINE_AA, 0)
            cv2.line(frame, topRight, bottomRight, color, 2, cv2.LINE_AA, 0)
            cv2.line(frame, bottomRight, bottomLeft, color, 2, cv2.LINE_AA, 0)
            cv2.line(frame, bottomLeft, topLeft, color, 2, cv2.LINE_AA, 0)

            idStr = "ID: " + str(tag.id)
            cv2.putText(frame, idStr, center, cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)

        # Draw the FPS once per frame, outside the tag loop, so it is shown
        # even when no tag is detected and is not redrawn for every tag.
        cv2.putText(frame, f"fps: {fps:.1f}", (200, 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)

        cv2.imshow("detections", frame)
        if cv2.waitKey(1) == ord("q"):
            break

Need assistance?

Head over to the Discussion Forum for technical support or any other questions you might have.