Frame syncing on OAK

This example showcases how you can use the Script node to perform message syncing of multiple streams. The example uses ImgFrame timestamps to achieve syncing precision.

Similar syncing demo scripts (python) can be found at our depthai-experiments repository in gen2-syncing folder.

Demo

Terminal log after about 13 minutes. Color and disparity streams are perfectly in-sync.


[1662574807.8811488] Stream rgb, timestamp: 7:26:21.601595, sequence number: 21852 [1662574807.8821492] Stream disp, timestamp: 7:26:21.601401, sequence number: 21852

[1662574807.913144] Stream rgb, timestamp: 7:26:21.634982, sequence number: 21853 [1662574807.9141443] Stream disp, timestamp: 7:26:21.634730, sequence number: 21853

[1662574807.9451444] Stream rgb, timestamp: 7:26:21.668243, sequence number: 21854 [1662574807.946151] Stream disp, timestamp: 7:26:21.668057, sequence number: 21854

Setup

Please run the install script to download all required dependencies. Please note that this script must be run from within a git repository, so you have to clone the depthai-python repository first and then run the script:

git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py

For additional information, please follow the installation guide.

This example script requires external file(s) to run. If you are using:

  • depthai-python, run python3 examples/install_requirements.py to download required file(s)

  • depthai-core, required file(s) will get downloaded automatically when building the example

Source code

Also available on GitHub

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import depthai as dai
import time

FPS = 30

pipeline = dai.Pipeline()

# Define a source - color camera
camRgb = pipeline.create(dai.node.ColorCamera)
# Since we are saving RGB frames in Script node we need to make the
# video pool size larger, otherwise the pipeline will freeze because
# the ColorCamera won't be able to produce new video frames.
camRgb.setVideoNumFramesPool(10)
camRgb.setFps(FPS)

left = pipeline.create(dai.node.MonoCamera)
left.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
left.setCamera("left")
left.setFps(FPS)

right = pipeline.create(dai.node.MonoCamera)
right.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
right.setCamera("right")
right.setFps(FPS)

# Use the same pipeline.create(...) API as the other nodes for consistency
stereo = pipeline.create(dai.node.StereoDepth)
stereo.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7)
stereo.setLeftRightCheck(True)
stereo.setExtendedDisparity(False)
stereo.setSubpixel(False)
# NOTE(review): the C++ version of this example also calls
# stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A) to align disparity to the
# RGB camera - confirm whether that is desired here as well.
left.out.link(stereo.left)
right.out.link(stereo.right)

# Script node will sync high-res frames
script = pipeline.create(dai.node.Script)

# Send both streams to the Script node so we can sync them
stereo.disparity.link(script.inputs["disp_in"])
camRgb.video.link(script.inputs["rgb_in"])

script.setScript("""
    FPS=30
    import time
    from datetime import timedelta
    import math

    # Timestamp threshold (in milliseconds) under which frames will be considered synced.
    # Lower number means frames will have less delay between them, which can potentially
    # lead to dropped frames.
    MS_THRESHOLD = math.ceil(500 / FPS)

    def check_sync(queues, timestamp):
        # Returns True when every stream has a frame whose timestamp is within
        # MS_THRESHOLD of `timestamp`. On success, trims each queue so the
        # synced frame sits at position 0.
        matching_frames = []
        for name, msgs in queues.items(): # Go through each available stream
            # node.warn(f"List {name}, len {str(len(msgs))}")
            for i, msg in enumerate(msgs): # Go through each frame of this stream
                time_diff = abs(msg.getTimestamp() - timestamp)
                if time_diff <= timedelta(milliseconds=MS_THRESHOLD): # If time diff is below threshold, this frame is considered in-sync
                    matching_frames.append(i) # Append the position of the synced frame, so we can later remove all older frames
                    break

        if len(matching_frames) == len(queues):
            # We have all frames synced. Remove the excess (older) ones
            for i, name in enumerate(queues):
                queues[name] = queues[name][matching_frames[i]:]
            return True
        else:
            return False # We don't have synced frames yet

    names = ['disp', 'rgb']
    frames = dict() # Dict where we store all received frames
    for name in names:
        frames[name] = []

    while True:
        for name in names:
            f = node.io[name+"_in"].tryGet()
            if f is not None:
                frames[name].append(f) # Save received frame

                if check_sync(frames, f.getTimestamp()): # Check if we have any synced frames
                    # Frames synced!
                    node.info(f"Synced frame!")
                    # node.warn(f"Queue size. Disp: {len(frames['disp'])}, rgb: {len(frames['rgb'])}")
                    for stream_name, msgs in frames.items():
                        syncedF = msgs.pop(0) # Older (excess) frames were removed, so position 0 of each queue holds the synced frame
                        node.info(f"{stream_name}, ts: {str(syncedF.getTimestamp())}, seq {str(syncedF.getSequenceNum())}")
                        node.io[stream_name+'_out'].send(syncedF) # Send synced frames to the host


        time.sleep(0.001)  # Avoid lazy looping
""")

script_out = ['disp', 'rgb']

for name in script_out: # Create XLinkOut for disp/rgb streams
    xout = pipeline.create(dai.node.XLinkOut)
    xout.setStreamName(name)
    script.outputs[name+'_out'].link(xout.input)

with dai.Device(pipeline) as device:
    # Raise log levels so node.info(...) messages from the Script node are shown
    device.setLogLevel(dai.LogLevel.INFO)
    device.setLogOutputLevel(dai.LogLevel.INFO)
    names = ['rgb', 'disp']
    queues = [device.getOutputQueue(name) for name in names]

    while True:
        for q in queues:
            img: dai.ImgFrame = q.get()
            # Display timestamp/sequence number of two synced frames
            print(f"Time: {time.time()}. Stream {q.getName()}, timestamp: {img.getTimestamp()}, sequence number: {img.getSequenceNum()}")

Also available on GitHub

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include <chrono>
#include <iostream>

// Includes common necessary includes for development using depthai library
#include "depthai/depthai.hpp"

static constexpr auto FPS = 30;

// Frame-syncing example: the on-device Script node matches RGB and disparity
// frames by timestamp and sends only synced pairs to the host.
int main() {
    dai::Pipeline pipeline;

    // Define a source - color camera
    auto camRgb = pipeline.create<dai::node::ColorCamera>();
    // Since we are saving RGB frames in Script node we need to make the
    // video pool size larger, otherwise the pipeline will freeze because
    // the ColorCamera won't be able to produce new video frames.
    camRgb->setVideoNumFramesPool(10);
    camRgb->setFps(FPS);

    auto left = pipeline.create<dai::node::MonoCamera>();
    left->setResolution(dai::MonoCameraProperties::SensorResolution::THE_400_P);
    left->setCamera("left");
    left->setFps(FPS);

    auto right = pipeline.create<dai::node::MonoCamera>();
    right->setResolution(dai::MonoCameraProperties::SensorResolution::THE_400_P);
    right->setCamera("right");
    right->setFps(FPS);

    auto stereo = pipeline.create<dai::node::StereoDepth>();
    stereo->initialConfig.setMedianFilter(dai::MedianFilter::KERNEL_7x7);
    stereo->setLeftRightCheck(true);
    stereo->setExtendedDisparity(false);
    stereo->setSubpixel(false);
    // Align disparity output to the RGB camera so synced frames overlap spatially
    stereo->setDepthAlign(dai::CameraBoardSocket::CAM_A);
    left->out.link(stereo->left);
    right->out.link(stereo->right);

    // Script node will sync high-res frames
    auto script = pipeline.create<dai::node::Script>();

    // Send both streams to the Script node so we can sync them
    stereo->disparity.link(script->inputs["disp_in"]);
    camRgb->video.link(script->inputs["rgb_in"]);

    script->setScript(R"(
        FPS=30
        import time
        from datetime import timedelta
        import math

        # Timestamp threshold (in milliseconds) under which frames will be considered synced.
        # Lower number means frames will have less delay between them, which can potentially
        # lead to dropped frames.
        MS_THRESHOLD = math.ceil(500 / FPS)

        def check_sync(queues, timestamp):
            # Returns True when every stream has a frame whose timestamp is within
            # MS_THRESHOLD of `timestamp`. On success, trims each queue so the
            # synced frame sits at position 0.
            matching_frames = []
            for name, msgs in queues.items(): # Go through each available stream
                # node.warn(f"List {name}, len {str(len(msgs))}")
                for i, msg in enumerate(msgs): # Go through each frame of this stream
                    time_diff = abs(msg.getTimestamp() - timestamp)
                    if time_diff <= timedelta(milliseconds=MS_THRESHOLD): # If time diff is below threshold, this frame is considered in-sync
                        matching_frames.append(i) # Append the position of the synced frame, so we can later remove all older frames
                        break

            if len(matching_frames) == len(queues):
                # We have all frames synced. Remove the excess (older) ones
                for i, name in enumerate(queues):
                    queues[name] = queues[name][matching_frames[i]:]
                return True
            else:
                return False # We don't have synced frames yet

        names = ['disp', 'rgb']
        frames = dict() # Dict where we store all received frames
        for name in names:
            frames[name] = []

        while True:
            for name in names:
                f = node.io[name+"_in"].tryGet()
                if f is not None:
                    frames[name].append(f) # Save received frame

                    if check_sync(frames, f.getTimestamp()): # Check if we have any synced frames
                        # Frames synced!
                        node.info(f"Synced frame!")
                        for stream_name, msgs in frames.items():
                            syncedF = msgs.pop(0) # Older (excess) frames were removed, so position 0 of each queue holds the synced frame
                            node.info(f"{stream_name}, ts: {str(syncedF.getTimestamp())}, seq {str(syncedF.getSequenceNum())}")
                            node.io[stream_name+'_out'].send(syncedF) # Send synced frames to the host


                time.sleep(0.001)  # Avoid lazy looping
        )");

    std::vector<std::string> scriptOut{"disp", "rgb"};
    // Create XLinkOut for disp/rgb streams
    for(auto& name : scriptOut) {
        auto xout = pipeline.create<dai::node::XLinkOut>();
        xout->setStreamName(name);
        script->outputs[name + "_out"].link(xout->input);
    }

    dai::Device device(pipeline);

    // Raise log levels so node.info(...) messages from the Script node are shown
    device.setLogOutputLevel(dai::LogLevel::INFO);
    device.setLogLevel(dai::LogLevel::INFO);
    std::vector<std::string> names{"rgb", "disp"};
    std::map<std::string, std::shared_ptr<dai::DataOutputQueue>> streams;
    for(auto& name : names) {
        streams[name] = device.getOutputQueue(name);
    }
    while(true) {
        // Structured bindings (C++17) instead of manual .first/.second access
        for(auto& [name, queue] : streams) {
            auto img = queue->get<dai::ImgFrame>();
            // Display timestamp/sequence number of two synced frames
            std::cout << "Stream " << name << ", timestamp: " << img->getTimestamp().time_since_epoch().count()
                      << ", sequence number: " << img->getSequenceNum() << std::endl;
        }
    }
}

Got questions?

Head over to Discussion Forum for technical support or any other questions you might have.