41 changes: 35 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
Reachy Mini controlled by an LLM via MCP, with a custom architecture and client that allow background tasks for more agentic behavior.

## Requirements

Uses Reachy Mini Lite for easy media streaming.
Tested on Windows with Python 3.12.

@@ -11,6 +12,7 @@ Defaults to local and if endpoint is not accessible, uses Groq.
Currently using gpt-oss-20b.

### Local LLM

For local setup, I SSH into a GPU server and deploy with [vLLM](https://docs.vllm.ai/en/latest/getting_started/quickstart/):

`vllm serve openai/gpt-oss-20b --tool-call-parser openai --enable-auto-tool-choice --port 6000`
@@ -22,6 +24,7 @@ This is done in VS Code for the automatic port forwarding. To test this is succe
This endpoint is currently hardcoded. Change it in the code if yours differs.

### Groq API

[Groq](https://console.groq.com/keys) is an inference provider with a free tier for personal use, subject to usage limits.
To use it, get an API key and set it as an environment variable.

@@ -40,8 +43,8 @@ python -m venv reachy_mini_env
pip install -r requirements.txt
```


## Usage

Start Reachy Mini's server on the default port 8000:

`uv run reachy-mini-daemon`
@@ -67,24 +70,50 @@ Or, when the agent is running, visit `http://localhost:8765/` in your browser.
The server exposes the basic robot MCP tools plus some more advanced ones, such as facial analysis.
Ask the LLM to list its tools to learn more.

### Reliability & Timeouts

The agent is configured with:

- **MCP connection timeout**: 60 seconds for tool calls, 10 seconds for initial connection
- **Automatic retries**: Tools retry up to 3 times on failure
- **Connection pooling**: Up to 100 concurrent connections to prevent bottlenecks

If tools freeze or time out, make sure the MCP server is running and reachable.
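
As a rough illustration of what "retry up to 3 times" means, here is a minimal sketch of a retry loop. It is not the agent's actual mechanism (the agent relies on the `retries=3` setting passed to `Agent`); `call_with_retries` and `flaky_tool` are hypothetical names used only for this example.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retries(tool: Callable[[], T], retries: int = 3) -> T:
    """Call `tool`, retrying up to `retries` times after the first failure."""
    last_error: Optional[Exception] = None
    for _attempt in range(retries + 1):  # 1 initial attempt + `retries` retries
        try:
            return tool()
        except Exception as exc:  # a real client would catch narrower errors
            last_error = exc
    raise RuntimeError(f"tool failed after {retries} retries") from last_error


# Hypothetical tool that fails twice before succeeding.
calls = {"count": 0}


def flaky_tool() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("MCP server not reachable")
    return "ok"


result = call_with_retries(flaky_tool)
```

With the default of 3 retries, a tool that keeps failing is attempted 4 times in total before the error is surfaced.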

### Background Workers

There are two MCP tools that make this work.

First is the tool to **launch a background worker**.
This calls the same agent script as the main agent in a subprocess with instructions that will be injected into the system prompt.
The background worker has access to the same MCP server as the main agent and its job is to do that specific task.
Logs can be found in `logs/workers/`.
We keep track of worker ids and system prompts so if a subprocess dies unexpectedly, we can tell the main agent.
What does dying unexpectedly mean? That brings us to the second tool.

We introduce server-to-client communication.
This comes in the form of the **callback** MCP tool.
Background workers are instructed to call this tool when they have completed their task.
The tool posts to an endpoint our custom agentic client has exposed.
At the client-side endpoint, the callback message is injected as a special user message so the main agent can process it and inform the user.
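
The bookkeeping described above can be sketched roughly as follows. This is an illustration under assumptions, not the project's code: `WorkerRegistry`, `WorkerRecord`, and the method names are hypothetical, and "dying unexpectedly" is interpreted here as "the subprocess exited without a completed callback".

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class WorkerRecord:
    worker_id: str
    system_prompt: str
    called_back: bool = False  # set when the worker reports completion


class WorkerRegistry:
    """Tracks launched workers so silent subprocess deaths can be reported."""

    def __init__(self) -> None:
        self._workers: Dict[str, WorkerRecord] = {}

    def register(self, worker_id: str, system_prompt: str) -> None:
        self._workers[worker_id] = WorkerRecord(worker_id, system_prompt)

    def on_callback(self, worker_id: str, message: str, done: bool) -> str:
        """Handle a callback and build the message injected for the main agent."""
        if done and worker_id in self._workers:
            self._workers[worker_id].called_back = True
        return f"[Worker callback] {message} (worker_id={worker_id}, done={done}). Inform the user."

    def on_exit(self, worker_id: str) -> Optional[str]:
        """Called when a subprocess exits; returns a notice if it never called back."""
        record = self._workers.pop(worker_id, None)
        if record is None or record.called_back:
            return None
        return f"[Worker died] worker_id={worker_id} exited without a callback. Task: {record.system_prompt}"


registry = WorkerRegistry()
registry.register("w1", "Watch the door")
registry.register("w2", "Count to ten")
callback_msg = registry.on_callback("w1", "Someone arrived", done=True)
clean_exit = registry.on_exit("w1")    # worker finished normally
crash_notice = registry.on_exit("w2")  # worker exited without calling back
```

Keeping the system prompt alongside the worker id means the notice sent to the main agent can say which task was lost, not only which process.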

### STT

STT uses a workflow similar to the background workers, but it's special enough to warrant a 'hack'.
The STT loop is always started programmatically when the MCP server is launched.
It simulates natural conversation flow by listening until pauses with VAD and then transcribing with Whisper.
Once the user completes a turn and pauses, the loop posts the transcript to the client endpoint, much like the callback tool does.
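
To make the "listen until a pause" idea concrete, here is a minimal sketch of pause-based turn segmentation. It is only an approximation of the approach: a plain energy threshold stands in for a real VAD, the transcription step is left out, and `segment_turns`, `speech_threshold`, and `pause_frames` are hypothetical names and values.

```python
from typing import Iterable, Iterator, List


def segment_turns(
    frame_energies: Iterable[float],
    speech_threshold: float = 0.5,
    pause_frames: int = 3,
) -> Iterator[List[float]]:
    """Yield one list of speech frames per user turn, ending a turn after a pause."""
    turn: List[float] = []
    silent_run = 0
    for energy in frame_energies:
        if energy >= speech_threshold:
            turn.append(energy)
            silent_run = 0  # speech resets the pause counter
        elif turn:
            silent_run += 1
            if silent_run >= pause_frames:
                yield turn  # pause detected: the turn is complete
                turn, silent_run = [], 0
    if turn:
        yield turn  # flush speech still buffered when the stream ends


frames = [0.0, 0.9, 0.8, 0.1, 0.7, 0.0, 0.0, 0.0, 0.6, 0.0, 0.0, 0.0]
turns = list(segment_turns(frames))
```

A single quiet frame inside a turn does not end it; only `pause_frames` consecutive quiet frames do, which is what lets short hesitations stay within one user turn.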

### Idle Breathing

The robot features an automatic breathing animation inspired by the [reachy-personal-assistant](https://github.com/brevdev/reachy-personal-assistant) project. When idle for more than 2 seconds:

- Gentle z-axis breathing motion (5mm amplitude, 6 breaths/minute)
- Antenna sway in opposite directions (15° amplitude)
- Automatic pause when performing actions, resume after inactivity
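
The numbers above translate into two sine waves. The sketch below uses the amplitudes and frequencies from `server/controller/breathing.py` (0.1 Hz breathing, 0.5 Hz antenna sway) and the 2-second idle threshold stated here; the function names and the idle check are hypothetical simplifications, not the movement manager's actual logic.

```python
import math

Z_AMPLITUDE_M = 0.005                     # 5 mm
BREATH_HZ = 0.1                           # 6 breaths per minute
ANTENNA_AMPLITUDE_RAD = math.radians(15)  # 15 degrees
ANTENNA_HZ = 0.5
IDLE_THRESHOLD_S = 2.0


def breathing_offsets(t: float) -> tuple:
    """Return (z offset in metres, left antenna, right antenna) at time t seconds."""
    z = Z_AMPLITUDE_M * math.sin(2 * math.pi * BREATH_HZ * t)
    sway = ANTENNA_AMPLITUDE_RAD * math.sin(2 * math.pi * ANTENNA_HZ * t)
    return z, sway, -sway  # antennas move in opposite directions


def should_breathe(now: float, last_activity: float) -> bool:
    """Breathing starts only after more than IDLE_THRESHOLD_S without activity."""
    return (now - last_activity) > IDLE_THRESHOLD_S


z_peak, left, right = breathing_offsets(2.5)  # a quarter of the 10 s breath period
```

At `t = 2.5` the head is at the top of a breath (5 mm above neutral), and the antennas are at their full 15° sway in opposite directions.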

## Credits

This project incorporates code and concepts from:

- **[reachy-personal-assistant](https://github.com/brevdev/reachy-personal-assistant)** by [brevdev](https://github.com/brevdev) - Breathing animation system and movement manager architecture
33 changes: 29 additions & 4 deletions rag_agent.py
@@ -58,8 +58,23 @@
)

print(model)
mcp_server = MCPServerStreamableHTTP("http://localhost:5001/mcp")
mcp_server2 = MCPServerStreamableHTTP("http://localhost:9090/mcp")
# Configure MCP servers with increased timeouts to prevent freezing
import httpx
mcp_http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(60.0, connect=10.0, read=60.0),  # 60s default and read timeouts, 10s connect
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
print("Initializing MCP server connection to http://localhost:5001/mcp ...")
mcp_server = MCPServerStreamableHTTP(
    "http://localhost:5001/mcp",
    http_client=mcp_http_client,
)
# When adding another MCP server, also add it to the toolsets list
# in the _make_agent function below.
# mcp_server2 = MCPServerStreamableHTTP(
#     "http://localhost:9090/mcp",
#     http_client=mcp_http_client,
# )


BASE_INSTRUCTIONS = """
@@ -95,8 +110,9 @@ def _make_agent(extra_instructions: str = "") -> Agent:
instructions = f"{instructions}\n\n{extra_instructions}"
return Agent(
model,
toolsets=[mcp_server, mcp_server2],
toolsets=[mcp_server],
instructions=instructions,
retries=3, # Retry failed tool calls up to 3 times
)


@@ -219,7 +235,16 @@ def _event_worker() -> None:
worker_message = f"[Worker callback] {message} (worker_id={worker_id}, done={done}). Inform the user."
try:
print("running agent with message: " + worker_message)
result = agent.run_sync(worker_message, message_history=_message_history)
import asyncio
# NOTE: no timeout is passed to run_sync here, so this handler only fires
# if an asyncio.TimeoutError propagates out of the agent run.
try:
    result = agent.run_sync(worker_message, message_history=_message_history)
except asyncio.TimeoutError:
    error_msg = "[Timeout] Agent took too long to respond (>120s). MCP tools may be unresponsive."
    print(error_msg)
    _push_outgoing("model", error_msg, worker_id=payload.get("worker_id"), done=payload.get("done"))
    continue

_message_history.clear()
_message_history.extend(result.all_messages())
output = (result.output or "").strip()
4 changes: 3 additions & 1 deletion server/controller/__init__.py
@@ -1,3 +1,5 @@
from .audio import speak
from .controls import play_emotion, list_emotions, goto_target
from .vision import take_picture, save_image_person, describe_image, detect_faces, analyze_face, _IMAGES_DIR
from .movement_manager import MovementManager
from .breathing import BreathingMove
100 changes: 100 additions & 0 deletions server/controller/breathing.py
@@ -0,0 +1,100 @@
"""Breathing move with interpolation to neutral and continuous breathing patterns."""

from __future__ import annotations
import numpy as np
from numpy.typing import NDArray
from typing import Tuple

from reachy_mini.motion.move import Move
from reachy_mini.utils import create_head_pose


class BreathingMove(Move):  # type: ignore
    """Breathing move with interpolation to neutral and then continuous breathing patterns."""

    def __init__(
        self,
        interpolation_start_pose: NDArray[np.float32],
        interpolation_start_antennas: Tuple[float, float],
        interpolation_duration: float = 1.0,
    ):
        """Initialize breathing move.

        Args:
            interpolation_start_pose: 4x4 matrix of current head pose to interpolate from
            interpolation_start_antennas: Current antenna positions to interpolate from
            interpolation_duration: Duration of interpolation to neutral (seconds)
        """
        self.interpolation_start_pose = interpolation_start_pose
        self.interpolation_start_antennas = np.array(interpolation_start_antennas)
        self.interpolation_duration = interpolation_duration

        # Neutral positions for breathing base
        self.neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
        self.neutral_antennas = np.array([0.0, 0.0])

        # Breathing parameters
        self.breathing_z_amplitude = 0.005  # 5mm gentle breathing
        self.breathing_frequency = 0.1  # Hz (6 breaths per minute)
        self.antenna_sway_amplitude = np.deg2rad(15)  # 15 degrees
        self.antenna_frequency = 0.5  # Hz (faster antenna sway)

    @property
    def duration(self) -> float:
        """Duration property required by official Move interface."""
        return float("inf")  # Continuous breathing (never ends naturally)

    def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
        """Evaluate breathing move at time t."""
        if t < self.interpolation_duration:
            # Phase 1: Interpolate to neutral base position
            interpolation_t = t / self.interpolation_duration

            # Simple linear interpolation for head pose
            # (Using simple approach since we don't have linear_pose_interpolation helper)
            head_pose = self._interpolate_pose(
                self.interpolation_start_pose,
                self.neutral_head_pose,
                interpolation_t
            )

            # Interpolate antennas
            antennas_interp = (
                1 - interpolation_t
            ) * self.interpolation_start_antennas + interpolation_t * self.neutral_antennas
            antennas = antennas_interp.astype(np.float64)

        else:
            # Phase 2: Breathing patterns from neutral base
            breathing_time = t - self.interpolation_duration

            # Gentle z-axis breathing
            z_offset = self.breathing_z_amplitude * np.sin(
                2 * np.pi * self.breathing_frequency * breathing_time
            )
            head_pose = create_head_pose(
                x=0, y=0, z=z_offset, roll=0, pitch=0, yaw=0, degrees=True, mm=False
            )

            # Antenna sway (opposite directions)
            antenna_sway = self.antenna_sway_amplitude * np.sin(
                2 * np.pi * self.antenna_frequency * breathing_time
            )
            antennas = np.array([antenna_sway, -antenna_sway], dtype=np.float64)

        # Return in official Move interface format: (head_pose, antennas_array, body_yaw)
        return (head_pose, antennas, 0.0)

    def _interpolate_pose(
        self,
        start_pose: NDArray[np.float32],
        end_pose: NDArray[np.float32],
        t: float
    ) -> NDArray[np.float64]:
        """Simple linear interpolation for 4x4 transformation matrices.

        For production use, consider using proper SE(3) interpolation.
        """
        # Linear blend (not geometrically correct but good enough for small movements)
        result = (1 - t) * start_pose + t * end_pose
        return result.astype(np.float64)
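
The caveat in `_interpolate_pose` can be seen in a small 2D analogue. The sketch below is illustrative only and is not part of the PR; `rot2d`, `blend`, and `det2` are hypothetical helpers. It shows that blending two rotation matrices element by element produces a matrix that is no longer a rotation, while interpolating the angle does.

```python
import math
from typing import List

Matrix = List[List[float]]


def rot2d(angle: float) -> Matrix:
    """2x2 rotation matrix for `angle` in radians."""
    c, s = math.cos(angle), math.sin(angle)
    return [[c, -s], [s, c]]


def blend(a: Matrix, b: Matrix, t: float) -> Matrix:
    """Element-wise linear blend, the same idea as (1 - t) * start + t * end."""
    return [[(1 - t) * a[i][j] + t * b[i][j] for j in range(2)] for i in range(2)]


def det2(m: Matrix) -> float:
    return m[0][0] * m[1][1] - m[0][1] * m[1][0]


# Blending 0 and 90 degrees halfway shrinks the matrix: determinant 0.5, not 1.
halfway_blend = blend(rot2d(0.0), rot2d(math.pi / 2), 0.5)

# Interpolating the angle instead keeps a valid rotation: determinant 1.
halfway_angle = rot2d(0.5 * (math.pi / 2))
```

For the small head motions involved in returning to neutral the distortion is minor, which matches the "good enough for small movements" comment, but it grows with the angle between the two poses.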
18 changes: 18 additions & 0 deletions server/controller/controls.py
@@ -2,12 +2,24 @@
from reachy_mini.motion.recorded_move import RecordedMoves
from reachy_mini.utils import create_head_pose
from threading import Thread
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .movement_manager import MovementManager

EMOTIONS_DATASET = "pollen-robotics/reachy-mini-emotions-library"
recorded_emotions = RecordedMoves(EMOTIONS_DATASET)
move_names = recorded_emotions.list_moves()
moves_and_descriptions = {name: recorded_emotions.get(name).description for name in move_names}

# Global reference to movement manager (set by server.py)
_movement_manager: "MovementManager | None" = None

def set_movement_manager(manager: "MovementManager") -> None:
    """Set the global movement manager reference."""
    global _movement_manager
    _movement_manager = manager

def _play_emotion_worker(mini: ReachyMini, emotion: str) -> None:
    """Internal helper to play an emotion in a separate thread."""
    move = recorded_emotions.get(emotion)
@@ -19,6 +31,9 @@ def play_emotion(mini: ReachyMini, emotion: str):
Runs in a background thread to avoid AsyncToSync being used
from the same thread as the FastMCP async event loop.
"""
if _movement_manager:
_movement_manager.mark_activity()

if emotion not in moves_and_descriptions:
return "Emotion not found! Use list_emotions to get the list of available emotions."
else:
@@ -59,6 +74,9 @@ def goto_target(
method (InterpolationTechnique): Interpolation method to use ("linear", "minjerk", "ease", "cartoon"). Default is "minjerk".
body_yaw (float | None): Body yaw angle in radians. Use None to keep the current yaw.
"""
if _movement_manager:
_movement_manager.mark_activity()

mini.goto_target(
head=create_head_pose(
x=head_x,