Script EMMC access¶
Note
This example requires a device with onboard EMMC memory (e.g. OAK-1-POE).
To check whether your device has EMMC memory, run the Bootloader Version example script and check whether its output contains Memory.EMMC.
This example shows how to use the Script node to access the EMMC memory of the device. The default location for EMMC memory is /media/mmcsd-0-0/. The first script in the pipeline writes images to EMMC memory.
The second script starts a web server in the /media/mmcsd-0-0/ directory and serves the images from EMMC memory.
Setup¶
Please run the install script to download all required dependencies. Please note that this script must be run from within the git repository, so you have to clone the depthai-python repository first and then run the script:
git clone https://github.com/luxonis/depthai-python.git
cd depthai-python/examples
python3 install_requirements.py
For additional information, please follow the installation guide.
Prerequisites¶
We first need to enable the EMMC memory as storage on the device. To do so, we need to flash the device with an application that has EMMC enabled.
Example application:
import depthai as dai

# Create the pipeline that will be flashed to the device as the application.
pipeline = dai.Pipeline()

# Set board config: enable the onboard EMMC so it can be used as storage.
board = dai.BoardConfig()
board.emmc = True
pipeline.setBoardConfig(board)

# Look up a device to flash. The first tuple element reports whether one was
# found; without this check a missing device would only fail later, inside
# the DeviceBootloader constructor.
found, device_info = dai.DeviceBootloader.getFirstAvailableDevice()
if not found:
    raise SystemExit("No device found to flash")

bootloader = dai.DeviceBootloader(device_info)


def progress(p):
    """Print the flashing progress value `p`, scaled by 100, as a percentage."""
    print(f'Flashing progress: {p*100:.1f}%')


# Flash the pipeline as an application into EMMC memory. `flash` returns a
# (success, error message) pair, so report the message when it fails instead
# of finishing silently.
success, error_message = bootloader.flash(progress, pipeline, memory=dai.DeviceBootloader.Memory.EMMC)
if success:
    print("Flash OK")
else:
    print(f"Flash failed: {error_message}")
The above code will flash the device with the application that enables the script node to access EMMC memory. Now we should be able to access EMMC memory even when the device is in standard mode (connected to the host PC).
Source code¶
Also available on GitHub
import depthai as dai
import cv2

# Start defining a pipeline
pipeline = dai.Pipeline()

# Enable the onboard EMMC so the Script nodes below can use it as storage
board = dai.BoardConfig()
board.emmc = True
pipeline.setBoardConfig(board)

# Define source and output
camRgb = pipeline.create(dai.node.ColorCamera)
jpegEncoder = pipeline.create(dai.node.VideoEncoder)

# Properties
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_4_K)
jpegEncoder.setDefaultProfilePreset(1, dai.VideoEncoderProperties.Profile.MJPEG)

# Set a write script: runs on the device and stores one encoded frame on the
# EMMC every 3 seconds, under the first unused '<index>.jpg' name.
script_write = pipeline.createScript()
script_write.setProcessor(dai.ProcessorType.LEON_CSS)
script_write.setScript("""
    import os
    index = 1000
    import time
    while True:
        # Find an unused file name first
        while True:
            path = '/media/mmcsd-0-0/' + str(index) + '.jpg'
            if not os.path.exists(path):
                break
            index += 1
        frame = node.io['jpeg'].get()
        node.warn(f'Saving to EMMC: {path}')
        with open(path, 'wb') as f:
            f.write(frame.getData())
        index += 1
        time.sleep(3)
""")

# Set a read script: runs on the device and serves the EMMC directory over
# HTTP on port 80, logging the address of the 're0' interface.
script_read = pipeline.createScript()
script_read.setProcessor(dai.ProcessorType.LEON_CSS)
script_read.setScript("""
    import http.server
    import socketserver
    import socket
    import fcntl
    import struct
    import os

    def get_ip_address(ifname):
        # The socket is only needed for the ioctl query, so close it afterwards
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            return socket.inet_ntoa(fcntl.ioctl(
                s.fileno(),
                -1071617759,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15].encode())
            )[20:24])

    # Note: `chdir` here will prevent unmount, this should be improved!
    os.chdir('/media/mmcsd-0-0')

    PORT = 80
    Handler = http.server.SimpleHTTPRequestHandler

    with socketserver.TCPServer(("", PORT), Handler) as httpd:
        ip = get_ip_address('re0')
        node.warn(f'===== HTTP file server accessible at: http://{ip}')
        httpd.serve_forever()
""")

# Linking
camRgb.video.link(jpegEncoder.input)
jpegEncoder.bitstream.link(script_write.inputs['jpeg'])
# Non-blocking input, since the write script only reads a frame every 3 seconds
script_write.inputs['jpeg'].setBlocking(False)

xout = pipeline.create(dai.node.XLinkOut)
xout.setStreamName("rgb")
# NOTE(review): the read script above never sends on its 'jpeg' output, so
# this stream appears to stay empty -- confirm whether that is intended.
script_read.outputs['jpeg'].link(xout.input)

# Pipeline defined, now the device is connected to
with dai.Device(pipeline) as device:
    # Output queue will be used to get the rgb frames from the output defined above
    qRgb = device.getOutputQueue(name="rgb", maxSize=100, blocking=False)

    while True:
        # Non-blocking poll; returns None when no frame is waiting
        inRgb = qRgb.tryGet()

        if inRgb is not None:
            cv2.imshow("rgb", inRgb.getCvFrame())

        if cv2.waitKey(1) == ord('q'):
            break