diff --git a/m8-controller-box/README.md b/m8-controller-box/README.md new file mode 100644 index 000000000..95ed9a754 --- /dev/null +++ b/m8-controller-box/README.md @@ -0,0 +1,146 @@ +# M8 Controller Box (PR1 Prototype) + +The **M8 Controller Box (PR1)** is a versatile USB-connected I/O and communication expansion module designed to simplify integration with CAN devices, sensors, relays, and general-purpose GPIO. + +It connects directly to your host device via USB and provides a compact, robust interface for industrial and embedded applications. + +## Key Features + +### USB ↔ CAN Bus Interface + +- Integrated **USB to CAN transceiver** +- Connected internally over an on-board **USB 2.0 hub** +- Powered by **candleLight firmware** +- Compatible with **can-utils** on Linux +- Optional **120Ω termination resistor** (not enabled by default) + +**Linux setup example:** + +```bash +sudo ip link set can0 type can bitrate 500000 +sudo ip link set can0 up +cansend can0 123#1122334455667788 +``` + +Note: Linux is required for CAN functionality. + +### USB Audio (Speaker) + +- Integrated via USB hub using **PCM2912APJTR** +- No additional drivers required (Linux tested) + +### USB 2.0 Expansion + +- USB-A ports act as **USB 2.0 extensions** +- Connected via internal USB hub +- Up to **500mA current limit** + +### UART1 – RS232 + +Available via dedicated interface. + +Contact us for configuration and usage details. + +### FSYNC & Strobe + +Hardware support available. + +Contact us for integration support and documentation. + +### IO Interface (via rp2040_u2if) + +General-purpose I/O is handled through the **RP2040 USB-to-interface firmware**: + +Repository: +https://github.com/luxonis/rp2040_u2if + +**Setup:** + +1. Download the repository +2. Install dependencies +3. Flash firmware +4. Refer to pinout diagram for GPIO mapping + +## Buttons & LEDs + +**Buttons (Top → Bottom):** + +- GPIO19 +- GPIO20 +- GPIO21 + +**LEDs (Top → Bottom):** + +- GPIO17 +- GPIO16 +- GPIO18 + +## GPIO Overview + +- GPIOs: **0–13, 26, 27** +- GPIO64–71 available +- Configurable as: + - Outputs + - Inputs + +## Relays + +Relays are present in hardware but **not yet supported in firmware**. + +Contact us for early access support. + +## What’s in the Box + +- M8 Controller Box (PR1 Prototype) +- M8 Cable (Male–Female) +- Plug-in screw terminals: + - 3× 6-pin + - 1× 4-pin + - 2× 3-pin + +## OS & Prerequisites + +- Linux required for CAN functionality +- USB configuration may require: + - USB muxing + - Forcing USB Host mode + +Setup differs between **OAK-4 S** and **OAK-4 D**. + +(Example configuration guide coming soon.) + +## Pinout Diagram + +![M8 Controller Box Schematics](media/schematics.png) + +## Example Application + +The repository includes the following example applications: + +### simple-example + +A minimal container example running directly on the M8 Controller Box. + +- LED on **pin 18** blinks continuously +- Button on **pin 19** turns on LED on **pin 17** + +### depthai-example + +Based on the official Luxonis hand pose example. + +- Performs hand detection and landmark estimation +- Turns on LED on **pin 17** when a hand is detected + +### can-example + +Demonstrates CAN transmission triggered by a physical button. + +- Monitors button on **pin 19** +- Sends a CAN frame over `can0` when pressed +- Uses `python-can` with Linux SocketCAN + +## Support + +For integration help, firmware support, or early-access features: + +Please contact us. diff --git a/m8-controller-box/can-example/.oakappignore b/m8-controller-box/can-example/.oakappignore new file mode 100644 index 000000000..671c0ae52 --- /dev/null +++ b/m8-controller-box/can-example/.oakappignore @@ -0,0 +1,33 @@ +# Python virtual environments +venv/ +.venv/ + +# Node.js +# ignore node_modules, it will be reinstalled in the container +node_modules/ + +# Multimedia files +media/ + +# Documentation +README.md + +# VCS +.git/ +.github/ +.gitlab/ + +# The following files are ignored by default +# uncomment a line if you explicitly need it + +# !*.oakapp + +# Python +# !**/.mypy_cache/ +# !**/.ruff_cache/ + +# IDE files +# !**/.idea +# !**/.vscode +# !**/.zed + diff --git a/m8-controller-box/can-example/README.md b/m8-controller-box/can-example/README.md new file mode 100644 index 000000000..47263ddd5 --- /dev/null +++ b/m8-controller-box/can-example/README.md @@ -0,0 +1,52 @@ +# M8 CAN Transmission Example (Button Triggered) + +This example demonstrates how to send data over the **CAN bus** from the M8 Controller Box by pressing a physical button. + +The application runs inside a container directly on the device and uses `python-can` with the Linux SocketCAN interface. + +## Functionality + +This example performs the following actions: + +- Monitors the button connected to **GPIO pin 19**. +- When the button is pressed, a CAN frame is transmitted over the M8 CAN interface (`can0`). + +This provides a minimal, practical reference for sending CAN messages from a containerized application running on the OAK4 device. + +## CAN Interface Setup (Required) + +Before running the application, the CAN interface must be configured on the target device where CAN messages should be transmitted. + +Run the following commands on the device: + +```bash +sudo ip link set can0 type can bitrate 500000 +sudo ip link set can0 up +``` + +This: + +- Configures the CAN bitrate to **500 kbps** +- Brings the `can0` interface online + +The bitrate must match the rest of the CAN network. + +## Monitoring CAN Traffic + +To verify transmission, you can listen to CAN traffic on a Linux system using: + +```bash +candump can0 +``` + +When the button on **pin 19** is pressed, a CAN frame will appear on the bus. + +## Use Case + +This example serves as a minimal reference implementation for: + +- Sending CAN messages from the M8 Controller Box +- Integrating containerized applications with CAN-based systems +- Trigger-based CAN communication using GPIO inputs + +It can be extended to transmit structured data, control messages, or sensor outputs over CAN. diff --git a/m8-controller-box/can-example/backend-run.sh b/m8-controller-box/can-example/backend-run.sh new file mode 100644 index 000000000..1fe73b103 --- /dev/null +++ b/m8-controller-box/can-example/backend-run.sh @@ -0,0 +1,3 @@ +#!/bin/sh +echo "Starting Backend" +exec python3.12 /app/main.py diff --git a/m8-controller-box/can-example/main.py b/m8-controller-box/can-example/main.py new file mode 100644 index 000000000..d56ac53eb --- /dev/null +++ b/m8-controller-box/can-example/main.py @@ -0,0 +1,54 @@ +import time +import can +from utils.rp2040_u2if import RP2040_u2if + +# button +BUTTON_PIN = 19 + +rp2040 = RP2040_u2if() +rp2040.open() + +# Button uses pull-up; typical behavior: released=1, pressed=0 +rp2040.gpio_init_pin(BUTTON_PIN, RP2040_u2if.GPIO_IN, RP2040_u2if.GPIO_PULL_UP) + +# --- CAN (python-can with SocketCAN) --- +CAN_IFACE = "can0" +CAN_ID = 0x123 +CAN_DATA = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88] + +bus = can.interface.Bus(channel=CAN_IFACE, interface="socketcan") + + +def send_can_frame(): + msg = can.Message( + arbitration_id=CAN_ID, + data=CAN_DATA, + is_extended_id=False, + ) + bus.send(msg) + print(f"Sent CAN: {CAN_IFACE} id=0x{CAN_ID:X} data={CAN_DATA}") + + +# --- Button edge detection (send once per press) --- +last_button_state = rp2040.gpio_get_pin(BUTTON_PIN) + +# Debounce +last_press_time = 0.0 +debounce_s = 0.05 + +while True: + button_state = rp2040.gpio_get_pin(BUTTON_PIN) + + # With pull-up: a press is usually 1->0 (falling edge) + now = time.monotonic() + pressed_edge = (last_button_state != 0) and (button_state == 0) + + if pressed_edge and (now - last_press_time) > debounce_s: + try: + send_can_frame() + except Exception as e: + print(f"[ERROR] Failed to send CAN frame: {e}") + last_press_time = now + + last_button_state = button_state + time.sleep(0.001) diff --git a/m8-controller-box/can-example/oakapp.toml b/m8-controller-box/can-example/oakapp.toml new file mode 100644 index 000000000..1e92b123e --- /dev/null +++ b/m8-controller-box/can-example/oakapp.toml @@ -0,0 +1,33 @@ +identifier = "com.example.m8-controller-box.can-example" +entrypoint = ["bash", "-c", "/usr/bin/runsvdir -P /etc/service"] +app_version = "1.0.2" +assign_frontend_port = true + +prepare_container = [ + { type = "COPY", source = "./requirements.txt", target = "./requirements.txt" }, + { type = "RUN", command = "python3.12 -m pip install -r /app/requirements.txt --break-system-packages" }, + { type = "RUN", command = "bash -lc 'set -e; apt-get update && apt-get install -y libhidapi-hidraw0 libhidapi-libusb0'" } +] + +build_steps = [ + "mkdir -p /etc/service/backend", + "cp /app/backend-run.sh /etc/service/backend/run", + "chmod +x /etc/service/backend/run", +] + +allowed_devices = [{ allow = true, access = "rwm" }] + +additional_mounts = [ + { source = "/dev", target = "/dev", type = "devtmpfs", options = [ + "mode=777", + ] } +] + +[base_image] +api_url = "https://registry-1.docker.io" +service = "registry.docker.io" +oauth_url = "https://auth.docker.io/token" +auth_type = "repository" +auth_name = "luxonis/oakapp-base" +image_name = "luxonis/oakapp-base" +image_tag = "1.2.6" diff --git a/m8-controller-box/can-example/requirements.txt b/m8-controller-box/can-example/requirements.txt new file mode 100644 index 000000000..3697e7349 --- /dev/null +++ b/m8-controller-box/can-example/requirements.txt @@ -0,0 +1,2 @@ +hidapi +python-can diff --git a/m8-controller-box/can-example/utils/rp2040_u2if.py b/m8-controller-box/can-example/utils/rp2040_u2if.py new file mode 100644 index 000000000..051c45451 --- /dev/null +++ b/m8-controller-box/can-example/utils/rp2040_u2if.py @@ -0,0 +1,529 @@ +# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries +# +# SPDX-License-Identifier: MIT +"""Helper class for use with RP2040 running u2if firmware""" +# https://github.com/execuc/u2if + +import os +import time +import hid + +# Use to set delay between reset and device reopen. if negative, don't reset at all +RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1)) + +# pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements +# pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods + + +class RP2040_u2if: + """Helper class for use with RP2040 running u2if firmware""" + + # MISC + RESP_OK = 0x01 + SYS_RESET = 0x10 + + # GPIO + GPIO_INIT_PIN = 0x20 + GPIO_SET_VALUE = 0x21 + GPIO_GET_VALUE = 0x22 + # Values + GPIO_IN = 0 + GPIO_OUT = 1 + GPIO_LOW = 0 + GPIO_HIGH = 1 + GPIO_PULL_NONE = 0 + GPIO_PULL_UP = 1 + GPIO_PULL_DOWN = 2 + + # ADC + ADC_INIT_PIN = 0x40 + ADC_GET_VALUE = 0x41 + + # I2C + I2C0_INIT = 0x80 + I2C0_DEINIT = 0x81 + I2C0_WRITE = 0x82 + I2C0_READ = 0x83 + I2C0_WRITE_FROM_UART = 0x84 + I2C1_INIT = I2C0_INIT + 0x10 + I2C1_DEINIT = I2C0_DEINIT + 0x10 + I2C1_WRITE = I2C0_WRITE + 0x10 + I2C1_READ = I2C0_READ + 0x10 + I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10 + + # SPI + SPI0_INIT = 0x60 + SPI0_DEINIT = 0x61 + SPI0_WRITE = 0x62 + SPI0_READ = 0x63 + SPI0_WRITE_FROM_UART = 0x64 + SPI1_INIT = SPI0_INIT + 0x10 + SPI1_DEINIT = SPI0_DEINIT + 0x10 + SPI1_WRITE = SPI0_WRITE + 0x10 + SPI1_READ = SPI0_READ + 0x10 + SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10 + + # WS2812B (LED) + WS2812B_INIT = 0xA0 + WS2812B_DEINIT = 0xA1 + WS2812B_WRITE = 0xA2 + + # PWM + PWM_INIT_PIN = 0x30 + PWM_DEINIT_PIN = 0x31 + PWM_SET_FREQ = 0x32 + PWM_GET_FREQ = 0x33 + PWM_SET_DUTY_U16 = 0x34 + PWM_GET_DUTY_U16 = 0x35 + PWM_SET_DUTY_NS = 0x36 + PWM_GET_DUTY_NS = 0x37 + + def __init__(self): + self._vid = None + self._pid = None + self._hid = None + self._opened = False + self._i2c_index = None + self._spi_index = None + self._serial = None + self._neopixel_initialized = False + self._uart_rx_buffer = None + + def _hid_xfer(self, report, response=True): + """Perform HID Transfer""" + # first byte is report ID, which =0 + # remaing bytes = 64 byte report data + # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185 + self._hid.write(b"\0" + report + b"\0" * (64 - len(report))) + if response: + # return is 64 byte response report + ret = self._hid.read( + 64, timeout_ms=1000 + ) # Sometimes this can hang, but if we time out the call, it will work on the next call + assert len(ret) == 64, "HID Timeout occurred." + return ret + return None + + def _reset(self): + self._hid_xfer(bytes([self.SYS_RESET]), False) + self._hid.close() + time.sleep(RP2040_U2IF_RESET_DELAY) + start = time.monotonic() + while time.monotonic() - start < 5: + try: + self._hid.open(self._vid, self._pid, self._serial) + except OSError: + time.sleep(0.1) + continue + return + raise OSError("RP2040 u2if open error.") + + # ---------------------------------------------------------------- + # MISC + # ---------------------------------------------------------------- + def open(self, vid=0xCAFE, pid=0x4005, serial=None): + """Open HID interface for given USB VID and PID.""" + + if self._opened: + return + self._vid = vid + self._pid = pid + self._serial = serial + self._hid = hid.device() + self._hid.open(self._vid, self._pid, self._serial) + if RP2040_U2IF_RESET_DELAY >= 0: + self._reset() + self._opened = True + + def close(self): + """Close HID interface.""" + if not self._opened: + return + self._hid_xfer(bytes([self.SYS_RESET]), True) + self._hid.close() + self._opened = False + + # ---------------------------------------------------------------- + # GPIO + # ---------------------------------------------------------------- + def gpio_init_pin(self, pin_id, direction, pull): + """Configure GPIO Pin.""" + self._hid_xfer( + bytes( + [ + self.GPIO_INIT_PIN, + pin_id, + direction, + pull, + ] + ) + ) + + def gpio_set_pin(self, pin_id, value): + """Set Current GPIO Pin Value""" + self._hid_xfer( + bytes( + [ + self.GPIO_SET_VALUE, + pin_id, + int(value), + ] + ) + ) + + def gpio_get_pin(self, pin_id): + """Get Current GPIO Pin Value""" + resp = self._hid_xfer( + bytes( + [ + self.GPIO_GET_VALUE, + pin_id, + ] + ), + True, + ) + return resp[3] != 0x00 + + # ---------------------------------------------------------------- + # ADC + # ---------------------------------------------------------------- + def adc_init_pin(self, pin_id): + """Configure ADC Pin.""" + self._hid_xfer( + bytes( + [ + self.ADC_INIT_PIN, + pin_id, + ] + ) + ) + + def adc_get_value(self, pin_id): + """Get ADC value for pin.""" + resp = self._hid_xfer( + bytes( + [ + self.ADC_GET_VALUE, + pin_id, + ] + ), + True, + ) + return int.from_bytes(resp[3 : 3 + 2], byteorder="little") + + # ---------------------------------------------------------------- + # I2C + # ---------------------------------------------------------------- + def i2c_configure(self, baudrate, pullup=False): + """Configure I2C.""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + resp = self._hid_xfer( + bytes( + [ + self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT, + 0x00 if not pullup else 0x01, + ] + ) + + baudrate.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C init error.") + + def i2c_set_port(self, index): + """Set I2C port.""" + if index not in (0, 1): + raise ValueError("I2C index must be 0 or 1.") + self._i2c_index = index + + def _i2c_write(self, address, buffer, start=0, end=None, stop=True): + """Write data from the buffer to an address""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + end = end if end else len(buffer) + + write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE + stop_flag = 0x01 if stop else 0x00 + + while (end - start) > 0: + remain_bytes = end - start + chunk = min(remain_bytes, 64 - 7) + resp = self._hid_xfer( + bytes([write_cmd, address, stop_flag]) + + remain_bytes.to_bytes(4, byteorder="little") + + buffer[start : (start + chunk)], + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C write error") + start += chunk + + def _i2c_read(self, address, buffer, start=0, end=None): + """Read data from an address and into the buffer""" + # TODO: support chunkified reads + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + end = end if end else len(buffer) + + read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ + stop_flag = 0x01 # always stop + read_size = end - start + + resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C write error") + # move into buffer + for i in range(read_size): + buffer[start + i] = resp[i + 2] + + def i2c_writeto(self, address, buffer, *, start=0, end=None): + """Write data from the buffer to an address""" + self._i2c_write(address, buffer, start, end) + + def i2c_readfrom_into(self, address, buffer, *, start=0, end=None): + """Read data from an address and into the buffer""" + self._i2c_read(address, buffer, start, end) + + def i2c_writeto_then_readfrom( + self, + address, + out_buffer, + in_buffer, + *, + out_start=0, + out_end=None, + in_start=0, + in_end=None, + ): + """Write data from buffer_out to an address and then + read data from an address and into buffer_in + """ + self._i2c_write(address, out_buffer, out_start, out_end, False) + self._i2c_read(address, in_buffer, in_start, in_end) + + def i2c_scan(self, *, start=0, end=0x79): + """Perform an I2C Device Scan""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + found = [] + for addr in range(start, end + 1): + # try a write + try: + self.i2c_writeto(addr, b"\x00\x00\x00") + except RuntimeError: # no reply! + continue + # store if success + found.append(addr) + return found + + # ---------------------------------------------------------------- + # SPI + # ---------------------------------------------------------------- + def spi_configure(self, baudrate): + """Configure SPI.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + resp = self._hid_xfer( + bytes( + [ + self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT, + 0x00, # mode, not yet implemented + ] + ) + + baudrate.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI init error.") + + def spi_set_port(self, index): + """Set SPI port.""" + if index not in (0, 1): + raise ValueError("SPI index must be 0 or 1.") + self._spi_index = index + + def spi_write(self, buffer, *, start=0, end=None): + """SPI write.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + end = end if end else len(buffer) + + write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE + + while (end - start) > 0: + remain_bytes = end - start + chunk = min(remain_bytes, 64 - 3) + resp = self._hid_xfer( + bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI write error") + start += chunk + + def spi_readinto(self, buffer, *, start=0, end=None, write_value=0): + """SPI readinto.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + end = end if end else len(buffer) + read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ + read_size = end - start + + resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI write error") + # move into buffer + for i in range(read_size): + buffer[start + i] = resp[i + 2] + + def spi_write_readinto( + self, + buffer_out, + buffer_in, + *, + out_start=0, + out_end=None, + in_start=0, + in_end=None, + ): + """SPI write and readinto.""" + raise NotImplementedError("SPI write_readinto Not implemented") + + # ---------------------------------------------------------------- + # NEOPIXEL + # ---------------------------------------------------------------- + def neopixel_write(self, gpio, buf): + """NeoPixel write.""" + # open serial (data is sent over this) + if self._serial is None: + import serial + import serial.tools.list_ports + + ports = serial.tools.list_ports.comports() + for port in ports: + if port.vid == self._vid and port.pid == self._pid: + self._serial = serial.Serial(port.device) + break + if self._serial is None: + raise RuntimeError("Could not find Pico com port.") + + # init + if not self._neopixel_initialized: + # deinit any current setup + # pylint: disable=protected-access + self._hid_xfer(bytes([self.WS2812B_DEINIT])) + resp = self._hid_xfer( + bytes( + [ + self.WS2812B_INIT, + gpio._pin.id, + ] + ), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("Neopixel init error") + self._neopixel_initialized = True + + self._serial.reset_output_buffer() + + # write + # command is done over HID + remain_bytes = len(buf) + resp = self._hid_xfer( + bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + # pylint: disable=no-else-raise + if resp[2] == 0x01: + raise RuntimeError( + "Neopixel write error : too many pixel for the firmware." + ) + elif resp[2] == 0x02: + raise RuntimeError( + "Neopixel write error : transfer already in progress." + ) + else: + raise RuntimeError("Neopixel write error.") + # buffer is sent over serial + self._serial.write(buf) + # hack (see u2if) + if len(buf) % 64 == 0: + self._serial.write([0]) + self._serial.flush() + # polling loop to wait for write complete? + time.sleep(0.1) + resp = self._hid.read(64) + while resp[0] != self.WS2812B_WRITE: + resp = self._hid.read(64) + if resp[1] != self.RESP_OK: + raise RuntimeError("Neopixel write (flush) error.") + + # ---------------------------------------------------------------- + # PWM + # ---------------------------------------------------------------- + # pylint: disable=unused-argument + def pwm_configure( + self, pin_id: int, frequency=500, duty_cycle=0, variable_frequency=False + ): + """Configure PWM.""" + self.pwm_deinit(pin_id) + resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM init error.") + + self.pwm_set_frequency(pin_id, frequency) + self.pwm_set_duty_cycle(pin_id, duty_cycle) + + def pwm_deinit(self, pin_id: int): + """Deinit PWM.""" + self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin_id])) + + def pwm_get_frequency(self, pin_id: int): + """PWM get freq.""" + resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM get frequency error.") + return int.from_bytes(resp[3 : 3 + 4], byteorder="little") + + def pwm_set_frequency(self, pin_id: int, frequency): + """PWM set freq.""" + resp = self._hid_xfer( + bytes([self.PWM_SET_FREQ, pin_id]) + + frequency.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + # pylint: disable=no-else-raise + if resp[3] == 0x01: + raise RuntimeError("PWM different frequency on same slice.") + elif resp[3] == 0x02: + raise RuntimeError("PWM frequency too low.") + elif resp[3] == 0x03: + raise RuntimeError("PWM frequency too high.") + else: + raise RuntimeError("PWM frequency error.") + + def pwm_get_duty_cycle(self, pin_id: int): + """PWM get duty cycle.""" + resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM get duty cycle error.") + return int.from_bytes(resp[3 : 3 + 4], byteorder="little") + + def pwm_set_duty_cycle(self, pin_id: int, duty_cycle): + """PWM set duty cycle.""" + resp = self._hid_xfer( + bytes([self.PWM_SET_DUTY_U16, pin_id]) + + duty_cycle.to_bytes(2, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM set duty cycle error.") diff --git a/m8-controller-box/depthai-example/.oakappignore b/m8-controller-box/depthai-example/.oakappignore new file mode 100644 index 000000000..671c0ae52 --- /dev/null +++ b/m8-controller-box/depthai-example/.oakappignore @@ -0,0 +1,33 @@ +# Python virtual environments +venv/ +.venv/ + +# Node.js +# ignore node_modules, it will be reinstalled in the container +node_modules/ + +# Multimedia files +media/ + +# Documentation +README.md + +# VCS +.git/ +.github/ +.gitlab/ + +# The following files are ignored by default +# uncomment a line if you explicitly need it + +# !*.oakapp + +# Python +# !**/.mypy_cache/ +# !**/.ruff_cache/ + +# IDE files +# !**/.idea +# !**/.vscode +# !**/.zed + diff --git a/m8-controller-box/depthai-example/README.md b/m8-controller-box/depthai-example/README.md new file mode 100644 index 000000000..8da8e68b7 --- /dev/null +++ b/m8-controller-box/depthai-example/README.md @@ -0,0 +1,17 @@ +# Hand Pose Example with M8 Controller LED Trigger + +This project is based on the official Luxonis hand pose example: + +https://github.com/luxonis/oak-examples/tree/main/neural-networks/pose-estimation/hand-pose + +It demonstrates hand detection and hand landmark estimation using DepthAI. + +## Added Functionality + +In addition to the original example, this version integrates an **M8 Controller Box**. + +When a hand is detected: + +- The LED connected to **pin 17** on the M8 controller will turn on. + +This allows simple hardware feedback triggered directly from hand detection events. diff --git a/m8-controller-box/depthai-example/backend-run.sh b/m8-controller-box/depthai-example/backend-run.sh new file mode 100644 index 000000000..1fe73b103 --- /dev/null +++ b/m8-controller-box/depthai-example/backend-run.sh @@ -0,0 +1,3 @@ +#!/bin/sh +echo "Starting Backend" +exec python3.12 /app/main.py diff --git a/m8-controller-box/depthai-example/depthai_models/mediapipe_hand_landmarker.RVC2.yaml b/m8-controller-box/depthai-example/depthai_models/mediapipe_hand_landmarker.RVC2.yaml new file mode 100644 index 000000000..e8e8089ec --- /dev/null +++ b/m8-controller-box/depthai-example/depthai_models/mediapipe_hand_landmarker.RVC2.yaml @@ -0,0 +1,2 @@ +model: luxonis/mediapipe-hand-landmarker:224x224 +platform: RVC2 \ No newline at end of file diff --git a/m8-controller-box/depthai-example/depthai_models/mediapipe_hand_landmarker.RVC4.yaml b/m8-controller-box/depthai-example/depthai_models/mediapipe_hand_landmarker.RVC4.yaml new file mode 100644 index 000000000..1e61a842f --- /dev/null +++ b/m8-controller-box/depthai-example/depthai_models/mediapipe_hand_landmarker.RVC4.yaml @@ -0,0 +1,2 @@ +model: luxonis/mediapipe-hand-landmarker:224x224 +platform: RVC4 \ No newline at end of file diff --git a/m8-controller-box/depthai-example/depthai_models/mediapipe_palm_detection.RVC2.yaml b/m8-controller-box/depthai-example/depthai_models/mediapipe_palm_detection.RVC2.yaml new file mode 100644 index 000000000..cc87d00a2 --- /dev/null +++ b/m8-controller-box/depthai-example/depthai_models/mediapipe_palm_detection.RVC2.yaml @@ -0,0 +1,2 @@ +model: luxonis/mediapipe-palm-detection:192x192 +platform: RVC2 \ No newline at end of file diff --git a/m8-controller-box/depthai-example/depthai_models/mediapipe_palm_detection.RVC4.yaml b/m8-controller-box/depthai-example/depthai_models/mediapipe_palm_detection.RVC4.yaml new file mode 100644 index 000000000..ee0ab6c0f --- /dev/null +++ b/m8-controller-box/depthai-example/depthai_models/mediapipe_palm_detection.RVC4.yaml @@ -0,0 +1,2 @@ +model: luxonis/mediapipe-palm-detection:192x192 +platform: RVC4 \ No newline at end of file diff --git a/m8-controller-box/depthai-example/main.py b/m8-controller-box/depthai-example/main.py new file mode 100644 index 000000000..d786b5845 --- /dev/null +++ b/m8-controller-box/depthai-example/main.py @@ -0,0 +1,174 @@ +from pathlib import Path + +import depthai as dai +from depthai_nodes.node import ParsingNeuralNetwork, GatherData + +from utils.arguments import initialize_argparser +from utils.annotation_node import AnnotationNode +from utils.process import ProcessDetections + +# M8 Controller Box +from utils.rp2040_u2if import RP2040_u2if + +rp2040 = RP2040_u2if() +rp2040.open() + +rp2040.gpio_init_pin(17, RP2040_u2if.GPIO_OUT, RP2040_u2if.GPIO_PULL_NONE) + +_, args = initialize_argparser() + +PADDING = 0.1 +CONFIDENCE_THRESHOLD = 0.5 + +visualizer = dai.RemoteConnection(httpPort=8082) +device = dai.Device(dai.DeviceInfo(args.device)) if args.device else dai.Device() +platform = device.getPlatform().name +print(f"Platform: {platform}") + +frame_type = ( + dai.ImgFrame.Type.BGR888p if platform == "RVC2" else dai.ImgFrame.Type.BGR888i +) + +if not args.fps_limit: + args.fps_limit = 8 if platform == "RVC2" else 30 + print( + f"\nFPS limit set to {args.fps_limit} for {platform} platform. If you want to set a custom FPS limit, use the --fps_limit flag.\n" + ) + +with dai.Pipeline(device) as pipeline: + print("Creating pipeline...") + + # detection model + det_model_description = dai.NNModelDescription.fromYamlFile( + f"mediapipe_palm_detection.{platform}.yaml" + ) + det_nn_archive = dai.NNArchive(dai.getModelFromZoo(det_model_description)) + + # pose estimation model + pose_model_description = dai.NNModelDescription.fromYamlFile( + f"mediapipe_hand_landmarker.{platform}.yaml" + ) + pose_nn_archive = dai.NNArchive(dai.getModelFromZoo(pose_model_description)) + + # media/camera input + if args.media_path: + replay = pipeline.create(dai.node.ReplayVideo) + replay.setReplayVideoFile(Path(args.media_path)) + replay.setOutFrameType(frame_type) + replay.setLoop(True) + if args.fps_limit: + replay.setFps(args.fps_limit) + else: + cam = pipeline.create(dai.node.Camera).build() + cam_out = cam.requestOutput((768, 768), frame_type, fps=args.fps_limit) + input_node = replay.out if args.media_path else cam_out + + # resize to det model input size + resize_node = pipeline.create(dai.node.ImageManip) + resize_node.setMaxOutputFrameSize( + det_nn_archive.getInputWidth() * det_nn_archive.getInputHeight() * 3 + ) + resize_node.initialConfig.setOutputSize( + det_nn_archive.getInputWidth(), + det_nn_archive.getInputHeight(), + mode=dai.ImageManipConfig.ResizeMode.STRETCH, + ) + resize_node.initialConfig.setFrameType(frame_type) + input_node.link(resize_node.inputImage) + + detection_nn: ParsingNeuralNetwork = pipeline.create(ParsingNeuralNetwork).build( + resize_node.out, det_nn_archive + ) + + # detection processing + detections_processor = pipeline.create(ProcessDetections).build( + detections_input=detection_nn.out, + padding=PADDING, + target_size=(pose_nn_archive.getInputWidth(), pose_nn_archive.getInputHeight()), + ) + + script = pipeline.create(dai.node.Script) + script.setScriptPath(str(Path(__file__).parent / "utils/script.py")) + script.inputs["frame_input"].setMaxSize(30) + script.inputs["config_input"].setMaxSize(30) + script.inputs["num_configs_input"].setMaxSize(30) + + detection_nn.passthrough.link(script.inputs["frame_input"]) + detections_processor.config_output.link(script.inputs["config_input"]) + detections_processor.num_configs_output.link(script.inputs["num_configs_input"]) + + pose_manip = pipeline.create(dai.node.ImageManip) + pose_manip.initialConfig.setOutputSize( + pose_nn_archive.getInputWidth(), pose_nn_archive.getInputHeight() + ) + pose_manip.inputConfig.setMaxSize(30) + pose_manip.inputImage.setMaxSize(30) + pose_manip.setNumFramesPool(30) + pose_manip.inputConfig.setWaitForMessage(True) + + script.outputs["output_config"].link(pose_manip.inputConfig) + script.outputs["output_frame"].link(pose_manip.inputImage) + + pose_nn: ParsingNeuralNetwork = pipeline.create(ParsingNeuralNetwork).build( + pose_manip.out, pose_nn_archive + ) + + # detections and pose estimations sync + gather_data = pipeline.create(GatherData).build(camera_fps=args.fps_limit) + detection_nn.out.link(gather_data.input_reference) + pose_nn.outputs.link(gather_data.input_data) + + # annotation + connection_pairs = ( + pose_nn_archive.getConfig() + .model.heads[0] + .metadata.extraParams["skeleton_edges"] + ) + annotation_node = pipeline.create(AnnotationNode).build( + gathered_data=gather_data.out, + video=input_node, + padding_factor=PADDING, + confidence_threshold=CONFIDENCE_THRESHOLD, + connections_pairs=connection_pairs, + ) + + # video encoding + video_encode_manip = pipeline.create(dai.node.ImageManip) + video_encode_manip.setMaxOutputFrameSize(768 * 768 * 3) + video_encode_manip.initialConfig.setOutputSize(768, 768) + video_encode_manip.initialConfig.setFrameType(dai.ImgFrame.Type.NV12) + input_node.link(video_encode_manip.inputImage) + + video_encoder = pipeline.create(dai.node.VideoEncoder) + video_encoder.setMaxOutputFrameSize(768 * 768 * 3) + video_encoder.setDefaultProfilePreset( + args.fps_limit, dai.VideoEncoderProperties.Profile.H264_MAIN + ) + video_encode_manip.out.link(video_encoder.input) + + # visualization + visualizer.addTopic("Video", video_encoder.out, "images") + visualizer.addTopic("Detections", annotation_node.out_detections, "images") + visualizer.addTopic("Pose", annotation_node.out_pose_annotations, "images") + + print("Pipeline created.") + + # Output queue for hand detections (non-blocking) + det_queue = detection_nn.out.createOutputQueue(maxSize=4, blocking=False) + + pipeline.start() + visualizer.registerPipeline(pipeline) + + while pipeline.isRunning(): + # Check for hand detections (non-blocking) + det_in = det_queue.tryGet() + if det_in is not None: + detections = det_in.detections if hasattr(det_in, "detections") else [] + if len(detections) > 0: + rp2040.gpio_set_pin(17, 1) # Turn LED ON + else: + rp2040.gpio_set_pin(17, 0) # Turn LED OFF + key = visualizer.waitKey(1) + if key == ord("q"): + print("Got q key. Exiting...") + break diff --git a/m8-controller-box/depthai-example/oakapp.toml b/m8-controller-box/depthai-example/oakapp.toml new file mode 100644 index 000000000..325a0ff74 --- /dev/null +++ b/m8-controller-box/depthai-example/oakapp.toml @@ -0,0 +1,35 @@ +identifier = "com.example.m8-controller-box.depthai-example" +entrypoint = ["bash", "-c", "/usr/bin/runsvdir -P /etc/service"] +app_version = "1.0.2" +assign_frontend_port = true + +prepare_container = [ + { type = "COPY", source = "./requirements.txt", target = "./requirements.txt" }, + { type = "RUN", command = "python3.12 -m pip install -r /app/requirements.txt --break-system-packages"}, + { type = "RUN", command = "bash -lc 'set -e; apt-get update && apt-get install -y libhidapi-hidraw0 libhidapi-libusb0'" } +] + +build_steps = [ + "mkdir -p /etc/service/backend", + "cp /app/backend-run.sh /etc/service/backend/run", + "chmod +x /etc/service/backend/run", +] + +depthai_models = { yaml_path = "./depthai_models" } + +allowed_devices = [{ allow = true, access = "rwm" }] + +additional_mounts = [ + { source = "/dev", target = "/dev", type = "devtmpfs", options = [ + "mode=777", + ] } +] + +[base_image] +api_url = "https://registry-1.docker.io" +service = "registry.docker.io" +oauth_url = "https://auth.docker.io/token" +auth_type = "repository" +auth_name = "luxonis/oakapp-base" +image_name = "luxonis/oakapp-base" +image_tag = "1.2.6" diff --git a/m8-controller-box/depthai-example/requirements.txt b/m8-controller-box/depthai-example/requirements.txt new file mode 100644 index 000000000..7bee8df4e --- /dev/null +++ b/m8-controller-box/depthai-example/requirements.txt @@ -0,0 +1,3 @@ +depthai==3.0.0 +depthai-nodes==0.3.4 +hidapi diff --git a/m8-controller-box/depthai-example/utils/__init__.py b/m8-controller-box/depthai-example/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/m8-controller-box/depthai-example/utils/annotation_node.py b/m8-controller-box/depthai-example/utils/annotation_node.py new file mode 100644 index 000000000..18efc36f1 --- /dev/null +++ b/m8-controller-box/depthai-example/utils/annotation_node.py @@ -0,0 +1,137 @@ +import depthai as dai +from depthai_nodes import ( + ImgDetectionsExtended, + ImgDetectionExtended, + Keypoints, + Predictions, + GatheredData, + SECONDARY_COLOR, +) +from depthai_nodes.utils import AnnotationHelper +from typing import List +from utils.gesture_recognition import recognize_gesture + + +class AnnotationNode(dai.node.HostNode): + def __init__(self) -> None: + super().__init__() + self.gathered_data = self.createInput() + self.out_detections = self.createOutput() + self.out_pose_annotations = self.createOutput( + possibleDatatypes=[ + dai.Node.DatatypeHierarchy(dai.DatatypeEnum.ImgAnnotations, True) + ] + ) + self.confidence_threshold = 0.5 + self.padding_factor = 0.1 + self.connection_pairs = [[]] + + def build( + self, + gathered_data: dai.Node.Output, + video: dai.Node.Output, + confidence_threshold: float, + padding_factor: float, + connections_pairs: List[List[int]], + ) -> "AnnotationNode": + self.confidence_threshold = confidence_threshold + self.padding_factor = padding_factor + self.connection_pairs = connections_pairs + self.link_args(gathered_data, video) + return self + + def process(self, gathered_data: dai.Buffer, video_message: dai.ImgFrame) -> None: + assert isinstance(gathered_data, GatheredData) + + detections_message: ImgDetectionsExtended = gathered_data.reference_data + detections_list: List[ImgDetectionExtended] = detections_message.detections + + new_dets = ImgDetectionsExtended() + new_dets.transformation = video_message.getTransformation() + + annotation_helper = AnnotationHelper() + + for ix, detection in enumerate(detections_list): + keypoints_msg: Keypoints = gathered_data.gathered[ix]["0"] + confidence_msg: Predictions = gathered_data.gathered[ix]["1"] + handness_msg: Predictions = gathered_data.gathered[ix]["2"] + + hand_confidence = confidence_msg.prediction + handness = handness_msg.prediction + + if hand_confidence < self.confidence_threshold: + continue + + width = detection.rotated_rect.size.width + height = detection.rotated_rect.size.height + + xmin = detection.rotated_rect.center.x - width / 2 + xmax = detection.rotated_rect.center.x + width / 2 + ymin = detection.rotated_rect.center.y - height / 2 + ymax = detection.rotated_rect.center.y + height / 2 + + padding = self.padding_factor + + slope_x = (xmax + padding) - (xmin - padding) + slope_y = (ymax + padding) - (ymin - padding) + + new_det = ImgDetectionExtended() + new_det.rotated_rect = ( + detection.rotated_rect.center.x, + detection.rotated_rect.center.y, + detection.rotated_rect.size.width + 2 * padding, + detection.rotated_rect.size.height + 2 * padding, + detection.rotated_rect.angle, + ) + new_det.label = 0 + new_det.label_name = "Hand" + new_det.confidence = detection.confidence + new_dets.detections.append(new_det) + + xs = [] + ys = [] + + for kp in keypoints_msg.keypoints: + x = min(max(xmin - padding + slope_x * kp.x, 0.0), 1.0) + y = min(max(ymin - padding + slope_y * kp.y, 0.0), 1.0) + xs.append(x) + ys.append(y) + + for connection in self.connection_pairs: + pt1_ix, pt2_ix = connection + annotation_helper.draw_line( + pt1=(xs[pt1_ix], ys[pt1_ix]), + pt2=(xs[pt2_ix], ys[pt2_ix]), + ) + + keypoints = [[kpt[0], kpt[1]] for kpt in zip(xs, ys)] + + gesture = recognize_gesture(keypoints) + + text = "Left" if handness < 0.5 else "Right" + text += f" {gesture}" + + text_x = detection.rotated_rect.center.x - 0.05 + text_y = detection.rotated_rect.center.y - height / 2 - 0.10 + + annotation_helper.draw_text( + text=text, + position=(text_x, text_y), + color=SECONDARY_COLOR, + size=32, + ) + + annotation_helper.draw_points( + points=keypoints, color=SECONDARY_COLOR, thickness=2 + ) + + new_dets.setTimestamp(detections_message.getTimestamp()) + new_dets.setSequenceNum(detections_message.getSequenceNum()) + self.out_detections.send(new_dets) + + annotations = annotation_helper.build( + timestamp=detections_message.getTimestamp(), + sequence_num=detections_message.getSequenceNum(), + ) + + self.out_pose_annotations.send(annotations) diff --git a/m8-controller-box/depthai-example/utils/arguments.py b/m8-controller-box/depthai-example/utils/arguments.py new file mode 100644 index 000000000..4ef0b9862 --- /dev/null +++ b/m8-controller-box/depthai-example/utils/arguments.py @@ -0,0 +1,38 @@ +import argparse + + +def initialize_argparser(): + """Initialize the argument parser for the script.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument( + "-d", + "--device", + help="Optional name, DeviceID or IP of the camera to connect to.", + required=False, + default=None, + type=str, + ) + + parser.add_argument( + "-fps", + "--fps_limit", + help="FPS limit for the model runtime.", + required=False, + default=None, + type=int, + ) + + parser.add_argument( + "-media", + "--media_path", + help="Path to the media file you aim to run the model on. If not set, the model will run on the camera input.", + required=False, + default=None, + type=str, + ) + args = parser.parse_args() + + return parser, args diff --git a/m8-controller-box/depthai-example/utils/gesture_recognition.py b/m8-controller-box/depthai-example/utils/gesture_recognition.py new file mode 100644 index 000000000..381e369f5 --- /dev/null +++ b/m8-controller-box/depthai-example/utils/gesture_recognition.py @@ -0,0 +1,132 @@ +import numpy as np +from typing import List, Tuple + + +def distance(a, b): + return np.linalg.norm(a - b) + + +def angle(a, b, c): + ba = a - b + bc = c - b + cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc)) + angle = np.arccos(cosine_angle) + + return np.degrees(angle) + + +def recognize_gesture(kpts: List[Tuple[float, float]]) -> str: + kpts = np.array(kpts) + d_3_5 = distance(kpts[3], kpts[5]) + d_2_3 = distance(kpts[2], kpts[3]) + angle0 = angle(kpts[0], kpts[1], kpts[2]) + angle1 = angle(kpts[1], kpts[2], kpts[3]) + angle2 = angle(kpts[2], kpts[3], kpts[4]) + thumb_state = 0 + index_state = 0 + middle_state = 0 + ring_state = 0 + little_state = 0 + gesture = None + if angle0 + angle1 + angle2 > 460 and d_3_5 / d_2_3 > 1.2: + thumb_state = 1 + else: + thumb_state = 0 + + if kpts[8][1] < kpts[7][1] < kpts[6][1]: + index_state = 1 + elif kpts[6][1] < kpts[8][1]: + index_state = 0 + else: + index_state = -1 + + if kpts[12][1] < kpts[11][1] < kpts[10][1]: + middle_state = 1 + elif kpts[10][1] < kpts[12][1]: + middle_state = 0 + else: + middle_state = -1 + + if kpts[16][1] < kpts[15][1] < kpts[14][1]: + ring_state = 1 + elif kpts[14][1] < kpts[16][1]: + ring_state = 0 + else: + ring_state = -1 + + if kpts[20][1] < kpts[19][1] < kpts[18][1]: + little_state = 1 + elif kpts[18][1] < kpts[20][1]: + little_state = 0 + else: + little_state = -1 + + # Gesture + if ( + thumb_state == 1 + and index_state == 1 + and middle_state == 1 + and ring_state == 1 + and little_state == 1 + ): + gesture = "FIVE" + elif ( + thumb_state == 0 + and index_state == 0 + and middle_state == 0 + and ring_state == 0 + and little_state == 0 + ): + gesture = "FIST" + elif ( + thumb_state == 1 + and index_state == 0 + and middle_state == 0 + and ring_state == 0 + and little_state == 0 + ): + gesture = "OK" + elif ( + thumb_state == 0 + and index_state == 1 + and middle_state == 1 + and ring_state == 0 + and little_state == 0 + ): + gesture = "PEACE" + elif ( + thumb_state == 0 + and index_state == 1 + and middle_state == 0 + and ring_state == 0 + and little_state == 0 + ): + gesture = "ONE" + elif ( + thumb_state == 1 + and index_state == 1 + and middle_state == 0 + and ring_state == 0 + and little_state == 0 + ): + gesture = "TWO" + elif ( + thumb_state == 1 + and index_state == 1 + and middle_state == 1 + and ring_state == 0 + and little_state == 0 + ): + gesture = "THREE" + elif ( + thumb_state == 0 + and index_state == 1 + and middle_state == 1 + and ring_state == 1 + and little_state == 1 + ): + gesture = "FOUR" + else: + gesture = None + + return gesture diff --git a/m8-controller-box/depthai-example/utils/process.py b/m8-controller-box/depthai-example/utils/process.py new file mode 100644 index 000000000..9db495bf2 --- /dev/null +++ b/m8-controller-box/depthai-example/utils/process.py @@ -0,0 +1,78 @@ +import depthai as dai +from depthai_nodes import ImgDetectionsExtended, ImgDetectionExtended +from typing import Tuple + + +class ProcessDetections(dai.node.HostNode): + """A host node for processing a list of detections in a two-stage pipeline. + The node iterates over a list of detections and sends a dai.MessageGroup with + a list of ImageManipConfig objects that can be executed by the ImageManip node. + + Before use, the target size need to be set with the set_target_size method. + Attributes + ---------- + detections_input : dai.Input + The input message for the detections. + config_output : dai.Output + The output message for the ImageManipConfig objects. + num_configs_output : dai.Output + The output message for the number of configs. + padding: float + The padding factor to enlarge the bounding box a little bit. + + """ + + def __init__(self): + super().__init__() + self.detections_input = self.createInput() + self.config_output = self.createOutput() + self.num_configs_output = self.createOutput() + self.padding = 0.1 + self._target_h = None + self._target_w = None + + def build( + self, + detections_input: dai.Node.Output, + padding: float, + target_size: Tuple[int, int], + ) -> "ProcessDetections": + self.padding = padding + self._target_w = target_size[0] + self._target_h = target_size[1] + self.link_args(detections_input) + return self + + def process(self, img_detections: dai.Buffer) -> None: + assert isinstance(img_detections, ImgDetectionsExtended) + detections = img_detections.detections + + num_detections = len(detections) + num_cfgs_message = dai.Buffer(num_detections) + + num_cfgs_message.setTimestamp(img_detections.getTimestamp()) + num_cfgs_message.setSequenceNum(img_detections.getSequenceNum()) + self.num_configs_output.send(num_cfgs_message) + + for i, detection in enumerate(detections): + cfg = dai.ImageManipConfig() + detection: ImgDetectionExtended = detection + rect = detection.rotated_rect + + new_rect = dai.RotatedRect() + new_rect.center.x = rect.center.x + new_rect.center.y = rect.center.y + new_rect.size.width = rect.size.width + 0.1 * 2 + new_rect.size.height = rect.size.height + 0.1 * 2 + new_rect.angle = 0 + + cfg.addCropRotatedRect(new_rect, normalizedCoords=True) + cfg.setOutputSize( + self._target_w, + self._target_h, + dai.ImageManipConfig.ResizeMode.STRETCH, + ) + cfg.setReusePreviousImage(False) + cfg.setTimestamp(img_detections.getTimestamp()) + cfg.setSequenceNum(img_detections.getSequenceNum()) + self.config_output.send(cfg) diff --git a/m8-controller-box/depthai-example/utils/rp2040_u2if.py b/m8-controller-box/depthai-example/utils/rp2040_u2if.py new file mode 100644 index 000000000..051c45451 --- /dev/null +++ b/m8-controller-box/depthai-example/utils/rp2040_u2if.py @@ -0,0 +1,529 @@ +# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries +# +# SPDX-License-Identifier: MIT +"""Helper class for use with RP2040 running u2if firmware""" +# https://github.com/execuc/u2if + +import os +import time +import hid + +# Use to set delay between reset and device reopen. if negative, don't reset at all +RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1)) + +# pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements +# pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods + + +class RP2040_u2if: + """Helper class for use with RP2040 running u2if firmware""" + + # MISC + RESP_OK = 0x01 + SYS_RESET = 0x10 + + # GPIO + GPIO_INIT_PIN = 0x20 + GPIO_SET_VALUE = 0x21 + GPIO_GET_VALUE = 0x22 + # Values + GPIO_IN = 0 + GPIO_OUT = 1 + GPIO_LOW = 0 + GPIO_HIGH = 1 + GPIO_PULL_NONE = 0 + GPIO_PULL_UP = 1 + GPIO_PULL_DOWN = 2 + + # ADC + ADC_INIT_PIN = 0x40 + ADC_GET_VALUE = 0x41 + + # I2C + I2C0_INIT = 0x80 + I2C0_DEINIT = 0x81 + I2C0_WRITE = 0x82 + I2C0_READ = 0x83 + I2C0_WRITE_FROM_UART = 0x84 + I2C1_INIT = I2C0_INIT + 0x10 + I2C1_DEINIT = I2C0_DEINIT + 0x10 + I2C1_WRITE = I2C0_WRITE + 0x10 + I2C1_READ = I2C0_READ + 0x10 + I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10 + + # SPI + SPI0_INIT = 0x60 + SPI0_DEINIT = 0x61 + SPI0_WRITE = 0x62 + SPI0_READ = 0x63 + SPI0_WRITE_FROM_UART = 0x64 + SPI1_INIT = SPI0_INIT + 0x10 + SPI1_DEINIT = SPI0_DEINIT + 0x10 + SPI1_WRITE = SPI0_WRITE + 0x10 + SPI1_READ = SPI0_READ + 0x10 + SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10 + + # WS2812B (LED) + WS2812B_INIT = 0xA0 + WS2812B_DEINIT = 0xA1 + WS2812B_WRITE = 0xA2 + + # PWM + PWM_INIT_PIN = 0x30 + PWM_DEINIT_PIN = 0x31 + PWM_SET_FREQ = 0x32 + PWM_GET_FREQ = 0x33 + PWM_SET_DUTY_U16 = 0x34 + PWM_GET_DUTY_U16 = 0x35 + PWM_SET_DUTY_NS = 0x36 + PWM_GET_DUTY_NS = 0x37 + + def __init__(self): + self._vid = None + self._pid = None + self._hid = None + self._opened = False + self._i2c_index = None + self._spi_index = None + self._serial = None + self._neopixel_initialized = False + self._uart_rx_buffer = None + + def _hid_xfer(self, report, response=True): + """Perform HID Transfer""" + # first byte is report ID, which =0 + # remaing bytes = 64 byte report data + # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185 + self._hid.write(b"\0" + report + b"\0" * (64 - len(report))) + if response: + # return is 64 byte response report + ret = self._hid.read( + 64, timeout_ms=1000 + ) # Sometimes this can hang, but if we time out the call, it will work on the next call + assert len(ret) == 64, "HID Timeout occurred." + return ret + return None + + def _reset(self): + self._hid_xfer(bytes([self.SYS_RESET]), False) + self._hid.close() + time.sleep(RP2040_U2IF_RESET_DELAY) + start = time.monotonic() + while time.monotonic() - start < 5: + try: + self._hid.open(self._vid, self._pid, self._serial) + except OSError: + time.sleep(0.1) + continue + return + raise OSError("RP2040 u2if open error.") + + # ---------------------------------------------------------------- + # MISC + # ---------------------------------------------------------------- + def open(self, vid=0xCAFE, pid=0x4005, serial=None): + """Open HID interface for given USB VID and PID.""" + + if self._opened: + return + self._vid = vid + self._pid = pid + self._serial = serial + self._hid = hid.device() + self._hid.open(self._vid, self._pid, self._serial) + if RP2040_U2IF_RESET_DELAY >= 0: + self._reset() + self._opened = True + + def close(self): + """Close HID interface.""" + if not self._opened: + return + self._hid_xfer(bytes([self.SYS_RESET]), True) + self._hid.close() + self._opened = False + + # ---------------------------------------------------------------- + # GPIO + # ---------------------------------------------------------------- + def gpio_init_pin(self, pin_id, direction, pull): + """Configure GPIO Pin.""" + self._hid_xfer( + bytes( + [ + self.GPIO_INIT_PIN, + pin_id, + direction, + pull, + ] + ) + ) + + def gpio_set_pin(self, pin_id, value): + """Set Current GPIO Pin Value""" + self._hid_xfer( + bytes( + [ + self.GPIO_SET_VALUE, + pin_id, + int(value), + ] + ) + ) + + def gpio_get_pin(self, pin_id): + """Get Current GPIO Pin Value""" + resp = self._hid_xfer( + bytes( + [ + self.GPIO_GET_VALUE, + pin_id, + ] + ), + True, + ) + return resp[3] != 0x00 + + # ---------------------------------------------------------------- + # ADC + # ---------------------------------------------------------------- + def adc_init_pin(self, pin_id): + """Configure ADC Pin.""" + self._hid_xfer( + bytes( + [ + self.ADC_INIT_PIN, + pin_id, + ] + ) + ) + + def adc_get_value(self, pin_id): + """Get ADC value for pin.""" + resp = self._hid_xfer( + bytes( + [ + self.ADC_GET_VALUE, + pin_id, + ] + ), + True, + ) + return int.from_bytes(resp[3 : 3 + 2], byteorder="little") + + # ---------------------------------------------------------------- + # I2C + # ---------------------------------------------------------------- + def i2c_configure(self, baudrate, pullup=False): + """Configure I2C.""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + resp = self._hid_xfer( + bytes( + [ + self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT, + 0x00 if not pullup else 0x01, + ] + ) + + baudrate.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C init error.") + + def i2c_set_port(self, index): + """Set I2C port.""" + if index not in (0, 1): + raise ValueError("I2C index must be 0 or 1.") + self._i2c_index = index + + def _i2c_write(self, address, buffer, start=0, end=None, stop=True): + """Write data from the buffer to an address""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + end = end if end else len(buffer) + + write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE + stop_flag = 0x01 if stop else 0x00 + + while (end - start) > 0: + remain_bytes = end - start + chunk = min(remain_bytes, 64 - 7) + resp = self._hid_xfer( + bytes([write_cmd, address, stop_flag]) + + remain_bytes.to_bytes(4, byteorder="little") + + buffer[start : (start + chunk)], + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C write error") + start += chunk + + def _i2c_read(self, address, buffer, start=0, end=None): + """Read data from an address and into the buffer""" + # TODO: support chunkified reads + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + end = end if end else len(buffer) + + read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ + stop_flag = 0x01 # always stop + read_size = end - start + + resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C write error") + # move into buffer + for i in range(read_size): + buffer[start + i] = resp[i + 2] + + def i2c_writeto(self, address, buffer, *, start=0, end=None): + """Write data from the buffer to an address""" + self._i2c_write(address, buffer, start, end) + + def i2c_readfrom_into(self, address, buffer, *, start=0, end=None): + """Read data from an address and into the buffer""" + self._i2c_read(address, buffer, start, end) + + def i2c_writeto_then_readfrom( + self, + address, + out_buffer, + in_buffer, + *, + out_start=0, + out_end=None, + in_start=0, + in_end=None, + ): + """Write data from buffer_out to an address and then + read data from an address and into buffer_in + """ + self._i2c_write(address, out_buffer, out_start, out_end, False) + self._i2c_read(address, in_buffer, in_start, in_end) + + def i2c_scan(self, *, start=0, end=0x79): + """Perform an I2C Device Scan""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + found = [] + for addr in range(start, end + 1): + # try a write + try: + self.i2c_writeto(addr, b"\x00\x00\x00") + except RuntimeError: # no reply! + continue + # store if success + found.append(addr) + return found + + # ---------------------------------------------------------------- + # SPI + # ---------------------------------------------------------------- + def spi_configure(self, baudrate): + """Configure SPI.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + resp = self._hid_xfer( + bytes( + [ + self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT, + 0x00, # mode, not yet implemented + ] + ) + + baudrate.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI init error.") + + def spi_set_port(self, index): + """Set SPI port.""" + if index not in (0, 1): + raise ValueError("SPI index must be 0 or 1.") + self._spi_index = index + + def spi_write(self, buffer, *, start=0, end=None): + """SPI write.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + end = end if end else len(buffer) + + write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE + + while (end - start) > 0: + remain_bytes = end - start + chunk = min(remain_bytes, 64 - 3) + resp = self._hid_xfer( + bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI write error") + start += chunk + + def spi_readinto(self, buffer, *, start=0, end=None, write_value=0): + """SPI readinto.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + end = end if end else len(buffer) + read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ + read_size = end - start + + resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI write error") + # move into buffer + for i in range(read_size): + buffer[start + i] = resp[i + 2] + + def spi_write_readinto( + self, + buffer_out, + buffer_in, + *, + out_start=0, + out_end=None, + in_start=0, + in_end=None, + ): + """SPI write and readinto.""" + raise NotImplementedError("SPI write_readinto Not implemented") + + # ---------------------------------------------------------------- + # NEOPIXEL + # ---------------------------------------------------------------- + def neopixel_write(self, gpio, buf): + """NeoPixel write.""" + # open serial (data is sent over this) + if self._serial is None: + import serial + import serial.tools.list_ports + + ports = serial.tools.list_ports.comports() + for port in ports: + if port.vid == self._vid and port.pid == self._pid: + self._serial = serial.Serial(port.device) + break + if self._serial is None: + raise RuntimeError("Could not find Pico com port.") + + # init + if not self._neopixel_initialized: + # deinit any current setup + # pylint: disable=protected-access + self._hid_xfer(bytes([self.WS2812B_DEINIT])) + resp = self._hid_xfer( + bytes( + [ + self.WS2812B_INIT, + gpio._pin.id, + ] + ), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("Neopixel init error") + self._neopixel_initialized = True + + self._serial.reset_output_buffer() + + # write + # command is done over HID + remain_bytes = len(buf) + resp = self._hid_xfer( + bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + # pylint: disable=no-else-raise + if resp[2] == 0x01: + raise RuntimeError( + "Neopixel write error : too many pixel for the firmware." + ) + elif resp[2] == 0x02: + raise RuntimeError( + "Neopixel write error : transfer already in progress." + ) + else: + raise RuntimeError("Neopixel write error.") + # buffer is sent over serial + self._serial.write(buf) + # hack (see u2if) + if len(buf) % 64 == 0: + self._serial.write([0]) + self._serial.flush() + # polling loop to wait for write complete? + time.sleep(0.1) + resp = self._hid.read(64) + while resp[0] != self.WS2812B_WRITE: + resp = self._hid.read(64) + if resp[1] != self.RESP_OK: + raise RuntimeError("Neopixel write (flush) error.") + + # ---------------------------------------------------------------- + # PWM + # ---------------------------------------------------------------- + # pylint: disable=unused-argument + def pwm_configure( + self, pin_id: int, frequency=500, duty_cycle=0, variable_frequency=False + ): + """Configure PWM.""" + self.pwm_deinit(pin_id) + resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM init error.") + + self.pwm_set_frequency(pin_id, frequency) + self.pwm_set_duty_cycle(pin_id, duty_cycle) + + def pwm_deinit(self, pin_id: int): + """Deinit PWM.""" + self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin_id])) + + def pwm_get_frequency(self, pin_id: int): + """PWM get freq.""" + resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM get frequency error.") + return int.from_bytes(resp[3 : 3 + 4], byteorder="little") + + def pwm_set_frequency(self, pin_id: int, frequency): + """PWM set freq.""" + resp = self._hid_xfer( + bytes([self.PWM_SET_FREQ, pin_id]) + + frequency.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + # pylint: disable=no-else-raise + if resp[3] == 0x01: + raise RuntimeError("PWM different frequency on same slice.") + elif resp[3] == 0x02: + raise RuntimeError("PWM frequency too low.") + elif resp[3] == 0x03: + raise RuntimeError("PWM frequency too high.") + else: + raise RuntimeError("PWM frequency error.") + + def pwm_get_duty_cycle(self, pin_id: int): + """PWM get duty cycle.""" + resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM get duty cycle error.") + return int.from_bytes(resp[3 : 3 + 4], byteorder="little") + + def pwm_set_duty_cycle(self, pin_id: int, duty_cycle): + """PWM set duty cycle.""" + resp = self._hid_xfer( + bytes([self.PWM_SET_DUTY_U16, pin_id]) + + duty_cycle.to_bytes(2, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM set duty cycle error.") diff --git a/m8-controller-box/depthai-example/utils/script.py b/m8-controller-box/depthai-example/utils/script.py new file mode 100644 index 000000000..5746ce683 --- /dev/null +++ b/m8-controller-box/depthai-example/utils/script.py @@ -0,0 +1,23 @@ +try: + while True: + frame = node.inputs["frame_input"].get() + # node.warn(f"{frame.getType()}") + # node.warn(f"[ConfigSender {frame.getSequenceNum()}] Got frame {frame.getTimestamp()}") + num_configs_message = node.inputs["num_configs_input"].get() + conf_seq = num_configs_message.getSequenceNum() + frame_seq = frame.getSequenceNum() + num_configs = len(bytearray(num_configs_message.getData())) + + while conf_seq > frame_seq: + # node.warn(f"[ConfigSender {conf_seq}] Configs {conf_seq} mismatch with frame {frame_seq}") + frame = node.inputs["frame_input"].get() + + for i in range(num_configs): + cfg = node.inputs["config_input"].get() + # node.warn(f"[ConfigSender {conf_seq}] Got config {i}") + node.outputs["output_config"].send(cfg) + node.outputs["output_frame"].send(frame) + # node.warn(f"[ConfigSender {conf_seq}] sent {i}") + +except Exception as e: + node.warn(str(e)) diff --git a/m8-controller-box/media/schematics.png b/m8-controller-box/media/schematics.png new file mode 100644 index 000000000..0e4f74a3f Binary files /dev/null and b/m8-controller-box/media/schematics.png differ diff --git a/m8-controller-box/simple-example/.oakappignore b/m8-controller-box/simple-example/.oakappignore new file mode 100644 index 000000000..671c0ae52 --- /dev/null +++ b/m8-controller-box/simple-example/.oakappignore @@ -0,0 +1,33 @@ +# Python virtual environments +venv/ +.venv/ + +# Node.js +# ignore node_modules, it will be reinstalled in the container +node_modules/ + +# Multimedia files +media/ + +# Documentation +README.md + +# VCS +.git/ +.github/ +.gitlab/ + +# The following files are ignored by default +# uncomment a line if you explicitly need it + +# !*.oakapp + +# Python +# !**/.mypy_cache/ +# !**/.ruff_cache/ + +# IDE files +# !**/.idea +# !**/.vscode +# !**/.zed + diff --git a/m8-controller-box/simple-example/README.md b/m8-controller-box/simple-example/README.md new file mode 100644 index 000000000..a9bb4df6c --- /dev/null +++ b/m8-controller-box/simple-example/README.md @@ -0,0 +1,13 @@ +# Minimal M8 Controller Box Container Example + +This is a minimal example demonstrating how to build and run a container specifically for the **M8 Controller Box** itself. + +## Functionality + +This example performs simple GPIO interactions: + +- The LED connected to **pin 18** blinks continuously. +- When the button connected to **pin 19** is pressed: + - The LED connected to **pin 17** turns on. + +This provides a minimal reference setup for containerized applications running directly on the M8 Controller Box. diff --git a/m8-controller-box/simple-example/backend-run.sh b/m8-controller-box/simple-example/backend-run.sh new file mode 100644 index 000000000..1fe73b103 --- /dev/null +++ b/m8-controller-box/simple-example/backend-run.sh @@ -0,0 +1,3 @@ +#!/bin/sh +echo "Starting Backend" +exec python3.12 /app/main.py diff --git a/m8-controller-box/simple-example/main.py b/m8-controller-box/simple-example/main.py new file mode 100644 index 000000000..0a2418d5b --- /dev/null +++ b/m8-controller-box/simple-example/main.py @@ -0,0 +1,33 @@ +import time +from utils.rp2040_u2if import RP2040_u2if + +rp2040 = RP2040_u2if() +rp2040.open() + +# leds +rp2040.gpio_init_pin(18, RP2040_u2if.GPIO_OUT, RP2040_u2if.GPIO_PULL_NONE) +rp2040.gpio_init_pin(17, RP2040_u2if.GPIO_OUT, RP2040_u2if.GPIO_PULL_NONE) + +# button +rp2040.gpio_init_pin(19, RP2040_u2if.GPIO_IN, RP2040_u2if.GPIO_PULL_UP) + +blink_interval = 0.5 +last_blink_time = time.monotonic() +led_state = 0 + +while True: + current_time = time.monotonic() + + # Non-blocking blink + if current_time - last_blink_time >= blink_interval: + led_state = 1 - led_state + rp2040.gpio_set_pin(18, led_state) + last_blink_time = current_time + + # Button check runs continuously + button_state = rp2040.gpio_get_pin(19) + + if button_state != 0: + rp2040.gpio_set_pin(17, 1) + else: + rp2040.gpio_set_pin(17, 0) diff --git a/m8-controller-box/simple-example/oakapp.toml b/m8-controller-box/simple-example/oakapp.toml new file mode 100644 index 000000000..9f3a82233 --- /dev/null +++ b/m8-controller-box/simple-example/oakapp.toml @@ -0,0 +1,33 @@ +identifier = "com.example.m8-controller-box.simple-example" +entrypoint = ["bash", "-c", "/usr/bin/runsvdir -P /etc/service"] +app_version = "1.0.2" +assign_frontend_port = true + +prepare_container = [ + { type = "COPY", source = "./requirements.txt", target = "./requirements.txt" }, + { type = "RUN", command = "python3.12 -m pip install -r /app/requirements.txt --break-system-packages" }, + { type = "RUN", command = "bash -lc 'set -e; apt-get update && apt-get install -y libhidapi-hidraw0 libhidapi-libusb0'" } +] + +build_steps = [ + "mkdir -p /etc/service/backend", + "cp /app/backend-run.sh /etc/service/backend/run", + "chmod +x /etc/service/backend/run", +] + +allowed_devices = [{ allow = true, access = "rwm" }] + +additional_mounts = [ + { source = "/dev", target = "/dev", type = "devtmpfs", options = [ + "mode=777", + ] } +] + +[base_image] +api_url = "https://registry-1.docker.io" +service = "registry.docker.io" +oauth_url = "https://auth.docker.io/token" +auth_type = "repository" +auth_name = "luxonis/oakapp-base" +image_name = "luxonis/oakapp-base" +image_tag = "1.2.6" diff --git a/m8-controller-box/simple-example/requirements.txt b/m8-controller-box/simple-example/requirements.txt new file mode 100644 index 000000000..daccb3da2 --- /dev/null +++ b/m8-controller-box/simple-example/requirements.txt @@ -0,0 +1 @@ +hidapi diff --git a/m8-controller-box/simple-example/utils/rp2040_u2if.py b/m8-controller-box/simple-example/utils/rp2040_u2if.py new file mode 100644 index 000000000..051c45451 --- /dev/null +++ b/m8-controller-box/simple-example/utils/rp2040_u2if.py @@ -0,0 +1,529 @@ +# SPDX-FileCopyrightText: 2021 Melissa LeBlanc-Williams for Adafruit Industries +# +# SPDX-License-Identifier: MIT +"""Helper class for use with RP2040 running u2if firmware""" +# https://github.com/execuc/u2if + +import os +import time +import hid + +# Use to set delay between reset and device reopen. if negative, don't reset at all +RP2040_U2IF_RESET_DELAY = float(os.environ.get("RP2040_U2IF_RESET_DELAY", 1)) + +# pylint: disable=import-outside-toplevel,too-many-branches,too-many-statements +# pylint: disable=too-many-arguments,too-many-function-args, too-many-public-methods + + +class RP2040_u2if: + """Helper class for use with RP2040 running u2if firmware""" + + # MISC + RESP_OK = 0x01 + SYS_RESET = 0x10 + + # GPIO + GPIO_INIT_PIN = 0x20 + GPIO_SET_VALUE = 0x21 + GPIO_GET_VALUE = 0x22 + # Values + GPIO_IN = 0 + GPIO_OUT = 1 + GPIO_LOW = 0 + GPIO_HIGH = 1 + GPIO_PULL_NONE = 0 + GPIO_PULL_UP = 1 + GPIO_PULL_DOWN = 2 + + # ADC + ADC_INIT_PIN = 0x40 + ADC_GET_VALUE = 0x41 + + # I2C + I2C0_INIT = 0x80 + I2C0_DEINIT = 0x81 + I2C0_WRITE = 0x82 + I2C0_READ = 0x83 + I2C0_WRITE_FROM_UART = 0x84 + I2C1_INIT = I2C0_INIT + 0x10 + I2C1_DEINIT = I2C0_DEINIT + 0x10 + I2C1_WRITE = I2C0_WRITE + 0x10 + I2C1_READ = I2C0_READ + 0x10 + I2C1_WRITE_FROM_UART = I2C0_WRITE_FROM_UART + 0x10 + + # SPI + SPI0_INIT = 0x60 + SPI0_DEINIT = 0x61 + SPI0_WRITE = 0x62 + SPI0_READ = 0x63 + SPI0_WRITE_FROM_UART = 0x64 + SPI1_INIT = SPI0_INIT + 0x10 + SPI1_DEINIT = SPI0_DEINIT + 0x10 + SPI1_WRITE = SPI0_WRITE + 0x10 + SPI1_READ = SPI0_READ + 0x10 + SPI1_WRITE_FROM_UART = SPI0_WRITE_FROM_UART + 0x10 + + # WS2812B (LED) + WS2812B_INIT = 0xA0 + WS2812B_DEINIT = 0xA1 + WS2812B_WRITE = 0xA2 + + # PWM + PWM_INIT_PIN = 0x30 + PWM_DEINIT_PIN = 0x31 + PWM_SET_FREQ = 0x32 + PWM_GET_FREQ = 0x33 + PWM_SET_DUTY_U16 = 0x34 + PWM_GET_DUTY_U16 = 0x35 + PWM_SET_DUTY_NS = 0x36 + PWM_GET_DUTY_NS = 0x37 + + def __init__(self): + self._vid = None + self._pid = None + self._hid = None + self._opened = False + self._i2c_index = None + self._spi_index = None + self._serial = None + self._neopixel_initialized = False + self._uart_rx_buffer = None + + def _hid_xfer(self, report, response=True): + """Perform HID Transfer""" + # first byte is report ID, which =0 + # remaing bytes = 64 byte report data + # https://github.com/libusb/hidapi/blob/083223e77952e1ef57e6b77796536a3359c1b2a3/hidapi/hidapi.h#L185 + self._hid.write(b"\0" + report + b"\0" * (64 - len(report))) + if response: + # return is 64 byte response report + ret = self._hid.read( + 64, timeout_ms=1000 + ) # Sometimes this can hang, but if we time out the call, it will work on the next call + assert len(ret) == 64, "HID Timeout occurred." + return ret + return None + + def _reset(self): + self._hid_xfer(bytes([self.SYS_RESET]), False) + self._hid.close() + time.sleep(RP2040_U2IF_RESET_DELAY) + start = time.monotonic() + while time.monotonic() - start < 5: + try: + self._hid.open(self._vid, self._pid, self._serial) + except OSError: + time.sleep(0.1) + continue + return + raise OSError("RP2040 u2if open error.") + + # ---------------------------------------------------------------- + # MISC + # ---------------------------------------------------------------- + def open(self, vid=0xCAFE, pid=0x4005, serial=None): + """Open HID interface for given USB VID and PID.""" + + if self._opened: + return + self._vid = vid + self._pid = pid + self._serial = serial + self._hid = hid.device() + self._hid.open(self._vid, self._pid, self._serial) + if RP2040_U2IF_RESET_DELAY >= 0: + self._reset() + self._opened = True + + def close(self): + """Close HID interface.""" + if not self._opened: + return + self._hid_xfer(bytes([self.SYS_RESET]), True) + self._hid.close() + self._opened = False + + # ---------------------------------------------------------------- + # GPIO + # ---------------------------------------------------------------- + def gpio_init_pin(self, pin_id, direction, pull): + """Configure GPIO Pin.""" + self._hid_xfer( + bytes( + [ + self.GPIO_INIT_PIN, + pin_id, + direction, + pull, + ] + ) + ) + + def gpio_set_pin(self, pin_id, value): + """Set Current GPIO Pin Value""" + self._hid_xfer( + bytes( + [ + self.GPIO_SET_VALUE, + pin_id, + int(value), + ] + ) + ) + + def gpio_get_pin(self, pin_id): + """Get Current GPIO Pin Value""" + resp = self._hid_xfer( + bytes( + [ + self.GPIO_GET_VALUE, + pin_id, + ] + ), + True, + ) + return resp[3] != 0x00 + + # ---------------------------------------------------------------- + # ADC + # ---------------------------------------------------------------- + def adc_init_pin(self, pin_id): + """Configure ADC Pin.""" + self._hid_xfer( + bytes( + [ + self.ADC_INIT_PIN, + pin_id, + ] + ) + ) + + def adc_get_value(self, pin_id): + """Get ADC value for pin.""" + resp = self._hid_xfer( + bytes( + [ + self.ADC_GET_VALUE, + pin_id, + ] + ), + True, + ) + return int.from_bytes(resp[3 : 3 + 2], byteorder="little") + + # ---------------------------------------------------------------- + # I2C + # ---------------------------------------------------------------- + def i2c_configure(self, baudrate, pullup=False): + """Configure I2C.""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + resp = self._hid_xfer( + bytes( + [ + self.I2C0_INIT if self._i2c_index == 0 else self.I2C1_INIT, + 0x00 if not pullup else 0x01, + ] + ) + + baudrate.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C init error.") + + def i2c_set_port(self, index): + """Set I2C port.""" + if index not in (0, 1): + raise ValueError("I2C index must be 0 or 1.") + self._i2c_index = index + + def _i2c_write(self, address, buffer, start=0, end=None, stop=True): + """Write data from the buffer to an address""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + end = end if end else len(buffer) + + write_cmd = self.I2C0_WRITE if self._i2c_index == 0 else self.I2C1_WRITE + stop_flag = 0x01 if stop else 0x00 + + while (end - start) > 0: + remain_bytes = end - start + chunk = min(remain_bytes, 64 - 7) + resp = self._hid_xfer( + bytes([write_cmd, address, stop_flag]) + + remain_bytes.to_bytes(4, byteorder="little") + + buffer[start : (start + chunk)], + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C write error") + start += chunk + + def _i2c_read(self, address, buffer, start=0, end=None): + """Read data from an address and into the buffer""" + # TODO: support chunkified reads + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + + end = end if end else len(buffer) + + read_cmd = self.I2C0_READ if self._i2c_index == 0 else self.I2C1_READ + stop_flag = 0x01 # always stop + read_size = end - start + + resp = self._hid_xfer(bytes([read_cmd, address, stop_flag, read_size]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("I2C write error") + # move into buffer + for i in range(read_size): + buffer[start + i] = resp[i + 2] + + def i2c_writeto(self, address, buffer, *, start=0, end=None): + """Write data from the buffer to an address""" + self._i2c_write(address, buffer, start, end) + + def i2c_readfrom_into(self, address, buffer, *, start=0, end=None): + """Read data from an address and into the buffer""" + self._i2c_read(address, buffer, start, end) + + def i2c_writeto_then_readfrom( + self, + address, + out_buffer, + in_buffer, + *, + out_start=0, + out_end=None, + in_start=0, + in_end=None, + ): + """Write data from buffer_out to an address and then + read data from an address and into buffer_in + """ + self._i2c_write(address, out_buffer, out_start, out_end, False) + self._i2c_read(address, in_buffer, in_start, in_end) + + def i2c_scan(self, *, start=0, end=0x79): + """Perform an I2C Device Scan""" + if self._i2c_index is None: + raise RuntimeError("I2C bus not initialized.") + found = [] + for addr in range(start, end + 1): + # try a write + try: + self.i2c_writeto(addr, b"\x00\x00\x00") + except RuntimeError: # no reply! + continue + # store if success + found.append(addr) + return found + + # ---------------------------------------------------------------- + # SPI + # ---------------------------------------------------------------- + def spi_configure(self, baudrate): + """Configure SPI.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + resp = self._hid_xfer( + bytes( + [ + self.SPI0_INIT if self._spi_index == 0 else self.SPI1_INIT, + 0x00, # mode, not yet implemented + ] + ) + + baudrate.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI init error.") + + def spi_set_port(self, index): + """Set SPI port.""" + if index not in (0, 1): + raise ValueError("SPI index must be 0 or 1.") + self._spi_index = index + + def spi_write(self, buffer, *, start=0, end=None): + """SPI write.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + end = end if end else len(buffer) + + write_cmd = self.SPI0_WRITE if self._spi_index == 0 else self.SPI1_WRITE + + while (end - start) > 0: + remain_bytes = end - start + chunk = min(remain_bytes, 64 - 3) + resp = self._hid_xfer( + bytes([write_cmd, chunk]) + buffer[start : (start + chunk)], True + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI write error") + start += chunk + + def spi_readinto(self, buffer, *, start=0, end=None, write_value=0): + """SPI readinto.""" + if self._spi_index is None: + raise RuntimeError("SPI bus not initialized.") + + end = end if end else len(buffer) + read_cmd = self.SPI0_READ if self._spi_index == 0 else self.SPI1_READ + read_size = end - start + + resp = self._hid_xfer(bytes([read_cmd, write_value, read_size]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("SPI write error") + # move into buffer + for i in range(read_size): + buffer[start + i] = resp[i + 2] + + def spi_write_readinto( + self, + buffer_out, + buffer_in, + *, + out_start=0, + out_end=None, + in_start=0, + in_end=None, + ): + """SPI write and readinto.""" + raise NotImplementedError("SPI write_readinto Not implemented") + + # ---------------------------------------------------------------- + # NEOPIXEL + # ---------------------------------------------------------------- + def neopixel_write(self, gpio, buf): + """NeoPixel write.""" + # open serial (data is sent over this) + if self._serial is None: + import serial + import serial.tools.list_ports + + ports = serial.tools.list_ports.comports() + for port in ports: + if port.vid == self._vid and port.pid == self._pid: + self._serial = serial.Serial(port.device) + break + if self._serial is None: + raise RuntimeError("Could not find Pico com port.") + + # init + if not self._neopixel_initialized: + # deinit any current setup + # pylint: disable=protected-access + self._hid_xfer(bytes([self.WS2812B_DEINIT])) + resp = self._hid_xfer( + bytes( + [ + self.WS2812B_INIT, + gpio._pin.id, + ] + ), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("Neopixel init error") + self._neopixel_initialized = True + + self._serial.reset_output_buffer() + + # write + # command is done over HID + remain_bytes = len(buf) + resp = self._hid_xfer( + bytes([self.WS2812B_WRITE]) + remain_bytes.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + # pylint: disable=no-else-raise + if resp[2] == 0x01: + raise RuntimeError( + "Neopixel write error : too many pixel for the firmware." + ) + elif resp[2] == 0x02: + raise RuntimeError( + "Neopixel write error : transfer already in progress." + ) + else: + raise RuntimeError("Neopixel write error.") + # buffer is sent over serial + self._serial.write(buf) + # hack (see u2if) + if len(buf) % 64 == 0: + self._serial.write([0]) + self._serial.flush() + # polling loop to wait for write complete? + time.sleep(0.1) + resp = self._hid.read(64) + while resp[0] != self.WS2812B_WRITE: + resp = self._hid.read(64) + if resp[1] != self.RESP_OK: + raise RuntimeError("Neopixel write (flush) error.") + + # ---------------------------------------------------------------- + # PWM + # ---------------------------------------------------------------- + # pylint: disable=unused-argument + def pwm_configure( + self, pin_id: int, frequency=500, duty_cycle=0, variable_frequency=False + ): + """Configure PWM.""" + self.pwm_deinit(pin_id) + resp = self._hid_xfer(bytes([self.PWM_INIT_PIN, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM init error.") + + self.pwm_set_frequency(pin_id, frequency) + self.pwm_set_duty_cycle(pin_id, duty_cycle) + + def pwm_deinit(self, pin_id: int): + """Deinit PWM.""" + self._hid_xfer(bytes([self.PWM_DEINIT_PIN, pin_id])) + + def pwm_get_frequency(self, pin_id: int): + """PWM get freq.""" + resp = self._hid_xfer(bytes([self.PWM_GET_FREQ, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM get frequency error.") + return int.from_bytes(resp[3 : 3 + 4], byteorder="little") + + def pwm_set_frequency(self, pin_id: int, frequency): + """PWM set freq.""" + resp = self._hid_xfer( + bytes([self.PWM_SET_FREQ, pin_id]) + + frequency.to_bytes(4, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + # pylint: disable=no-else-raise + if resp[3] == 0x01: + raise RuntimeError("PWM different frequency on same slice.") + elif resp[3] == 0x02: + raise RuntimeError("PWM frequency too low.") + elif resp[3] == 0x03: + raise RuntimeError("PWM frequency too high.") + else: + raise RuntimeError("PWM frequency error.") + + def pwm_get_duty_cycle(self, pin_id: int): + """PWM get duty cycle.""" + resp = self._hid_xfer(bytes([self.PWM_GET_DUTY_U16, pin_id]), True) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM get duty cycle error.") + return int.from_bytes(resp[3 : 3 + 4], byteorder="little") + + def pwm_set_duty_cycle(self, pin_id: int, duty_cycle): + """PWM set duty cycle.""" + resp = self._hid_xfer( + bytes([self.PWM_SET_DUTY_U16, pin_id]) + + duty_cycle.to_bytes(2, byteorder="little"), + True, + ) + if resp[1] != self.RESP_OK: + raise RuntimeError("PWM set duty cycle error.")