Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
Binary file added monkey_bot/.DS_Store
Binary file not shown.
295 changes: 295 additions & 0 deletions monkey_bot/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
import depthai as dai
# Attempt to import cv2; if unavailable, try to install opencv-python and re-import.
try:
import cv2
except Exception:
import sys
import subprocess
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "opencv-python"])
import importlib
cv2 = importlib.import_module("cv2")
except Exception as e:
raise ImportError(
"The 'cv2' module (OpenCV) is required but could not be imported or installed automatically. "
"Please install it manually with: pip install opencv-python"
) from e

import numpy as np
import time
import math
import serial
from utils.arguments import initialize_argparser

def detect_colors_hsv(frame, draw=True):
"""
Detect red and blue colors in the frame using HSV color space
Returns the frame with outlines drawn around detected colors and the steering value
"""
# Convert to BGR format if needed (from NV12)
if len(frame.shape) == 2:
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)

# Get frame dimensions
frame_height, frame_width = frame.shape[:2]
frame_center_x = frame_width // 2

# Convert BGR to HSV
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

# Define HSV ranges for red and blue
# Red needs two ranges as it wraps around 0
red_lower1 = np.array([0, 120, 70])
red_upper1 = np.array([10, 255, 255])
red_lower2 = np.array([170, 120, 70])
red_upper2 = np.array([180, 255, 255])

# Blue range
blue_lower = np.array([100, 150, 50])
blue_upper = np.array([130, 255, 255])

# Create masks for each color
red_mask1 = cv2.inRange(hsv, red_lower1, red_upper1)
red_mask2 = cv2.inRange(hsv, red_lower2, red_upper2)
red_mask = cv2.bitwise_or(red_mask1, red_mask2)
blue_mask = cv2.inRange(hsv, blue_lower, blue_upper)

# Apply morphological operations to clean up the masks
kernel = np.ones((5, 5), np.uint8)
red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_CLOSE, kernel)
red_mask = cv2.morphologyEx(red_mask, cv2.MORPH_OPEN, kernel)
blue_mask = cv2.morphologyEx(blue_mask, cv2.MORPH_CLOSE, kernel)
blue_mask = cv2.morphologyEx(blue_mask, cv2.MORPH_OPEN, kernel)

# Find contours for red objects
red_contours, _ = cv2.findContours(red_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

# Find contours for blue objects
blue_contours, _ = cv2.findContours(blue_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

# Variables to store centroids
red_centroid = None
blue_centroid = None

# Draw outlines and labels for red objects
for contour in red_contours:
area = cv2.contourArea(contour)
if area > 500: # Filter small contours
if draw:
# Draw contour outline
cv2.drawContours(frame, [contour], -1, (0, 0, 255), 3)
# Get bounding box
x, y, w, h = cv2.boundingRect(contour)
# Draw bounding box
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
# Add label
cv2.putText(frame, "RED", (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)

# Draw centroid
M = cv2.moments(contour)
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
if draw:
cv2.circle(frame, (cx, cy), 7, (0, 0, 255), -1)
# Store the first (largest) red centroid
if red_centroid is None:
red_centroid = (cx, cy)

# Draw outlines and labels for blue objects
for contour in blue_contours:
area = cv2.contourArea(contour)
if area > 500: # Filter small contours
if draw:
# Draw contour outline
cv2.drawContours(frame, [contour], -1, (255, 0, 0), 3)
# Get bounding box
x, y, w, h = cv2.boundingRect(contour)
# Draw bounding box
cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
# Add label
cv2.putText(frame, "BLUE", (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)

# Draw centroid
M = cv2.moments(contour)
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
if draw:
cv2.circle(frame, (cx, cy), 7, (255, 0, 0), -1)
# Store the first (largest) blue centroid
if blue_centroid is None:
blue_centroid = (cx, cy)

# Calculate midpoint and steering value
steering_value = None
if red_centroid and blue_centroid:
# Calculate midpoint between red and blue
midpoint_x = (red_centroid[0] + blue_centroid[0]) // 2
midpoint_y = (red_centroid[1] + blue_centroid[1]) // 2

if draw:
# Draw line between red and blue centroids
cv2.line(frame, red_centroid, blue_centroid, (0, 255, 0), 3)
# Draw the midpoint
cv2.circle(frame, (midpoint_x, midpoint_y), 10, (0, 255, 0), -1)
cv2.putText(frame, "MIDPOINT", (midpoint_x - 50, midpoint_y - 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
# Draw vertical line at frame center for reference
cv2.line(frame, (frame_center_x, 0), (frame_center_x, frame_height), (255, 255, 0), 2)
cv2.putText(frame, "CENTER", (frame_center_x - 40, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

# Calculate steering value
# Normalize to -1.0 to 1.0 range based on frame width
steering_value = (midpoint_x - frame_center_x) / (frame_width / 2)

if draw:
# Draw steering indicator
steering_text = f"Steering: {steering_value:.3f}"
color = (0, 255, 0) if abs(steering_value) < 0.1 else (0, 165, 255)
cv2.putText(frame, steering_text, (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
# Draw direction arrow
arrow_start = (frame_center_x, frame_height - 50)
arrow_end = (int(frame_center_x + steering_value * 100), frame_height - 50)
cv2.arrowedLine(frame, arrow_start, arrow_end, color, 3, tipLength=0.3)

# Add detection count info
red_count = len([c for c in red_contours if cv2.contourArea(c) > 500])
blue_count = len([c for c in blue_contours if cv2.contourArea(c) > 500])

if draw:
info_text = f"Red: {red_count} | Blue: {blue_count}"
cv2.putText(frame, info_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

return frame if draw else None, steering_value

def clamp(val, lo, hi):
return lo if val < lo else hi if val > hi else val

_, args = initialize_argparser()

device = dai.Device(dai.DeviceInfo(args.device)) if args.device else dai.Device()
print("Device Information: ", device.getDeviceInfo())

cam_features = {}
for cam in device.getConnectedCameraFeatures():
cam_features[cam.socket] = (cam.width, cam.height)

with dai.Pipeline(device) as pipeline:
print("Creating pipeline (headless)...")

camera_sensors = device.getConnectedCameraFeatures()
color_detection_output = None

for sensor in camera_sensors:
cam = pipeline.create(dai.node.Camera).build(sensor.socket)

request_resolution = (
(sensor.width, sensor.height)
if sensor.width <= 1280 and sensor.height <= 720
else (1280, 720)
) # keep moderate resolution for Pi

# Only need BGR frames for color detection; no encoders/visualizers
if sensor.socket == dai.CameraBoardSocket.CAM_A and color_detection_output is None:
color_detection_output = cam.requestOutput(
request_resolution, dai.ImgFrame.Type.BGR888p, fps=args.fps_limit
)

# Prepare output queue for color detection BEFORE starting the pipeline
color_queue = None
if color_detection_output:
color_queue = color_detection_output.createOutputQueue(maxSize=1, blocking=False)
print("Color detection queue created successfully")

print("Pipeline created.")

# Serial setup for Arduino
ser = None
try:
ser = serial.Serial(args.serial_port, args.baud_rate, timeout=0.1)
# Small delay for Arduino reset on serial open
time.sleep(2.0)
print(f"Opened serial {args.serial_port} @ {args.baud_rate}")
except Exception as e:
print(f"WARNING: Could not open serial port {args.serial_port}: {e}")

def send_turn_command(steering):
"""Map steering (-1..1) to differential turn PWM. No forward motion.
Contract:
- Input: steering float in [-1,1], None means no data.
- Output: writes "m1,m2\n" over serial when available.
- Behavior: left/right wheels spin opposite directions to rotate in place.
"""
if ser is None:
return

if steering is None:
m1 = 0
m2 = 0
else:
s = -steering if args.invert_turn else steering
if abs(s) < args.deadzone:
m1 = 0
m2 = 0
else:
pwm = clamp(int(args.kp_turn * s), -args.max_turn_pwm, args.max_turn_pwm)
# rotate-in-place: opposite directions
# Positive s -> turn RIGHT: left wheel forward (+), right wheel backward (-)
# Map to (m1,m2) consistent with Arduino: setBTS7960(M1, M2)
# Assuming m1 is right motor and inverted in Arduino (1*m1Val), we keep symmetry:
m1 = -pwm # right motor
m2 = pwm # left motor

try:
cmd = f"{m1},{m2}\n"
ser.write(cmd.encode())
except Exception as e:
# If serial fails mid-run, just log once-per error burst
print(f"Serial write error: {e}")

# Main loop
pipeline.start()
print("Starting headless color-based centering. Press Ctrl+C to stop.")

last_send = 0.0
send_interval = 1.0 / max(1, args.command_rate_hz)

try:
while pipeline.isRunning():
if not color_queue:
time.sleep(0.05)
continue

in_frame = color_queue.tryGet()
if in_frame is None:
# No frame ready yet
time.sleep(0.005)
continue

frame = in_frame.getCvFrame()
_, steering_value = detect_colors_hsv(frame, draw=args.debug_viz)

now = time.time()
if now - last_send >= send_interval:
send_turn_command(steering_value)
last_send = now

except KeyboardInterrupt:
print("\nStopping...")
finally:
# Stop motors on exit for safety
try:
if 'ser' in locals() and ser:
ser.write(b"0,0\n")
time.sleep(0.05)
ser.close()
except Exception:
pass

print("Color centering stopped.")
4 changes: 4 additions & 0 deletions monkey_bot/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
depthai==3.0.0
numpy>=1.22
opencv_python>=4.12.0
pyserial>=3.5
Empty file added monkey_bot/utils/__init__.py
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
91 changes: 91 additions & 0 deletions monkey_bot/utils/arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import argparse


def initialize_argparser():
"""Initialize the argument parser for the script."""
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)

parser.add_argument(
"-d",
"--device",
help="Optional name, DeviceID or IP of the camera to connect to.",
required=False,
default=None,
type=str,
)

parser.add_argument(
"-fps",
"--fps_limit",
help="FPS limit for the model runtime.",
required=False,
default=30,
type=int,
)

# Serial/controls for Arduino motor driver
parser.add_argument(
"--serial_port",
help="Serial port to Arduino (e.g., /dev/ttyACM0, /dev/ttyUSB0)",
required=False,
default="/dev/ttyACM0",
type=str,
)

parser.add_argument(
"--baud_rate",
help="Baud rate for Arduino serial.",
required=False,
default=9600,
type=int,
)

parser.add_argument(
"--kp_turn",
help="Proportional gain mapping steering (-1..1) to turn PWM.",
required=False,
default=180.0,
type=float,
)

parser.add_argument(
"--max_turn_pwm",
help="Max absolute PWM for turn commands (0..255).",
required=False,
default=200,
type=int,
)

parser.add_argument(
"--deadzone",
help="Deadzone around zero steering to avoid jitter (0..1).",
required=False,
default=0.05,
type=float,
)

parser.add_argument(
"--command_rate_hz",
help="How often to send serial motor commands (Hz).",
required=False,
default=20,
type=int,
)

parser.add_argument(
"--invert_turn",
help="Invert turning direction if the robot rotates the wrong way.",
action="store_true",
)

parser.add_argument(
"--debug_viz",
help="Enable drawing and extra logs for debugging (slower).",
action="store_true",
)

args = parser.parse_args()

return parser, args