Demuxing Synchronized Script Outputs

This example demonstrates the use of the DepthAI Sync node in conjunction with the Demux node to synchronize and then demux outputs from two separate script nodes. Each script node generates data buffers at different intervals, which are first synchronized by the Sync node and then demultiplexed by the MessageDemux node.

Similar samples:

Demo

~/depthai-python/examples/Sync $ python3 demux_message_group.py
Start
Buffer 1 timestamp: 0:00:03.581073
Buffer 2 timestamp: 0:00:03.591084
----------
Buffer 1 timestamp: 0:00:04.583100
Buffer 2 timestamp: 0:00:04.497079
----------
Buffer 1 timestamp: 0:00:06.587174
Buffer 2 timestamp: 0:00:06.611154
----------
Buffer 1 timestamp: 0:00:07.589147
Buffer 2 timestamp: 0:00:07.517125
----------
Buffer 1 timestamp: 0:00:09.593076
Buffer 2 timestamp: 0:00:09.631089
----------
Buffer 1 timestamp: 0:00:10.595106
Buffer 2 timestamp: 0:00:10.537082

Setup

Please run the install script to download all required dependencies. Please note that this script must be ran from git context, so you have to download the depthai-python repository first and then run the script

git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py

For additional information, please follow installation guide

Source code

Also available on GitHub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import depthai as dai
import time
from datetime import timedelta

pipeline = dai.Pipeline()

script1 = pipeline.create(dai.node.Script)
script1.setScript("""
from time import sleep

while True:
    sleep(1)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(0, 128)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
""")

script2 = pipeline.create(dai.node.Script)
script2.setScript("""
from time import sleep

while True:
    sleep(0.3)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(128, 256)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
""")

sync = pipeline.create(dai.node.Sync)
sync.setSyncThreshold(timedelta(milliseconds=100))

demux = pipeline.create(dai.node.MessageDemux)

xout1 = pipeline.create(dai.node.XLinkOut)
xout1.setStreamName("xout1")
xout2 = pipeline.create(dai.node.XLinkOut)
xout2.setStreamName("xout2")

script1.outputs["out"].link(sync.inputs["s1"])
script2.outputs["out"].link(sync.inputs["s2"])
sync.out.link(demux.input)
demux.outputs["s1"].link(xout1.input)
demux.outputs["s2"].link(xout2.input)

with dai.Device(pipeline) as device:
    print("Start")
    q1 = device.getOutputQueue("xout1", maxSize=10, blocking=True)
    q2 = device.getOutputQueue("xout2", maxSize=10, blocking=True)
    while True:
        bufS1 = q1.get()
        bufS2 = q2.get()
        print(f"Buffer 1 timestamp: {bufS1.getTimestamp()}")
        print(f"Buffer 2 timestamp: {bufS2.getTimestamp()}")
        print("----------")
        time.sleep(0.2)

Also available on GitHub

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <chrono>
#include <iostream>

#include "depthai/depthai.hpp"

int main() {
    dai::Pipeline pipeline;

    auto script1 = pipeline.create<dai::node::Script>();
    script1->setScript(
        R"SCRPT(
from time import sleep

while True:
    sleep(1)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(0, 128)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
)SCRPT");

    auto script2 = pipeline.create<dai::node::Script>();
    script2->setScript(
        R"SCRPT(
from time import sleep

while True:
    sleep(0.3)
    b = Buffer(512)
    b.setData(bytes(4 * [i for i in range(128, 256)]))
    b.setTimestamp(Clock.now())
    node.io['out'].send(b)
)SCRPT");

    auto sync = pipeline.create<dai::node::Sync>();
    sync->setSyncThreshold(std::chrono::milliseconds(100));

    auto demux = pipeline.create<dai::node::MessageDemux>();

    auto xout1 = pipeline.create<dai::node::XLinkOut>();
    xout1->setStreamName("xout1");
    auto xout2 = pipeline.create<dai::node::XLinkOut>();
    xout2->setStreamName("xout2");

    script1->outputs["out"].link(sync->inputs["s1"]);
    script2->outputs["out"].link(sync->inputs["s2"]);
    sync->out.link(demux->input);
    demux->outputs["s1"].link(xout1->input);
    demux->outputs["s2"].link(xout2->input);

    dai::Device device(pipeline);
    std::cout << "Start" << std::endl;
    auto queue1 = device.getOutputQueue("xout1", 10, true);
    auto queue2 = device.getOutputQueue("xout2", 10, true);
    while(true) {
        auto bufS1 = queue1->get<dai::Buffer>();
        auto bufS2 = queue2->get<dai::Buffer>();
        std::cout << "Buffer 1 timestamp: " << bufS1->getTimestamp().time_since_epoch().count() << std::endl;
        std::cout << "Buffer 2 timestamp: " << bufS2->getTimestamp().time_since_epoch().count() << std::endl;
        std::cout << "----------" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

How it Works

  1. Initialize a DepthAI pipeline.

  2. Create two Script nodes, with each script generating and sending data buffers at different intervals.

  3. Set up a Sync node with a synchronization threshold.

  4. Integrate a MessageDemux node to separate the synchronized data streams.

  5. Link the outputs of the Script nodes to the Sync node, and then from the Sync node to the MessageDemux node.

  6. Start the pipeline and continuously receive demultiplexed data from the MessageDemux node.

  7. Print the timestamps of the demultiplexed data for comparison.

Got questions?

Head over to Discussion Forum for technical support or any other questions you might have.