Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 212 additions & 20 deletions backend/app/api/routes/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

This module provides endpoints for:
- Starting new evaluations (POST /api/evaluate)
- Starting anonymous public evaluations (POST /api/evaluate/public)
- Streaming evaluation progress (GET /api/evaluate/{evaluation_id}/stream)
- Getting evaluation results (GET /api/evaluate/{evaluation_id}/result)
"""
Expand All @@ -11,13 +12,17 @@
import asyncio
import json
import logging
import time
import traceback
from typing import Any

from fastapi import APIRouter, Depends
from cachetools import TTLCache
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from app.api.deps import get_current_user, get_current_user_token, get_optional_user
from app.core.config import settings
from app.core.exceptions import CorkedError, EmptyCellarError
from app.services.evaluation_service import (
get_evaluation_progress,
Expand All @@ -32,6 +37,7 @@
SommelierProgressEvent,
create_sommelier_event,
)
from app.services.github_service import verify_public_repo
from app.services.task_registry import register_task
from app.services.quota import check_quota
from app.database.repositories.api_key import APIKeyRepository
Expand All @@ -40,6 +46,58 @@

router = APIRouter(prefix="/evaluate", tags=["evaluate"])

# Maximum number of anonymous evaluations one client IP may start per window.
_ANONYMOUS_RATE_LIMIT = 5
# Length of the rolling rate-limit window, in seconds (one hour).
_ANONYMOUS_RATE_WINDOW = 60 * 60
# Maps client IP -> list of request timestamps. Bounded to 10000 entries; each
# entry expires after twice the window so idle IPs do not accumulate forever.
_anonymous_rate_limit_store: TTLCache = TTLCache(
    ttl=2 * _ANONYMOUS_RATE_WINDOW,
    maxsize=10000,
)


def _check_anonymous_rate_limit(client_ip: str) -> bool:
    """Decide whether an anonymous request from ``client_ip`` may proceed.

    Timestamps older than the rate-limit window are discarded first. If the
    remaining count is still below the limit, the current request is recorded
    and allowed; otherwise it is rejected without being recorded.

    Args:
        client_ip: The client's IP address, used as the rate-limit key.

    Returns:
        True if the request is allowed, False if rate limited.
    """
    current_time = time.time()
    cutoff = current_time - _ANONYMOUS_RATE_WINDOW

    # Keep only the requests that still fall inside the rolling window.
    recent_hits = [
        stamp
        for stamp in _anonymous_rate_limit_store.get(client_ip, [])
        if stamp > cutoff
    ]

    allowed = len(recent_hits) < _ANONYMOUS_RATE_LIMIT
    if allowed:
        recent_hits.append(current_time)

    # The pruned list is stored in both outcomes so expired timestamps are
    # dropped from the cache even when the request is rejected.
    _anonymous_rate_limit_store[client_ip] = recent_hits
    return allowed


def _get_client_ip(request: Request) -> str:
    """Work out which IP address a request should be attributed to.

    The X-Forwarded-For header is consulted only when ``settings.TRUSTED_PROXY``
    is truthy; otherwise the header is ignored so callers cannot pick their own
    rate-limit key by sending it.

    Args:
        request: The FastAPI request object.

    Returns:
        The first X-Forwarded-For entry (when trusted and present), else the
        connecting peer's host, else the literal string "unknown".
    """
    trust_forwarded = getattr(settings, "TRUSTED_PROXY", False)
    forwarded_for = (
        request.headers.get("X-Forwarded-For") if trust_forwarded else None
    )
    if forwarded_for:
        # NOTE(review): the left-most entry is client-supplied unless the proxy
        # overwrites the header rather than appending to it — confirm the
        # proxy configuration before relying on this for rate limiting.
        first_entry, _, _ = forwarded_for.partition(",")
        return first_entry.strip()

    if request.client:
        return request.client.host
    return "unknown"
Comment thread
coderabbitai[bot] marked this conversation as resolved.


class EvaluateRequest(BaseModel):
"""Request model for starting an evaluation."""
Expand Down Expand Up @@ -214,16 +272,127 @@ async def run_in_background():
raise e
except Exception as e:
logger.error(f"[Evaluate] Exception: {type(e).__name__}: {str(e)}")
import traceback

logger.error(f"[Evaluate] Traceback: {traceback.format_exc()}")
raise CorkedError(f"Failed to start evaluation: {e!s}") from e


@router.post("/public", response_model=EvaluateResponse)
async def create_public_evaluation(
    request: EvaluateRequest,
    req: Request,
) -> EvaluateResponse:
    """Kick off an unauthenticated evaluation of a public GitHub repository.

    Restrictions applied to anonymous callers:
    - Evaluation mode is fixed to six_sommeliers
    - The repository is checked with verify_public_repo before anything starts
    - Provider and model are fixed to gemini / gemini-3-flash-preview
    - No custom criteria, temperature, API key or GitHub token is passed on
    - Requests are rate limited per client IP

    Args:
        request: The evaluation request; repo_url and criteria are used.
        req: The FastAPI request object, used to derive the client IP.

    Returns:
        EvaluateResponse with the new evaluation_id and status "pending".

    Raises:
        CorkedError: If the rate limit is exceeded, or if starting the
            evaluation or its background task fails. Errors raised by
            verify_public_repo propagate unchanged.
    """
    anonymous_user = "anonymous"
    forced_mode = "six_sommeliers"

    def _user_message_for(error_msg: str) -> str:
        # Translate a raw pipeline error into the text shown to the client.
        if "Resource not found" in error_msg or "404" in error_msg:
            return "Repository not found or is private. Please check the URL and try again."
        if "rate limit" in error_msg.lower():
            return "GitHub API rate limit exceeded. Please try again later."
        return f"Evaluation failed: {error_msg}"

    client_ip = _get_client_ip(req)
    logger.info(
        f"[Evaluate Public] Request from {client_ip}: repo_url={request.repo_url}"
    )

    if not _check_anonymous_rate_limit(client_ip):
        logger.warning(f"[Evaluate Public] Rate limit exceeded for IP: {client_ip}")
        raise CorkedError(
            "Rate limit exceeded. Please try again later or login for unlimited access."
        )

    # Runs outside the try below, so its errors are not re-wrapped.
    await verify_public_repo(request.repo_url)

    try:
        eval_id = await start_evaluation(
            repo_url=request.repo_url,
            criteria=request.criteria,
            user_id=anonymous_user,
            custom_criteria=None,  # Anonymous users cannot use custom criteria
            evaluation_mode=forced_mode,
        )

        event_channel = get_event_channel()
        await event_channel.create_channel(eval_id)

        async def run_in_background():
            # Drives the pipeline; on failure, notifies stream listeners and
            # records the error. The channel is closed in every outcome.
            try:
                await run_evaluation_pipeline_with_events(
                    evaluation_id=eval_id,
                    repo_url=request.repo_url,
                    criteria=request.criteria,
                    user_id=anonymous_user,
                    evaluation_mode=forced_mode,
                    provider="gemini",
                    model="gemini-3-flash-preview",
                    temperature=None,
                    api_key=None,
                    github_token=None,
                )
            except Exception as e:
                logger.exception(f"Background evaluation failed: {eval_id}")
                error_msg = str(e)
                error_event = create_sommelier_event(
                    evaluation_id=eval_id,
                    sommelier="system",
                    event_type=EventType.EVALUATION_ERROR.value,
                    progress_percent=-1,
                    message=_user_message_for(error_msg),
                )
                await event_channel.emit(eval_id, error_event)
                await handle_evaluation_error(eval_id, error_msg)
            finally:
                await event_channel.close_channel(eval_id)

        try:
            background_task = asyncio.create_task(run_in_background())
            await register_task(eval_id, background_task)
        except Exception as e:
            # The task never got registered; release the channel created above.
            await event_channel.close_channel(eval_id)
            raise CorkedError(f"Failed to start background task: {e!s}") from e

        logger.info(f"[Evaluate Public] Background task started: {eval_id}")

        response = EvaluateResponse(
            evaluation_id=eval_id,
            status="pending",
            evaluation_mode=forced_mode,
            estimated_time=30,
        )
        return response
    except CorkedError as e:
        # Already a domain error: log it and let it through untouched.
        logger.error(f"[Evaluate Public] CorkedError: {e.detail}")
        raise
    except Exception as e:
        logger.error(f"[Evaluate Public] Exception: {type(e).__name__}: {str(e)}")
        logger.error(f"[Evaluate Public] Traceback: {traceback.format_exc()}")
        raise CorkedError(f"Failed to start evaluation: {e!s}") from e
Comment on lines +279 to +389

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is significant code duplication between the create_public_evaluation function and the existing create_evaluation function. The logic for creating an event channel, defining the run_in_background coroutine, creating and registering the background task, and handling exceptions is nearly identical. This duplication increases maintenance overhead. I recommend refactoring the common logic into a shared helper function that both endpoints can call with their specific parameters.



@router.get("/{evaluation_id}/stream")
async def stream_evaluation(
evaluation_id: str,
user=Depends(get_current_user),
user=Depends(get_optional_user),
) -> Any:
"""Stream evaluation progress via Server-Sent Events.

Expand All @@ -235,13 +404,26 @@ async def stream_evaluation(
- evaluation_complete: When the entire evaluation is finished
- evaluation_error: When the entire evaluation fails
- heartbeat: Keep-alive signal (every 30 seconds)

Anonymous evaluations (user_id == "anonymous") can be accessed without
authentication. Authenticated users can only access their own evaluations.
"""
try:
progress = await get_evaluation_progress(evaluation_id)
except EmptyCellarError:
raise EmptyCellarError(f"Evaluation not found: {evaluation_id}") from None

if progress.get("user_id") != user.id:
eval_user_id = progress.get("user_id")

# Access control logic:
# 1. Anonymous evaluations are publicly accessible (no auth required)
# 2. Authenticated users can access their own evaluations
# 3. Authenticated users can also view anonymous evaluations
if eval_user_id == "anonymous":
pass # Public access allowed
elif user is None:
raise CorkedError("Authentication required to view this evaluation")
elif user.id != eval_user_id:
raise CorkedError("Access denied: evaluation belongs to another user")
Comment thread
coderabbitai[bot] marked this conversation as resolved.

event_channel = get_event_channel()
Expand Down Expand Up @@ -304,12 +486,12 @@ async def get_result(
This endpoint returns the complete evaluation results including
the final verdict from Jean-Pierre and all sommelier outputs.

Public demo evaluations (in PUBLIC_DEMO_EVALUATIONS) can be accessed
without authentication.
Public demo evaluations (in PUBLIC_DEMO_EVALUATIONS) and anonymous
evaluations (user_id == "anonymous") can be accessed without authentication.

Args:
evaluation_id: The evaluation ID.
user: The authenticated user (optional for public demos).
user: The authenticated user (optional for public access).

Returns:
The evaluation results.
Expand All @@ -318,24 +500,34 @@ async def get_result(
EmptyCellarError: If the evaluation is not found.
CorkedError: If the evaluation is still in progress.
"""
# Check if this is a public demo evaluation
is_public_demo = evaluation_id in PUBLIC_DEMO_EVALUATIONS

# Require auth for non-public evaluations
if not is_public_demo and user is None:
raise CorkedError("Authentication required to view this evaluation")
if is_public_demo:
result = await get_evaluation_result(evaluation_id)
if result is None:
raise EmptyCellarError(f"Evaluation result not found: {evaluation_id}")
return ResultResponse(
evaluation_id=result["evaluation_id"],
final_evaluation=result["final_evaluation"],
created_at=str(result["created_at"]),
)

try:
if not is_public_demo:
progress = await get_evaluation_progress(evaluation_id)
if progress.get("user_id") != user.id:
raise CorkedError("Access denied: evaluation belongs to another user")

result = await get_evaluation_result(evaluation_id)
progress = await get_evaluation_progress(evaluation_id)
except EmptyCellarError:
raise EmptyCellarError(f"Evaluation not found: {evaluation_id}") from None
except CorkedError:
raise

eval_user_id = progress.get("user_id")

# Access control: same logic as stream_evaluation
if eval_user_id == "anonymous":
pass # Public access allowed
elif user is None:
raise CorkedError("Authentication required to view this evaluation")
elif user.id != eval_user_id:
raise CorkedError("Access denied: evaluation belongs to another user")
Comment thread
coderabbitai[bot] marked this conversation as resolved.

result = await get_evaluation_result(evaluation_id)

if result is None:
raise EmptyCellarError(f"Evaluation result not found: {evaluation_id}")
Expand Down
Loading