diff --git a/packages/prime-tunnel/src/prime_tunnel/__init__.py b/packages/prime-tunnel/src/prime_tunnel/__init__.py index ff175c5d4..1a4f3718c 100644 --- a/packages/prime-tunnel/src/prime_tunnel/__init__.py +++ b/packages/prime-tunnel/src/prime_tunnel/__init__.py @@ -4,9 +4,11 @@ from prime_tunnel.core import Config, TunnelClient from prime_tunnel.exceptions import ( + BinaryDownloadError, TunnelAuthError, TunnelConnectionError, TunnelError, + TunnelLimitReachedError, TunnelTimeoutError, ) from prime_tunnel.models import TunnelInfo @@ -22,8 +24,10 @@ # Models "TunnelInfo", # Exceptions - "TunnelError", + "BinaryDownloadError", "TunnelAuthError", + "TunnelError", + "TunnelLimitReachedError", "TunnelConnectionError", "TunnelTimeoutError", ] diff --git a/packages/prime-tunnel/src/prime_tunnel/core/client.py b/packages/prime-tunnel/src/prime_tunnel/core/client.py index 1e33d290c..36346c38d 100644 --- a/packages/prime-tunnel/src/prime_tunnel/core/client.py +++ b/packages/prime-tunnel/src/prime_tunnel/core/client.py @@ -10,7 +10,12 @@ ) from prime_tunnel.core.config import Config -from prime_tunnel.exceptions import TunnelAuthError, TunnelError, TunnelTimeoutError +from prime_tunnel.exceptions import ( + TunnelAuthError, + TunnelError, + TunnelLimitReachedError, + TunnelTimeoutError, +) from prime_tunnel.models import TunnelInfo # Retry configuration for transient connection errors @@ -135,6 +140,15 @@ async def _handle_response(self, response: httpx.Response, operation: str) -> Di raise TunnelAuthError("Payment required. Check billing status.") elif response.status_code == 404: return {} # Handle 404 specially in callers + elif response.status_code == 400: + try: + error_detail = response.json().get("detail", response.text) + except Exception: + error_detail = response.text + error_detail = str(error_detail) + if "maximum number of" in error_detail.lower(): + raise TunnelLimitReachedError(error_detail) + raise TunnelError(f"Failed to {operation}: {error_detail}") elif response.status_code >= 400: try: error_detail = response.json().get("detail", response.text) @@ -184,6 +198,8 @@ async def create_tunnel( response = await self._request_with_retry("POST", url, json=payload) except httpx.TimeoutException as e: raise TunnelTimeoutError(f"Request timed out: {e}") from e + except TimeoutError as e: + raise TunnelTimeoutError(f"Request timed out: {e}") from e except httpx.RequestError as e: raise TunnelError(f"Failed to connect to API: {e}") from e @@ -211,6 +227,8 @@ async def get_tunnel(self, tunnel_id: str) -> Optional[TunnelInfo]: response = await self._idempotent_request_with_retry("GET", url) except httpx.TimeoutException as e: raise TunnelTimeoutError(f"Request timed out: {e}") from e + except TimeoutError as e: + raise TunnelTimeoutError(f"Request timed out: {e}") from e except httpx.RequestError as e: raise TunnelError(f"Failed to connect to API: {e}") from e @@ -227,6 +245,7 @@ async def get_tunnel(self, tunnel_id: str) -> Optional[TunnelInfo]: server_port=7000, expires_at=data["expires_at"], user_id=data.get("user_id"), + status=data.get("status"), ) async def delete_tunnel(self, tunnel_id: str) -> bool: @@ -247,6 +266,8 @@ async def delete_tunnel(self, tunnel_id: str) -> bool: response = await self._idempotent_request_with_retry("DELETE", url) except httpx.TimeoutException as e: raise TunnelTimeoutError(f"Request timed out: {e}") from e + except TimeoutError as e: + raise TunnelTimeoutError(f"Request timed out: {e}") from e except httpx.RequestError as e: raise TunnelError(f"Failed to connect to API: {e}") from e @@ -267,6 +288,8 @@ async def bulk_delete_tunnels(self, tunnel_ids: list[str]) -> dict: response = await self._idempotent_request_with_retry("DELETE", url, json=payload) except httpx.TimeoutException as e: raise TunnelTimeoutError(f"Request timed out: {e}") from e + except TimeoutError as e: + raise TunnelTimeoutError(f"Request timed out: {e}") from e except httpx.RequestError as e: raise TunnelError(f"Failed to connect to API: {e}") from e @@ -294,6 +317,8 @@ async def list_tunnels(self, team_id: Optional[str] = None) -> list[TunnelInfo]: response = await self._idempotent_request_with_retry("GET", url, params=params) except httpx.TimeoutException as e: raise TunnelTimeoutError(f"Request timed out: {e}") from e + except TimeoutError as e: + raise TunnelTimeoutError(f"Request timed out: {e}") from e except httpx.RequestError as e: raise TunnelError(f"Failed to connect to API: {e}") from e @@ -310,6 +335,7 @@ async def list_tunnels(self, team_id: Optional[str] = None) -> list[TunnelInfo]: server_port=7000, expires_at=t["expires_at"], user_id=t.get("user_id"), + status=t.get("status"), ) ) return tunnels diff --git a/packages/prime-tunnel/src/prime_tunnel/exceptions.py b/packages/prime-tunnel/src/prime_tunnel/exceptions.py index e95d2992d..3c034d6ab 100644 --- a/packages/prime-tunnel/src/prime_tunnel/exceptions.py +++ b/packages/prime-tunnel/src/prime_tunnel/exceptions.py @@ -4,20 +4,40 @@ class TunnelError(Exception): pass +class TunnelConnectionError(TunnelError): + """Tunnel connection failure with optional tunnel ID for diagnostics.""" + + def __init__( + self, + message: str | None = None, + *, # keyword-only below for backwards compat + tunnel_id: str | None = None, + ): + self.tunnel_id = tunnel_id + + if message is not None: + msg = message + elif tunnel_id: + msg = f"Tunnel {tunnel_id} is not running" + else: + msg = "Tunnel is not running" + super().__init__(msg) + + class TunnelAuthError(TunnelError): """Authentication failed when registering tunnel.""" pass -class TunnelConnectionError(TunnelError): - """Failed to establish tunnel connection.""" +class TunnelTimeoutError(TunnelError): + """Tunnel operation timed out.""" pass -class TunnelTimeoutError(TunnelError): - """Tunnel operation timed out.""" +class TunnelLimitReachedError(TunnelError): + """Tunnel quota exceeded.""" pass diff --git a/packages/prime-tunnel/src/prime_tunnel/models.py b/packages/prime-tunnel/src/prime_tunnel/models.py index 11ad2626a..5d0210eec 100644 --- a/packages/prime-tunnel/src/prime_tunnel/models.py +++ b/packages/prime-tunnel/src/prime_tunnel/models.py @@ -17,6 +17,7 @@ class TunnelInfo(BaseModel): expires_at: datetime = Field(..., description="Token expiration time") # Optional because create_tunnel response doesn't include user_id user_id: Optional[str] = Field(None, description="Owner user ID") + status: Optional[str] = Field(None, description="Current tunnel status") class Config: from_attributes = True diff --git a/packages/prime-tunnel/src/prime_tunnel/tunnel.py b/packages/prime-tunnel/src/prime_tunnel/tunnel.py index 38de07845..dd00239eb 100644 --- a/packages/prime-tunnel/src/prime_tunnel/tunnel.py +++ b/packages/prime-tunnel/src/prime_tunnel/tunnel.py @@ -1,6 +1,7 @@ import asyncio import fcntl import os +import re import subprocess import threading import time @@ -11,9 +12,49 @@ from prime_tunnel.binary import get_frpc_path from prime_tunnel.core.client import TunnelClient -from prime_tunnel.exceptions import TunnelConnectionError, TunnelError, TunnelTimeoutError +from prime_tunnel.exceptions import ( + TunnelConnectionError, + TunnelError, + TunnelTimeoutError, +) from prime_tunnel.models import TunnelInfo +# timestamp + level + caller prefix + message +_LOG_RE = re.compile( + r"\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}\s" + r"\[([EWIDT])\]\s" + r"\[.*?\]\s" + r"(?:\[.*?\]\s)*" + r"(.+)" +) +_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") + + +def _parse_frpc_error( + output_lines: list[str], + tunnel_id: str | None = None, + return_code: int | None = None, +) -> TunnelConnectionError: + """Parse frpc log output into a structured tunnel exception.""" + error_messages: list[str] = [] + for raw_line in output_lines: + line = _ANSI_RE.sub("", raw_line) + m = _LOG_RE.match(line) + if not m: + continue + level, msg = m.group(1), m.group(2) + if level in ("E", "W"): + error_messages.append(msg) + + if error_messages: + message = error_messages[-1] + else: + output_text = "\n".join(output_lines) if output_lines else "(no output captured)" + exit_info = f" (exit code {return_code})" if return_code is not None else "" + message = f"frpc process failed{exit_info}: {output_text}" + + return TunnelConnectionError(tunnel_id=tunnel_id, message=message) + class Tunnel: """Tunnel interface for exposing local services.""" @@ -83,7 +124,7 @@ async def start(self) -> str: Raises: TunnelError: If tunnel registration fails - TunnelConnectionError: If frpc fails to connect + TunnelConnectionError: If frpc fails to connect or tunnel is not running TunnelTimeoutError: If connection times out """ if self._started: @@ -101,7 +142,7 @@ async def start(self) -> str: ) except BaseException as e: await self._cleanup() - if isinstance(e, asyncio.CancelledError): + if isinstance(e, (asyncio.CancelledError, TunnelError)): raise raise TunnelError(f"Failed to register tunnel: {e}") from e @@ -126,7 +167,7 @@ async def start(self) -> str: await self._cleanup() if isinstance(e, asyncio.CancelledError): raise - raise TunnelConnectionError(f"Failed to start frpc: {e}") from e + raise TunnelConnectionError(message=f"Failed to start frpc: {e}") from e # 5. Wait for connection try: @@ -142,7 +183,7 @@ async def start(self) -> str: await self._cleanup() if isinstance(e, asyncio.CancelledError): raise - raise TunnelConnectionError(f"Failed to start pipe drain: {e}") from e + raise TunnelConnectionError(message=f"Failed to start pipe drain: {e}") from e self._started = True @@ -242,6 +283,9 @@ async def _cleanup(self) -> None: def recent_output(self) -> list[str]: """Last N lines of frpc output (thread-safe). Falls back to startup output.""" if hasattr(self, "_output_lock"): + if not self.is_running and hasattr(self, "_drain_threads"): + for t in self._drain_threads: + t.join(timeout=2.0) with self._output_lock: return list(self._recent_output) return list(self._output_lines) @@ -256,9 +300,9 @@ def _start_pipe_drain(self) -> None: if self._process is None: return - self._recent_output: list[str] = [] self._output_lock = threading.Lock() max_lines = 50 + self._recent_output: list[str] = list(self._output_lines[-max_lines:]) def drain_pipe(pipe): """Read output from a pipe, retaining recent lines.""" @@ -275,8 +319,11 @@ def drain_pipe(pipe): except (OSError, ValueError): pass # Pipe closed + self._drain_threads: list[threading.Thread] = [] for pipe in (self._process.stdout, self._process.stderr): - threading.Thread(target=drain_pipe, args=(pipe,), daemon=True).start() + t = threading.Thread(target=drain_pipe, args=(pipe,), daemon=True) + t.start() + self._drain_threads.append(t) def _write_frpc_config(self) -> Path: """Generate and write frpc configuration file.""" @@ -341,7 +388,7 @@ async def _wait_for_connection(self) -> None: while time.time() - start_time < self.connection_timeout: if self._process is None: - raise TunnelConnectionError("frpc process not running") + raise TunnelConnectionError(message="frpc process not running") return_code = self._process.poll() if return_code is not None: @@ -352,14 +399,7 @@ async def _wait_for_connection(self) -> None: remaining_output.extend(self._process.stderr.readlines()) self._output_lines.extend(line.strip() for line in remaining_output if line.strip()) - # Build detailed error message - output_text = ( - "\n".join(self._output_lines) if self._output_lines else "(no output captured)" - ) - raise TunnelConnectionError( - f"frpc exited with code {return_code}\n" - f"--- frpc output ---\n{output_text}\n-------------------" - ) + raise _parse_frpc_error(self._output_lines, self.tunnel_id, return_code) if os.name == "posix": # Set both pipes to non-blocking mode to drain them without deadlock @@ -388,12 +428,11 @@ async def _wait_for_connection(self) -> None: # Check for success/failure indicators if "start proxy success" in line.lower(): return - if "login failed" in line.lower(): - raise TunnelConnectionError(f"frpc login failed: {line}") - if "authorization failed" in line.lower(): - raise TunnelConnectionError( - f"frpc authorization failed: {line}" - ) + if ( + "login to the server failed" in line.lower() + or "connect to server error" in line.lower() + ): + raise _parse_frpc_error(self._output_lines, self.tunnel_id) except (BlockingIOError, IOError): pass # No more data available on this pipe finally: diff --git a/packages/prime/src/prime_cli/commands/tunnel.py b/packages/prime/src/prime_cli/commands/tunnel.py index 132e2caee..f99824e54 100644 --- a/packages/prime/src/prime_cli/commands/tunnel.py +++ b/packages/prime/src/prime_cli/commands/tunnel.py @@ -5,6 +5,11 @@ import typer from prime_tunnel import Tunnel from prime_tunnel.core.client import TunnelClient +from prime_tunnel.exceptions import ( + TunnelConnectionError, + TunnelLimitReachedError, + TunnelTimeoutError, +) from rich.console import Console from rich.table import Table @@ -49,8 +54,33 @@ def signal_handler(): console.print(f"\n[dim]Forwarding to localhost:{port}[/dim]") console.print("[dim]Press Ctrl+C to stop the tunnel[/dim]\n") - await shutdown_event.wait() - + # Monitor tunnel health while waiting for shutdown signal + while not shutdown_event.is_set(): + if not tunnel.is_running: + output = "\n".join(tunnel.recent_output) or "(no output captured)" + raise TunnelConnectionError( + message=( + f"Tunnel process exited unexpectedly\n--- frpc output ---\n{output}" + ), + tunnel_id=tunnel.tunnel_id, + ) + try: + await asyncio.wait_for(shutdown_event.wait(), timeout=2.0) + except asyncio.TimeoutError: + pass + + except TunnelConnectionError as e: + console.print(f"\n[red]Tunnel error:[/red] {e}", style="bold") + if e.tunnel_id: + console.print(f"[dim]Tunnel ID: {e.tunnel_id}[/dim]") + raise typer.Exit(1) + except TunnelLimitReachedError as e: + console.print(f"\n[red]Tunnel limit reached:[/red] {e}", style="bold") + console.print("[dim]Delete an existing tunnel before creating a new one.[/dim]") + raise typer.Exit(1) + except TunnelTimeoutError as e: + console.print(f"\n[red]Connection timed out:[/red] {e}", style="bold") + raise typer.Exit(1) except Exception as e: console.print(f"[red]Error:[/red] {e}", style="bold") raise typer.Exit(1) @@ -95,12 +125,24 @@ async def fetch_tunnels(): table = Table(title="Active Tunnels") table.add_column("Tunnel ID", style="cyan") table.add_column("URL", style="green") + table.add_column("Status") table.add_column("Expires At") for tunnel in tunnels: + status_display = tunnel.status or "unknown" + if status_display == "CONNECTED": + status_display = "[green]connected[/green]" + elif status_display == "PENDING": + status_display = "[yellow]pending[/yellow]" + elif status_display == "DISCONNECTED": + status_display = "[red]disconnected[/red]" + elif status_display == "EXPIRED": + status_display = "[dim]expired[/dim]" + table.add_row( tunnel.tunnel_id, tunnel.url, + status_display, str(tunnel.expires_at), ) @@ -133,6 +175,7 @@ async def fetch_status(): console.print(f"[bold]Tunnel ID:[/bold] {tunnel.tunnel_id}") console.print(f"[bold]URL:[/bold] {tunnel.url}") console.print(f"[bold]Hostname:[/bold] {tunnel.hostname}") + console.print(f"[bold]Status:[/bold] {tunnel.status or 'unknown'}") console.print(f"[bold]Expires At:[/bold] {tunnel.expires_at}")