Mono preview - Alternate between dot projector and illumination LED

This example alternates between the IR illumination LED and the IR dot projector. By default, the example script runs both the left and right monochrome camera sensors at 30 FPS and switches between the IR LED and the dot projector on every frame, so you get LED-illuminated frames at 15 FPS and dot-projector-illuminated frames at 15 FPS.

LED-illuminated frames can be used for AI vision tasks and CV algorithms (e.g. the Feature Tracker) in low-light environments, while dot-projector-illuminated frames are used for active stereo depth.
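For reference, the same IR driver controls can also be set statically from the host, without any per-frame alternation. Below is a minimal sketch, assuming an OAK Pro series device (with both emitters) and a recent depthai release where the normalized intensity setters used by the on-device script in this example are also exposed on dai.Device:

#!/usr/bin/env python3
# Minimal sketch: static (non-alternating) IR control from the host.
# Assumes an OAK Pro series device and a depthai release exposing the
# normalized intensity setters (the same calls the on-device script uses).
import depthai as dai

pipeline = dai.Pipeline()  # build your camera/output nodes here as usual

with dai.Device(pipeline) as device:
    # Intensity is normalized to the 0..1 range; 0 turns the emitter off.
    device.setIrLaserDotProjectorIntensity(0.8)  # dots, for active stereo depth
    device.setIrFloodLightIntensity(0.1)         # flood LED, for low-light CV
    # ... run your usual frame loop here ...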

Demo

In the video, we disabled both the projector and the LED for about a second to demonstrate how the scene looks in near-complete darkness.

Setup

Please run the install script to download all required dependencies. Note that this script must be run from the git context, so you have to clone the depthai-python repository first and then run the script:

git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py

For additional information, please follow the installation guide.

Source code

Also available on GitHub

#!/usr/bin/env python3

import cv2
import depthai as dai

if 1:  # PoE config
    fps = 30
    res = dai.MonoCameraProperties.SensorResolution.THE_400_P
    poolSize = 24  # default 3, increased to prevent desync
else:  # USB
    fps = 30
    res = dai.MonoCameraProperties.SensorResolution.THE_720_P
    poolSize = 8  # default 3, increased to prevent desync

# Create pipeline
pipeline = dai.Pipeline()

# Define sources and outputs
monoL = pipeline.create(dai.node.MonoCamera)
monoR = pipeline.create(dai.node.MonoCamera)

monoL.setCamera("left")
monoL.setResolution(res)
monoL.setFps(fps)
monoL.setNumFramesPool(poolSize)
monoR.setCamera("right")
monoR.setResolution(res)
monoR.setFps(fps)
monoR.setNumFramesPool(poolSize)

xoutDotL = pipeline.create(dai.node.XLinkOut)
xoutDotR = pipeline.create(dai.node.XLinkOut)
xoutFloodL = pipeline.create(dai.node.XLinkOut)
xoutFloodR = pipeline.create(dai.node.XLinkOut)

xoutDotL.setStreamName('dot-left')
xoutDotR.setStreamName('dot-right')
xoutFloodL.setStreamName('flood-left')
xoutFloodR.setStreamName('flood-right')
streams = ['dot-left', 'dot-right', 'flood-left', 'flood-right']

# Script node for frame routing and IR dot/flood alternate
script = pipeline.create(dai.node.Script)
script.setProcessor(dai.ProcessorType.LEON_CSS)
script.setScript("""
    dotBright = 0.8
    floodBright = 0.1
    LOGGING = False  # Set `True` for latency/timings debugging

    node.warn(f'IR drivers detected: {str(Device.getIrDrivers())}')

    flagDot = False
    while True:
        # Wait first for a frame event, received at MIPI start-of-frame
        event = node.io['event'].get()
        if LOGGING: tEvent = Clock.now()

        # Immediately reconfigure the IR driver.
        # Note the logic is inverted, as it applies for next frame
        Device.setIrLaserDotProjectorIntensity(0 if flagDot else dotBright)
        Device.setIrFloodLightIntensity(floodBright if flagDot else 0)
        if LOGGING: tIrSet = Clock.now()

        # Wait for the actual frames (after MIPI capture and ISP proc is done)
        frameL = node.io['frameL'].get()
        if LOGGING: tLeft = Clock.now()
        frameR = node.io['frameR'].get()
        if LOGGING: tRight = Clock.now()

        if LOGGING:
            latIR      = (tIrSet - tEvent               ).total_seconds() * 1000
            latEv      = (tEvent - event.getTimestamp() ).total_seconds() * 1000
            latProcL   = (tLeft  - event.getTimestamp() ).total_seconds() * 1000
            diffRecvRL = (tRight - tLeft                ).total_seconds() * 1000
            node.warn(f'T[ms] latEv:{latEv:5.3f} latIR:{latIR:5.3f} latProcL:{latProcL:6.3f} '
                    + f' diffRecvRL:{diffRecvRL:5.3f}')

        # Sync checks
        diffSeq = frameL.getSequenceNum() - event.getSequenceNum()
        diffTsEv = (frameL.getTimestamp() - event.getTimestamp()).total_seconds() * 1000
        diffTsRL = (frameR.getTimestamp() - frameL.getTimestamp()).total_seconds() * 1000
        if diffSeq or diffTsEv or (abs(diffTsRL) > 0.8):
            node.error(f'frame/event desync! Fr-Ev: {diffSeq} frames,'
                    + f' {diffTsEv:.3f} ms; R-L: {diffTsRL:.3f} ms')

        # Route the frames to their respective outputs
        node.io['dotL' if flagDot else 'floodL'].send(frameL)
        node.io['dotR' if flagDot else 'floodR'].send(frameR)

        flagDot = not flagDot
""")

# Linking
monoL.frameEvent.link(script.inputs['event'])
monoL.out.link(script.inputs['frameL'])
monoR.out.link(script.inputs['frameR'])

script.outputs['dotL'].link(xoutDotL.input)
script.outputs['dotR'].link(xoutDotR.input)
script.outputs['floodL'].link(xoutFloodL.input)
script.outputs['floodR'].link(xoutFloodR.input)

# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    queues = [device.getOutputQueue(name=s, maxSize=4, blocking=False) for s in streams]

    while True:
        for q in queues:
            pkt = q.tryGet()
            if pkt is not None:
                name = q.getName()
                frame = pkt.getCvFrame()
                cv2.imshow(name, frame)

        if cv2.waitKey(5) == ord('q'):
            break
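Since each illumination mode only receives every other sensor frame, each of the four output streams should arrive at roughly half the configured sensor FPS (about 15 FPS at the default settings). The snippet below is a hypothetical helper for verifying this on the host; the FpsCounter class and the fpsCounters dict are our own names, not part of the example:

# Hypothetical helper to verify per-stream frame rates on the host.
# With the sensors at 30 FPS and per-frame alternation, each of the four
# streams ('dot-left', 'dot-right', 'flood-left', 'flood-right') should
# report roughly 15 FPS.
import time

class FpsCounter:
    def __init__(self):
        self.count = 0
        self.start = time.monotonic()

    def tick(self):
        self.count += 1

    def fps(self):
        elapsed = time.monotonic() - self.start
        return self.count / elapsed if elapsed > 0 else 0.0

# Usage with the display loop above:
#     fpsCounters = {}  # create once, before the `while True:` loop
#     ...then, after each successful `q.tryGet()`:
#     fpsCounters.setdefault(name, FpsCounter()).tick()
#     print(f'{name}: {fpsCounters[name].fps():.1f} FPS')  # expect ~fps/2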

A C++ version of this example is not yet implemented.

Got questions?

Head over to the Discussion Forum for technical support or any other questions you might have.