diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..86be4a9 Binary files /dev/null and b/.DS_Store differ diff --git a/monkey_bot/.DS_Store b/monkey_bot/.DS_Store new file mode 100644 index 0000000..16c172a Binary files /dev/null and b/monkey_bot/.DS_Store differ diff --git a/monkey_bot/main.py b/monkey_bot/main.py new file mode 100644 index 0000000..f8f704a --- /dev/null +++ b/monkey_bot/main.py @@ -0,0 +1,295 @@ +import depthai as dai +# Attempt to import cv2; if unavailable, try to install opencv-python and re-import. +try: + import cv2 +except Exception: + import sys + import subprocess + try: + subprocess.check_call([sys.executable, "-m", "pip", "install", "opencv-python"]) + import importlib + cv2 = importlib.import_module("cv2") + except Exception as e: + raise ImportError( + "The 'cv2' module (OpenCV) is required but could not be imported or installed automatically. " + "Please install it manually with: pip install opencv-python" + ) from e + +import numpy as np +import time +import math +import serial +from utils.arguments import initialize_argparser + +def detect_colors_hsv(frame, draw=True): + """ + Detect red and blue colors in the frame using HSV color space + Returns the frame with outlines drawn around detected colors and the steering value + """ + # Convert to BGR format if needed (from NV12) + if len(frame.shape) == 2: + frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) + + # Get frame dimensions + frame_height, frame_width = frame.shape[:2] + frame_center_x = frame_width // 2 + + # Convert BGR to HSV + hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) + + # Define HSV ranges for red and blue + # Red needs two ranges as it wraps around 0 + red_lower1 = np.array([0, 120, 70]) + red_upper1 = np.array([10, 255, 255]) + red_lower2 = np.array([170, 120, 70]) + red_upper2 = np.array([180, 255, 255]) + + # Blue range + blue_lower = np.array([100, 150, 50]) + blue_upper = np.array([130, 255, 255]) + + # Create masks for each color + red_mask1 = cv2.inRange(hsv, red_lower1, red_upper1) + red_mask2 = cv2.inRange(hsv, red_lower2, red_upper2) + red_mask = cv2.bitwise_or(red_mask1, red_mask2) + blue_mask = cv2.inRange(hsv, blue_lower, blue_upper) + + # Apply morphological operations to clean up the masks + kernel = np.ones((5, 5), np.uint8) + red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_CLOSE, kernel) + red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_OPEN, kernel) + blue_mask = cv2.morphologyEx(blue_mask, cv2.MORPH_CLOSE, kernel) + blue_mask = cv2.morphologyEx(blue_mask, cv2.MORPH_OPEN, kernel) + + # Find contours for red objects + red_contours, _ = cv2.findContours(red_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + # Find contours for blue objects + blue_contours, _ = cv2.findContours(blue_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + # Variables to store centroids + red_centroid = None + blue_centroid = None + + # Draw outlines and labels for red objects + for contour in red_contours: + area = cv2.contourArea(contour) + if area > 500: # Filter small contours + if draw: + # Draw contour outline + cv2.drawContours(frame, [contour], -1, (0, 0, 255), 3) + # Get bounding box + x, y, w, h = cv2.boundingRect(contour) + # Draw bounding box + cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2) + # Add label + cv2.putText(frame, "RED", (x, y - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2) + + # Draw centroid + M = cv2.moments(contour) + if M["m00"] != 0: + cx = int(M["m10"] / M["m00"]) + cy = int(M["m01"] / M["m00"]) + if draw: + cv2.circle(frame, (cx, cy), 7, (0, 0, 255), -1) + # Store the first (largest) red centroid + if red_centroid is None: + red_centroid = (cx, cy) + + # Draw outlines and labels for blue objects + for contour in blue_contours: + area = cv2.contourArea(contour) + if area > 500: # Filter small contours + if draw: + # Draw contour outline + cv2.drawContours(frame, [contour], -1, (255, 0, 0), 3) + # Get bounding box + x, y, w, h = cv2.boundingRect(contour) + # Draw bounding box + cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2) + # Add label + cv2.putText(frame, "BLUE", (x, y - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2) + + # Draw centroid + M = cv2.moments(contour) + if M["m00"] != 0: + cx = int(M["m10"] / M["m00"]) + cy = int(M["m01"] / M["m00"]) + if draw: + cv2.circle(frame, (cx, cy), 7, (255, 0, 0), -1) + # Store the first (largest) blue centroid + if blue_centroid is None: + blue_centroid = (cx, cy) + + # Calculate midpoint and steering value + steering_value = None + if red_centroid and blue_centroid: + # Calculate midpoint between red and blue + midpoint_x = (red_centroid[0] + blue_centroid[0]) // 2 + midpoint_y = (red_centroid[1] + blue_centroid[1]) // 2 + + if draw: + # Draw line between red and blue centroids + cv2.line(frame, red_centroid, blue_centroid, (0, 255, 0), 3) + # Draw the midpoint + cv2.circle(frame, (midpoint_x, midpoint_y), 10, (0, 255, 0), -1) + cv2.putText(frame, "MIDPOINT", (midpoint_x - 50, midpoint_y - 15), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) + # Draw vertical line at frame center for reference + cv2.line(frame, (frame_center_x, 0), (frame_center_x, frame_height), (255, 255, 0), 2) + cv2.putText(frame, "CENTER", (frame_center_x - 40, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) + + # Calculate steering value + # Normalize to -1.0 to 1.0 range based on frame width + steering_value = (midpoint_x - frame_center_x) / (frame_width / 2) + + if draw: + # Draw steering indicator + steering_text = f"Steering: {steering_value:.3f}" + color = (0, 255, 0) if abs(steering_value) < 0.1 else (0, 165, 255) + cv2.putText(frame, steering_text, (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2) + # Draw direction arrow + arrow_start = (frame_center_x, frame_height - 50) + arrow_end = (int(frame_center_x + steering_value * 100), frame_height - 50) + cv2.arrowedLine(frame, arrow_start, arrow_end, color, 3, tipLength=0.3) + + # Add detection count info + red_count = len([c for c in red_contours if cv2.contourArea(c) > 500]) + blue_count = len([c for c in blue_contours if cv2.contourArea(c) > 500]) + + if draw: + info_text = f"Red: {red_count} | Blue: {blue_count}" + cv2.putText(frame, info_text, (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) + + return frame if draw else None, steering_value + +def clamp(val, lo, hi): + return lo if val < lo else hi if val > hi else val + +_, args = initialize_argparser() + +device = dai.Device(dai.DeviceInfo(args.device)) if args.device else dai.Device() +print("Device Information: ", device.getDeviceInfo()) + +cam_features = {} +for cam in device.getConnectedCameraFeatures(): + cam_features[cam.socket] = (cam.width, cam.height) + +with dai.Pipeline(device) as pipeline: + print("Creating pipeline (headless)...") + + camera_sensors = device.getConnectedCameraFeatures() + color_detection_output = None + + for sensor in camera_sensors: + cam = pipeline.create(dai.node.Camera).build(sensor.socket) + + request_resolution = ( + (sensor.width, sensor.height) + if sensor.width <= 1280 and sensor.height <= 720 + else (1280, 720) + ) # keep moderate resolution for Pi + + # Only need BGR frames for color detection; no encoders/visualizers + if sensor.socket == dai.CameraBoardSocket.CAM_A and color_detection_output is None: + color_detection_output = cam.requestOutput( + request_resolution, dai.ImgFrame.Type.BGR888p, fps=args.fps_limit + ) + + # Prepare output queue for color detection BEFORE starting the pipeline + color_queue = None + if color_detection_output: + color_queue = color_detection_output.createOutputQueue(maxSize=1, blocking=False) + print("Color detection queue created successfully") + + print("Pipeline created.") + + # Serial setup for Arduino + ser = None + try: + ser = serial.Serial(args.serial_port, args.baud_rate, timeout=0.1) + # Small delay for Arduino reset on serial open + time.sleep(2.0) + print(f"Opened serial {args.serial_port} @ {args.baud_rate}") + except Exception as e: + print(f"WARNING: Could not open serial port {args.serial_port}: {e}") + + def send_turn_command(steering): + """Map steering (-1..1) to differential turn PWM. No forward motion. + Contract: + - Input: steering float in [-1,1], None means no data. + - Output: writes "m1,m2\n" over serial when available. + - Behavior: left/right wheels spin opposite directions to rotate in place. + """ + if ser is None: + return + + if steering is None: + m1 = 0 + m2 = 0 + else: + s = -steering if args.invert_turn else steering + if abs(s) < args.deadzone: + m1 = 0 + m2 = 0 + else: + pwm = clamp(int(args.kp_turn * s), -args.max_turn_pwm, args.max_turn_pwm) + # rotate-in-place: opposite directions + # Positive s -> turn RIGHT: left wheel forward (+), right wheel backward (-) + # Map to (m1,m2) consistent with Arduino: setBTS7960(M1, M2) + # Assuming m1 is right motor and inverted in Arduino (1*m1Val), we keep symmetry: + m1 = -pwm # right motor + m2 = pwm # left motor + + try: + cmd = f"{m1},{m2}\n" + ser.write(cmd.encode()) + except Exception as e: + # If serial fails mid-run, just log once-per error burst + print(f"Serial write error: {e}") + + # Main loop + pipeline.start() + print("Starting headless color-based centering. Press Ctrl+C to stop.") + + last_send = 0.0 + send_interval = 1.0 / max(1, args.command_rate_hz) + + try: + while pipeline.isRunning(): + if not color_queue: + time.sleep(0.05) + continue + + in_frame = color_queue.tryGet() + if in_frame is None: + # No frame ready yet + time.sleep(0.005) + continue + + frame = in_frame.getCvFrame() + _, steering_value = detect_colors_hsv(frame, draw=args.debug_viz) + + now = time.time() + if now - last_send >= send_interval: + send_turn_command(steering_value) + last_send = now + + except KeyboardInterrupt: + print("\nStopping...") + finally: + # Stop motors on exit for safety + try: + if 'ser' in locals() and ser: + ser.write(b"0,0\n") + time.sleep(0.05) + ser.close() + except Exception: + pass + +print("Color centering stopped.") \ No newline at end of file diff --git a/monkey_bot/requirements.txt b/monkey_bot/requirements.txt new file mode 100644 index 0000000..4cf0ed1 --- /dev/null +++ b/monkey_bot/requirements.txt @@ -0,0 +1,4 @@ +depthai==3.0.0 +numpy>=1.22 +opencv_python>=4.12.0 +pyserial>=3.5 \ No newline at end of file diff --git a/monkey_bot/utils/__init__.py b/monkey_bot/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/monkey_bot/utils/__pycache__/__init__.cpython-311.pyc b/monkey_bot/utils/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..6110db7 Binary files /dev/null and b/monkey_bot/utils/__pycache__/__init__.cpython-311.pyc differ diff --git a/monkey_bot/utils/__pycache__/__init__.cpython-313.pyc b/monkey_bot/utils/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..f1c01a1 Binary files /dev/null and b/monkey_bot/utils/__pycache__/__init__.cpython-313.pyc differ diff --git a/monkey_bot/utils/__pycache__/arguments.cpython-311.pyc b/monkey_bot/utils/__pycache__/arguments.cpython-311.pyc new file mode 100644 index 0000000..efcb830 Binary files /dev/null and b/monkey_bot/utils/__pycache__/arguments.cpython-311.pyc differ diff --git a/monkey_bot/utils/__pycache__/arguments.cpython-313.pyc b/monkey_bot/utils/__pycache__/arguments.cpython-313.pyc new file mode 100644 index 0000000..6e93cc2 Binary files /dev/null and b/monkey_bot/utils/__pycache__/arguments.cpython-313.pyc differ diff --git a/monkey_bot/utils/arguments.py b/monkey_bot/utils/arguments.py new file mode 100644 index 0000000..07e493b --- /dev/null +++ b/monkey_bot/utils/arguments.py @@ -0,0 +1,91 @@ +import argparse + + +def initialize_argparser(): + """Initialize the argument parser for the script.""" + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument( + "-d", + "--device", + help="Optional name, DeviceID or IP of the camera to connect to.", + required=False, + default=None, + type=str, + ) + + parser.add_argument( + "-fps", + "--fps_limit", + help="FPS limit for the model runtime.", + required=False, + default=30, + type=int, + ) + + # Serial/controls for Arduino motor driver + parser.add_argument( + "--serial_port", + help="Serial port to Arduino (e.g., /dev/ttyACM0, /dev/ttyUSB0)", + required=False, + default="/dev/ttyACM0", + type=str, + ) + + parser.add_argument( + "--baud_rate", + help="Baud rate for Arduino serial.", + required=False, + default=9600, + type=int, + ) + + parser.add_argument( + "--kp_turn", + help="Proportional gain mapping steering (-1..1) to turn PWM.", + required=False, + default=180.0, + type=float, + ) + + parser.add_argument( + "--max_turn_pwm", + help="Max absolute PWM for turn commands (0..255).", + required=False, + default=200, + type=int, + ) + + parser.add_argument( + "--deadzone", + help="Deadzone around zero steering to avoid jitter (0..1).", + required=False, + default=0.05, + type=float, + ) + + parser.add_argument( + "--command_rate_hz", + help="How often to send serial motor commands (Hz).", + required=False, + default=20, + type=int, + ) + + parser.add_argument( + "--invert_turn", + help="Invert turning direction if the robot rotates the wrong way.", + action="store_true", + ) + + parser.add_argument( + "--debug_viz", + help="Enable drawing and extra logs for debugging (slower).", + action="store_true", + ) + + args = parser.parse_args() + + return parser, args