Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/prime-tunnel/src/prime_tunnel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from prime_tunnel.core import Config, TunnelClient
from prime_tunnel.exceptions import (
BinaryDownloadError,
TunnelAuthError,
TunnelConnectionError,
TunnelError,
TunnelLimitReachedError,
TunnelTimeoutError,
)
from prime_tunnel.models import TunnelInfo
Expand All @@ -22,8 +24,10 @@
# Models
"TunnelInfo",
# Exceptions
"TunnelError",
"BinaryDownloadError",
"TunnelAuthError",
"TunnelError",
"TunnelLimitReachedError",
"TunnelConnectionError",
"TunnelTimeoutError",
]
28 changes: 27 additions & 1 deletion packages/prime-tunnel/src/prime_tunnel/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
)

from prime_tunnel.core.config import Config
from prime_tunnel.exceptions import TunnelAuthError, TunnelError, TunnelTimeoutError
from prime_tunnel.exceptions import (
TunnelAuthError,
TunnelError,
TunnelLimitReachedError,
TunnelTimeoutError,
)
from prime_tunnel.models import TunnelInfo

# Retry configuration for transient connection errors
Expand Down Expand Up @@ -135,6 +140,15 @@ async def _handle_response(self, response: httpx.Response, operation: str) -> Di
raise TunnelAuthError("Payment required. Check billing status.")
elif response.status_code == 404:
return {} # Handle 404 specially in callers
elif response.status_code == 400:
try:
error_detail = response.json().get("detail", response.text)
except Exception:
error_detail = response.text
error_detail = str(error_detail)
if "maximum number of" in error_detail.lower():
Comment thread
kcoopermiller marked this conversation as resolved.
raise TunnelLimitReachedError(error_detail)
raise TunnelError(f"Failed to {operation}: {error_detail}")
elif response.status_code >= 400:
try:
error_detail = response.json().get("detail", response.text)
Expand Down Expand Up @@ -184,6 +198,8 @@ async def create_tunnel(
response = await self._request_with_retry("POST", url, json=payload)
except httpx.TimeoutException as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except TimeoutError as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except httpx.RequestError as e:
raise TunnelError(f"Failed to connect to API: {e}") from e

Expand Down Expand Up @@ -211,6 +227,8 @@ async def get_tunnel(self, tunnel_id: str) -> Optional[TunnelInfo]:
response = await self._idempotent_request_with_retry("GET", url)
except httpx.TimeoutException as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except TimeoutError as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except httpx.RequestError as e:
raise TunnelError(f"Failed to connect to API: {e}") from e

Expand All @@ -227,6 +245,7 @@ async def get_tunnel(self, tunnel_id: str) -> Optional[TunnelInfo]:
server_port=7000,
expires_at=data["expires_at"],
user_id=data.get("user_id"),
status=data.get("status"),
)

async def delete_tunnel(self, tunnel_id: str) -> bool:
Expand All @@ -247,6 +266,8 @@ async def delete_tunnel(self, tunnel_id: str) -> bool:
response = await self._idempotent_request_with_retry("DELETE", url)
except httpx.TimeoutException as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except TimeoutError as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except httpx.RequestError as e:
raise TunnelError(f"Failed to connect to API: {e}") from e

Expand All @@ -267,6 +288,8 @@ async def bulk_delete_tunnels(self, tunnel_ids: list[str]) -> dict:
response = await self._idempotent_request_with_retry("DELETE", url, json=payload)
except httpx.TimeoutException as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except TimeoutError as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except httpx.RequestError as e:
raise TunnelError(f"Failed to connect to API: {e}") from e

Expand Down Expand Up @@ -294,6 +317,8 @@ async def list_tunnels(self, team_id: Optional[str] = None) -> list[TunnelInfo]:
response = await self._idempotent_request_with_retry("GET", url, params=params)
except httpx.TimeoutException as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except TimeoutError as e:
raise TunnelTimeoutError(f"Request timed out: {e}") from e
except httpx.RequestError as e:
raise TunnelError(f"Failed to connect to API: {e}") from e

Expand All @@ -310,6 +335,7 @@ async def list_tunnels(self, team_id: Optional[str] = None) -> list[TunnelInfo]:
server_port=7000,
expires_at=t["expires_at"],
user_id=t.get("user_id"),
status=t.get("status"),
)
)
return tunnels
Expand Down
28 changes: 24 additions & 4 deletions packages/prime-tunnel/src/prime_tunnel/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,40 @@ class TunnelError(Exception):
pass


class TunnelConnectionError(TunnelError):
"""Tunnel connection failure with optional tunnel ID for diagnostics."""

def __init__(
self,
message: str | None = None,
*, # keyword-only below for backwards compat
tunnel_id: str | None = None,
):
self.tunnel_id = tunnel_id

if message is not None:
msg = message
elif tunnel_id:
msg = f"Tunnel {tunnel_id} is not running"
else:
msg = "Tunnel is not running"
super().__init__(msg)


class TunnelAuthError(TunnelError):
"""Authentication failed when registering tunnel."""

pass


class TunnelConnectionError(TunnelError):
"""Failed to establish tunnel connection."""
class TunnelTimeoutError(TunnelError):
"""Tunnel operation timed out."""

pass


class TunnelTimeoutError(TunnelError):
"""Tunnel operation timed out."""
class TunnelLimitReachedError(TunnelError):
"""Tunnel quota exceeded."""

pass

Expand Down
1 change: 1 addition & 0 deletions packages/prime-tunnel/src/prime_tunnel/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TunnelInfo(BaseModel):
expires_at: datetime = Field(..., description="Token expiration time")
# Optional because create_tunnel response doesn't include user_id
user_id: Optional[str] = Field(None, description="Owner user ID")
status: Optional[str] = Field(None, description="Current tunnel status")

class Config:
from_attributes = True
83 changes: 61 additions & 22 deletions packages/prime-tunnel/src/prime_tunnel/tunnel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import fcntl
import os
import re
import subprocess
import threading
import time
Expand All @@ -11,9 +12,49 @@

from prime_tunnel.binary import get_frpc_path
from prime_tunnel.core.client import TunnelClient
from prime_tunnel.exceptions import TunnelConnectionError, TunnelError, TunnelTimeoutError
from prime_tunnel.exceptions import (
TunnelConnectionError,
TunnelError,
TunnelTimeoutError,
)
from prime_tunnel.models import TunnelInfo

# timestamp + level + caller prefix + message
_LOG_RE = re.compile(
r"\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}\s"
r"\[([EWIDT])\]\s"
r"\[.*?\]\s"
r"(?:\[.*?\]\s)*"
r"(.+)"
)
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")


def _parse_frpc_error(
output_lines: list[str],
tunnel_id: str | None = None,
return_code: int | None = None,
) -> TunnelConnectionError:
"""Parse frpc log output into a structured tunnel exception."""
error_messages: list[str] = []
for raw_line in output_lines:
line = _ANSI_RE.sub("", raw_line)
m = _LOG_RE.match(line)
if not m:
continue
level, msg = m.group(1), m.group(2)
if level in ("E", "W"):
error_messages.append(msg)

if error_messages:
message = error_messages[-1]
else:
output_text = "\n".join(output_lines) if output_lines else "(no output captured)"
exit_info = f" (exit code {return_code})" if return_code is not None else ""
message = f"frpc process failed{exit_info}: {output_text}"

return TunnelConnectionError(tunnel_id=tunnel_id, message=message)


class Tunnel:
"""Tunnel interface for exposing local services."""
Expand Down Expand Up @@ -83,7 +124,7 @@ async def start(self) -> str:

Raises:
TunnelError: If tunnel registration fails
TunnelConnectionError: If frpc fails to connect
TunnelConnectionError: If frpc fails to connect or tunnel is not running
TunnelTimeoutError: If connection times out
"""
if self._started:
Expand All @@ -101,7 +142,7 @@ async def start(self) -> str:
)
except BaseException as e:
await self._cleanup()
if isinstance(e, asyncio.CancelledError):
if isinstance(e, (asyncio.CancelledError, TunnelError)):
raise
raise TunnelError(f"Failed to register tunnel: {e}") from e

Expand All @@ -126,7 +167,7 @@ async def start(self) -> str:
await self._cleanup()
if isinstance(e, asyncio.CancelledError):
raise
raise TunnelConnectionError(f"Failed to start frpc: {e}") from e
raise TunnelConnectionError(message=f"Failed to start frpc: {e}") from e

# 5. Wait for connection
try:
Expand All @@ -142,7 +183,7 @@ async def start(self) -> str:
await self._cleanup()
if isinstance(e, asyncio.CancelledError):
raise
raise TunnelConnectionError(f"Failed to start pipe drain: {e}") from e
raise TunnelConnectionError(message=f"Failed to start pipe drain: {e}") from e

self._started = True

Expand Down Expand Up @@ -242,6 +283,9 @@ async def _cleanup(self) -> None:
def recent_output(self) -> list[str]:
"""Last N lines of frpc output (thread-safe). Falls back to startup output."""
if hasattr(self, "_output_lock"):
if not self.is_running and hasattr(self, "_drain_threads"):
for t in self._drain_threads:
t.join(timeout=2.0)
with self._output_lock:
return list(self._recent_output)
return list(self._output_lines)
Expand All @@ -256,9 +300,9 @@ def _start_pipe_drain(self) -> None:
if self._process is None:
return

self._recent_output: list[str] = []
self._output_lock = threading.Lock()
max_lines = 50
self._recent_output: list[str] = list(self._output_lines[-max_lines:])

def drain_pipe(pipe):
"""Read output from a pipe, retaining recent lines."""
Expand All @@ -275,8 +319,11 @@ def drain_pipe(pipe):
except (OSError, ValueError):
pass # Pipe closed

self._drain_threads: list[threading.Thread] = []
for pipe in (self._process.stdout, self._process.stderr):
threading.Thread(target=drain_pipe, args=(pipe,), daemon=True).start()
t = threading.Thread(target=drain_pipe, args=(pipe,), daemon=True)
t.start()
self._drain_threads.append(t)

def _write_frpc_config(self) -> Path:
"""Generate and write frpc configuration file."""
Expand Down Expand Up @@ -341,7 +388,7 @@ async def _wait_for_connection(self) -> None:

while time.time() - start_time < self.connection_timeout:
if self._process is None:
raise TunnelConnectionError("frpc process not running")
raise TunnelConnectionError(message="frpc process not running")

return_code = self._process.poll()
if return_code is not None:
Expand All @@ -352,14 +399,7 @@ async def _wait_for_connection(self) -> None:
remaining_output.extend(self._process.stderr.readlines())
self._output_lines.extend(line.strip() for line in remaining_output if line.strip())

# Build detailed error message
output_text = (
"\n".join(self._output_lines) if self._output_lines else "(no output captured)"
)
raise TunnelConnectionError(
f"frpc exited with code {return_code}\n"
f"--- frpc output ---\n{output_text}\n-------------------"
)
raise _parse_frpc_error(self._output_lines, self.tunnel_id, return_code)

if os.name == "posix":
# Set both pipes to non-blocking mode to drain them without deadlock
Expand Down Expand Up @@ -388,12 +428,11 @@ async def _wait_for_connection(self) -> None:
# Check for success/failure indicators
if "start proxy success" in line.lower():
return
if "login failed" in line.lower():
raise TunnelConnectionError(f"frpc login failed: {line}")
if "authorization failed" in line.lower():
raise TunnelConnectionError(
f"frpc authorization failed: {line}"
)
if (
"login to the server failed" in line.lower()
or "connect to server error" in line.lower()
):
raise _parse_frpc_error(self._output_lines, self.tunnel_id)
except (BlockingIOError, IOError):
pass # No more data available on this pipe
finally:
Expand Down
Loading
Loading