Script MJPEG server¶
Note
This example can only run on OAK POE devices. You need bootloader on/above version 0.0.15. You can flash bootloader by running python3 examples/bootloader/flash_bootloader.py
.
This demo runs an HTTP server on the device itself. Server will serve you MJPEG stream when you connect to it.
Demo¶
When you run the demo, it will print something similar to
Serving at 192.168.1.193:8080
If you open this IP in the browser (eg. chrome), you will see this:

if you click on the here
href, you will get the MJPEG video stream. For static image, you can check
out Script HTTP server.
Setup¶
Please run the install script to download all required dependencies. Please note that this script must be ran from git context, so you have to download the depthai-python repository first and then run the script
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
For additional information, please follow installation guide
Source code¶
Also available on GitHub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | #!/usr/bin/env python3 import depthai as dai import time # Start defining a pipeline pipeline = dai.Pipeline() # Define a source - color camera cam = pipeline.create(dai.node.ColorCamera) # VideoEncoder jpeg = pipeline.create(dai.node.VideoEncoder) jpeg.setDefaultProfilePreset(cam.getFps(), dai.VideoEncoderProperties.Profile.MJPEG) # Script node script = pipeline.create(dai.node.Script) script.setProcessor(dai.ProcessorType.LEON_CSS) script.setScript(""" import time import socket import fcntl import struct from socketserver import ThreadingMixIn from http.server import BaseHTTPRequestHandler, HTTPServer PORT = 8080 def get_ip_address(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), -1071617759, # SIOCGIFADDR struct.pack('256s', ifname[:15].encode()) )[20:24]) class ThreadingSimpleServer(ThreadingMixIn, HTTPServer): pass class HTTPHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.end_headers() self.wfile.write(b'<h1>[DepthAI] Hello, world!</h1><p>Click <a href="img">here</a> for an image</p>') elif self.path == '/img': try: self.send_response(200) self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=--jpgboundary') self.end_headers() fpsCounter = 0 timeCounter = time.time() while True: jpegImage = node.io['jpeg'].get() self.wfile.write("--jpgboundary".encode()) self.wfile.write(bytes([13, 10])) self.send_header('Content-type', 'image/jpeg') self.send_header('Content-length', str(len(jpegImage.getData()))) self.end_headers() self.wfile.write(jpegImage.getData()) self.end_headers() fpsCounter = fpsCounter + 1 if time.time() - timeCounter > 1: node.warn(f'FPS: {fpsCounter}') fpsCounter = 0 timeCounter = time.time() except Exception as ex: node.warn(str(ex)) with ThreadingSimpleServer(("", PORT), HTTPHandler) as httpd: node.warn(f"Serving at {get_ip_address('re0')}:{PORT}") httpd.serve_forever() """) # Connections cam.video.link(jpeg.input) jpeg.bitstream.link(script.inputs['jpeg']) # Connect to device with pipeline with dai.Device(pipeline) as device: while not device.isClosed(): time.sleep(1) |
Also available on GitHub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | #include <chrono> #include <iostream> #include <thread> // Includes common necessary includes for development using depthai library #include "depthai/depthai.hpp" int main() { using namespace std; // Start defining a pipeline dai::Pipeline pipeline; auto cam = pipeline.create<dai::node::ColorCamera>(); auto jpeg = pipeline.create<dai::node::VideoEncoder>(); jpeg->setDefaultProfilePreset(cam->getFps(), dai::VideoEncoderProperties::Profile::MJPEG); cam->video.link(jpeg->input); // Script node auto script = pipeline.create<dai::node::Script>(); script->setProcessor(dai::ProcessorType::LEON_CSS); jpeg->bitstream.link(script->inputs["jpeg"]); script->setScript(R"( import time import socket import fcntl import struct from socketserver import ThreadingMixIn from http.server import BaseHTTPRequestHandler, HTTPServer PORT = 8080 def get_ip_address(ifname): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return socket.inet_ntoa(fcntl.ioctl( s.fileno(), -1071617759, # SIOCGIFADDR struct.pack('256s', ifname[:15].encode()) )[20:24]) class ThreadingSimpleServer(ThreadingMixIn, HTTPServer): pass class HTTPHandler(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(200) self.end_headers() self.wfile.write(b'<h1>[DepthAI] Hello, world!</h1><p>Click <a href="img">here</a> for an image</p>') elif self.path == '/img': try: self.send_response(200) self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=--jpgboundary') self.end_headers() fpsCounter = 0 timeCounter = time.time() while True: jpegImage = node.io['jpeg'].get() self.wfile.write("--jpgboundary".encode()) self.wfile.write(bytes([13, 10])) self.send_header('Content-type', 'image/jpeg') self.send_header('Content-length', str(len(jpegImage.getData()))) self.end_headers() self.wfile.write(jpegImage.getData()) self.end_headers() fpsCounter = fpsCounter + 1 if time.time() - timeCounter > 1: node.warn(f'FPS: {fpsCounter}') fpsCounter = 0 timeCounter = time.time() except Exception as ex: node.warn(str(ex)) with ThreadingSimpleServer(("", PORT), HTTPHandler) as httpd: node.warn(f"Serving at {get_ip_address('re0')}:{PORT}") httpd.serve_forever() )"); // Connect to device with pipeline dai::Device device(pipeline); while(!device.isClosed()) { this_thread::sleep_for(chrono::milliseconds(1000)); } return 0; } |