Skip to content
Open
148 changes: 148 additions & 0 deletions api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6613,11 +6613,16 @@ def _child_rows() -> list:
get_last_workspace,
set_last_workspace,
git_info_for_workspace,
authorize_escape_target,
EscapeAuthorizationExpiredError,
list_dir,
list_authorized_escape_dir,
dir_signature,
list_workspace_suggestions,
read_file_content,
read_authorized_escape_file_content,
safe_resolve_ws,
raw_authorized_escape_target,
resolve_trusted_workspace,
open_anchored_fd,
open_anchored_create_fd,
Expand Down Expand Up @@ -9821,6 +9826,9 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == "/api/list":
return _handle_list_dir(handler, parsed)
Comment thread
rodboev marked this conversation as resolved.
Comment thread
rodboev marked this conversation as resolved.

if parsed.path == "/api/escape/list":
return _handle_escape_list_dir(handler, parsed)

if parsed.path == "/api/git/status":
return _handle_git_status(handler, parsed)

Expand Down Expand Up @@ -9971,12 +9979,18 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == "/api/file/raw":
return _handle_file_raw(handler, parsed)

if parsed.path == "/api/escape/file/raw":
return _handle_escape_file_raw(handler, parsed)

if parsed.path == "/api/folder/download":
return _handle_folder_download(handler, parsed)

if parsed.path == "/api/file":
return _handle_file_read(handler, parsed)

if parsed.path == "/api/escape/file/read":
return _handle_escape_file_read(handler, parsed)

if parsed.path == "/api/approval/pending":
return _handle_approval_pending(handler, parsed)

Expand Down Expand Up @@ -10479,6 +10493,9 @@ def handle_post(handler, parsed) -> bool:
diag.finish()
raise

if parsed.path == "/api/escape/authorize":
return _handle_escape_authorize(handler, parsed, body)

if parsed.path == "/api/updates/check":
settings = load_settings()
if not settings.get("check_for_updates", True):
Expand Down Expand Up @@ -12916,6 +12933,137 @@ def _handle_list_dir(handler, parsed):
return bad(handler, _sanitize_error(e), 404)


def _read_json_request_body(handler, *, max_bytes: int = 4096) -> dict:
try:
length = _safe_content_length(handler, max_bytes)
except (ValueError, OverflowError) as exc:
raise ValueError(_sanitize_error(exc)) from exc
raw = handler.rfile.read(length) if length else b"{}"
try:
payload = json.loads(raw.decode("utf-8"))
except Exception as exc:
raise ValueError("invalid JSON body") from exc
return payload if isinstance(payload, dict) else {}


def _handle_escape_authorize(handler, parsed, body: dict | None = None):
if handler.command != "POST":
return bad(handler, "method not allowed", 405)
if not handler.headers.get("Origin"):
return bad(handler, "browser origin required", 403)
if not _check_csrf(handler):
return bad(handler, _csrf_rejection_error(handler), 403)
if body is None:
try:
body = _read_json_request_body(handler)
except ValueError as exc:
return bad(handler, _sanitize_error(exc), 400)
qs = parse_qs(parsed.query)
sid = str(body.get("session_id") or qs.get("session_id", [""])[0] or "").strip()
rel = str(body.get("path") or qs.get("path", [""])[0] or "").strip()
token = str(body.get("token") or qs.get("token", [""])[0] or "").strip()
if token:
return bad(handler, "token must not be provided", 400)
if not sid:
return bad(handler, "session_id is required")
if not rel:
return bad(handler, "path is required")
try:
s = get_session_for_file_ops(sid)
except KeyError:
return bad(handler, "Session not found", 404)
try:
payload = authorize_escape_target(Path(s.workspace), sid, rel)
except ValueError as exc:
return bad(handler, _sanitize_error(exc), 404)
return j(handler, payload)


def _handle_escape_list_dir(handler, parsed):
qs = parse_qs(parsed.query)
sid = qs.get("session_id", [""])[0]
token = qs.get("token", [""])[0]
if not sid:
return bad(handler, "session_id is required")
if not token:
return bad(handler, "token is required")
try:
s = get_session_for_file_ops(sid)
except KeyError:
return bad(handler, "Session not found", 404)
rel_path = qs.get("path", ["."])[0]
try:
payload = list_authorized_escape_dir(Path(s.workspace), sid, token, rel_path)
return j(handler, payload)
except FileNotFoundError as exc:
return bad(handler, _sanitize_error(exc), 404)
except EscapeAuthorizationExpiredError as exc:
return bad(handler, _sanitize_error(exc), 403)
except ValueError as exc:
return bad(handler, _sanitize_error(exc), 404)


def _handle_escape_file_read(handler, parsed):
qs = parse_qs(parsed.query)
sid = qs.get("session_id", [""])[0]
token = qs.get("token", [""])[0]
if not sid:
return bad(handler, "session_id is required")
if not token:
return bad(handler, "token is required")
try:
s = get_session_for_file_ops(sid)
except KeyError:
return bad(handler, "Session not found", 404)
rel = qs.get("path", [""])[0]
try:
return j(handler, read_authorized_escape_file_content(Path(s.workspace), sid, token, rel))
except FileNotFoundError as exc:
return bad(handler, _sanitize_error(exc), 404)
except EscapeAuthorizationExpiredError as exc:
return bad(handler, _sanitize_error(exc), 403)
except ValueError as exc:
return bad(handler, _sanitize_error(exc), 404)


def _handle_escape_file_raw(handler, parsed):
qs = parse_qs(parsed.query)
sid = qs.get("session_id", [""])[0]
token = qs.get("token", [""])[0]
if not sid:
return bad(handler, "session_id is required")
if not token:
return bad(handler, "token is required")
try:
s = get_session_for_file_ops(sid)
except KeyError:
return bad(handler, "Session not found", 404)
rel = qs.get("path", [""])[0]
force_download = qs.get("download", [""])[0] == "1"
try:
anchor_root, target = raw_authorized_escape_target(Path(s.workspace), sid, token, rel)
except FileNotFoundError:
return j(handler, {"error": "not found"}, status=404)
except EscapeAuthorizationExpiredError as exc:
return bad(handler, _sanitize_error(exc), 403)
except ValueError as exc:
return bad(handler, _sanitize_error(exc), 404)
if not target.exists() or not target.is_file():
return j(handler, {"error": "not found"}, status=404)
ext = target.suffix.lower()
mime = MIME_MAP.get(ext, "application/octet-stream")
inline_preview = qs.get("inline", [""])[0] == "1"
dangerous_types = {"text/html", "application/xhtml+xml", "image/svg+xml"}
html_inline_ok = inline_preview and mime == "text/html"
disposition = "attachment" if force_download or (mime in dangerous_types and not html_inline_ok) else "inline"
sandbox_csp = "sandbox allow-scripts allow-popups allow-popups-to-escape-sandbox"
# Content-Security-Policy sandboxing is carried through the csp=sandbox_csp handoff below.
csp = sandbox_csp if (inline_preview and not force_download and disposition == "inline") else None
if html_inline_ok:
return _serve_inline_html_preview(handler, target, "no-store", csp=sandbox_csp, anchor_root=anchor_root)
return _serve_file_bytes(handler, target, mime, disposition, "no-store", csp=csp, anchor_root=anchor_root)


def _sse_with_id(handler, event, data, event_id=None):
if event_id:
handler.wfile.write(f"id: {event_id}\n".encode("utf-8"))
Expand Down
Loading
Loading