diff --git a/backend/src/analytics_agent/agent/graph.py b/backend/src/analytics_agent/agent/graph.py index 3f48ca1..5521cec 100644 --- a/backend/src/analytics_agent/agent/graph.py +++ b/backend/src/analytics_agent/agent/graph.py @@ -43,6 +43,7 @@ def build_graph( enabled_mutations: set[str] | None = None, context_tools: list | None = None, # pre-built from DB context platforms at request time engine_tools: list | None = None, # pre-built for MCP data sources (bypasses QueryEngine) + suppress_business_context_skill: bool = False, ): from analytics_agent.agent.chart_generator import chart_node from analytics_agent.engines.factory import get_registry @@ -51,6 +52,8 @@ def build_graph( llm = get_llm(streaming=True) from analytics_agent.agent.chart_tool import create_chart + from analytics_agent.agent.proposals_tool import present_proposals + from analytics_agent.agent.results_tool import report_proposal_results # Context platform tools — built dynamically from DB at request time. # Falls back to env-var based build only when caller doesn't provide them. @@ -64,7 +67,10 @@ def build_graph( # Always-on skills (context search etc.) 
+ opt-in write-back skills from analytics_agent.skills.loader import build_always_on_skill_tools, build_skill_tools - skill_tools = build_always_on_skill_tools() + build_skill_tools(enabled_mutations or set()) + always_on_skills = build_always_on_skill_tools() + if suppress_business_context_skill: + always_on_skills = [t for t in always_on_skills if t.name != "search_business_context"] + skill_tools = always_on_skills + build_skill_tools(enabled_mutations or set()) # Engine tools — MCP data sources supply pre-built tools; native engines use QueryEngine if engine_tools is not None: @@ -77,7 +83,9 @@ def build_graph( raise ValueError(f"Engine '{engine_name}' not found.") engine_tools = [t for t in engine.get_tools() if t.name not in disabled] chart_tools = [] if "create_chart" in disabled else [create_chart] - all_tools = datahub_tools + skill_tools + engine_tools + chart_tools + proposal_tools = [] if "present_proposals" in disabled else [present_proposals] + results_tools = [] if "report_proposal_results" in disabled else [report_proposal_results] + all_tools = datahub_tools + skill_tools + engine_tools + chart_tools + proposal_tools + results_tools if system_prompt_override: from analytics_agent.skills.loader import ( @@ -87,12 +95,17 @@ def build_graph( ) system_prompt = system_prompt_override.format(engine_name=engine_name) - system_prompt += get_search_business_context_section() + if not suppress_business_context_skill: + system_prompt += get_search_business_context_section() system_prompt += get_improve_context_prompt_section() if enabled_mutations: system_prompt += get_skill_system_prompt_section(enabled_mutations) else: - system_prompt = build_system_prompt(engine_name, enabled_skills=enabled_mutations) + system_prompt = build_system_prompt( + engine_name, + enabled_skills=enabled_mutations, + include_business_context=not suppress_business_context_skill, + ) # Enable per-tool error handling so validation errors (e.g. 
hallucinated # arguments like filter= on get_entities) are returned as tool messages diff --git a/backend/src/analytics_agent/agent/history.py b/backend/src/analytics_agent/agent/history.py index e708178..d7b580c 100644 --- a/backend/src/analytics_agent/agent/history.py +++ b/backend/src/analytics_agent/agent/history.py @@ -74,14 +74,23 @@ def build_history( "input": payload.get("tool_input", {}), } ) - elif evt in ("TOOL_RESULT", "SQL"): + elif evt in ("TOOL_RESULT", "SQL", "PROPOSALS", "PROPOSAL_RESULTS"): idx = len(tool_results) call_id = tool_calls[idx]["id"] if idx < len(tool_calls) else msg.id + if evt == "PROPOSALS": + result_text = orjson.dumps(payload).decode()[:4000] + tool_name = "present_proposals" + elif evt == "PROPOSAL_RESULTS": + result_text = orjson.dumps(payload).decode()[:4000] + tool_name = "report_proposal_results" + else: + result_text = payload.get("result", payload.get("sql", ""))[:4000] + tool_name = payload.get("tool_name", "") tool_results.append( { "id": call_id, - "name": payload.get("tool_name", ""), - "result": payload.get("result", payload.get("sql", ""))[:4000], + "name": tool_name, + "result": result_text, } ) elif evt == "TEXT": diff --git a/backend/src/analytics_agent/agent/mcp_app_tool.py b/backend/src/analytics_agent/agent/mcp_app_tool.py new file mode 100644 index 0000000..d18833a --- /dev/null +++ b/backend/src/analytics_agent/agent/mcp_app_tool.py @@ -0,0 +1,37 @@ +"""Side-channel for MCP App tool results. + +Mirrors the _pending_charts pattern in chart_tool.py: +- The wrapped tool returns a short marker string (MCP_APP_READY:). +- The actual structured payload lives here, keyed by app_id. +- streaming.py pops from this dict when it sees the marker in on_tool_end. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class PendingApp: + app_id: str + connection_key: str + server_name: str + tool_name: str + tool_input: dict + # Structured CallToolResult content (list of content blocks), preserved so + # the frontend can forward it verbatim as `ui/notifications/tool-result` + # params per the MCP Apps spec. + tool_result: Any + resource_uri: str + csp: str | None = None + permissions: list[str] = field(default_factory=list) + # Tool names scoped to this app's connection that the iframe is allowed to + # call via the Phase 2 tool-proxy endpoint. Populated at wrap time from the + # full tool list for the originating connection_key. Persisted in the MCP_APP + # SSE payload so the endpoint can rehydrate it from the DB row. + allowed_tools: list[str] = field(default_factory=list) + + +# Keyed by app_id; popped once streaming.py emits the MCP_APP SSE event. +_pending_apps: dict[str, PendingApp] = {} diff --git a/backend/src/analytics_agent/agent/proposals_tool.py b/backend/src/analytics_agent/agent/proposals_tool.py new file mode 100644 index 0000000..ad8212c --- /dev/null +++ b/backend/src/analytics_agent/agent/proposals_tool.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import logging +import uuid +from typing import Literal + +from langchain_core.tools import tool +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +# Side-channel: keyed by prop_id so streaming.py can fetch the payload +# without the model ever seeing the full JSON. +_pending_proposals: dict[str, dict] = {} + + +class ProposalItem(BaseModel): + id: str + kind: Literal["new_doc", "update_doc", "fix_description"] + title: str + detail: str + target: dict | None = None # e.g. 
{"urn": "...", "field_path": "..."} + + +@tool +async def present_proposals( + prompt: str, + proposals: list[dict], +) -> str: + """ + Present a list of improvement proposals to the user for review and selection. + Call this at the end of Step 3 of the /improve-context workflow, after drafting + proposals. The UI will render a card with checkboxes — do NOT print a markdown + list yourself. + + Args: + prompt: Short framing sentence shown above the proposals + (e.g. "I found 3 improvements based on our conversation.") + proposals: List of proposal dicts, each with: + - id: unique string identifier (e.g. "1", "2", "3") + - kind: one of "new_doc", "update_doc", "fix_description" + - title: short title for the proposal + - detail: 1-2 sentence description of what to add/change + - target: optional dict with "urn" and/or "field_path" for existing entities + + Example: + present_proposals( + prompt="Based on our conversation, here are 3 documentation improvements:", + proposals=[ + {"id": "1", "kind": "new_doc", "title": "Revenue Metrics Guide", + "detail": "Define net ARR vs gross ARR and specify the revenue table."}, + {"id": "2", "kind": "fix_description", "title": "orders.status column", + "detail": "Current description is empty. 
Values: pending, confirmed, shipped.", + "target": {"urn": "urn:li:dataset:...", "field_path": "status"}}, + ] + ) + """ + try: + validated = [ProposalItem(**p) for p in proposals] + except Exception as e: + return f"present_proposals: invalid proposals format — {e}" + + prop_id = str(uuid.uuid4()) + _pending_proposals[prop_id] = { + "prompt": prompt, + "proposals": [p.model_dump() for p in validated], + } + + return ( + f"PROPOSALS_READY:{prop_id} " + f"({len(validated)} proposals; awaiting user selection)" + ) diff --git a/backend/src/analytics_agent/agent/results_tool.py b/backend/src/analytics_agent/agent/results_tool.py new file mode 100644 index 0000000..df97d0f --- /dev/null +++ b/backend/src/analytics_agent/agent/results_tool.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import logging +import uuid +from typing import Literal + +from langchain_core.tools import tool +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +# Side-channel: keyed by result_id so streaming.py can fetch the payload +# without the model ever seeing the full JSON. +_pending_results: dict[str, dict] = {} + + +class ProposalResultItem(BaseModel): + id: str + kind: Literal["new_doc", "update_doc", "fix_description"] + title: str + status: Literal["success", "error"] + urn: str | None = None + error: str | None = None + + +@tool +async def report_proposal_results( + results: list[dict], +) -> str: + """ + Report the outcomes of writing approved proposals back to DataHub. + Call this ONCE after all save_correction calls have completed in Step 5 + of the /improve-context workflow. The UI will render a results card — + do NOT write any additional summary text after calling this tool. 
+ + Args: + results: List of result dicts, each with: + - id: proposal id (matches the id from present_proposals) + - kind: one of "new_doc", "update_doc", "fix_description" + - title: proposal title + - status: "success" or "error" + - urn: the URN of the created/updated entity (set on success) + - error: error message (set on error) + + Example: + report_proposal_results(results=[ + {"id": "1", "kind": "new_doc", "title": "Revenue Metrics Guide", + "status": "success", "urn": "urn:li:corpUser:..."}, + {"id": "3", "kind": "fix_description", "title": "orders.status column", + "status": "error", "error": "Permission denied"}, + ]) + """ + try: + validated = [ProposalResultItem(**r) for r in results] + except Exception as e: + return f"report_proposal_results: invalid results format — {e}" + + result_id = str(uuid.uuid4()) + _pending_results[result_id] = { + "results": [r.model_dump() for r in validated], + } + + successes = sum(1 for r in validated if r.status == "success") + return ( + f"RESULTS_READY:{result_id} " + f"({successes}/{len(validated)} succeeded)" + ) diff --git a/backend/src/analytics_agent/agent/streaming.py b/backend/src/analytics_agent/agent/streaming.py index da01197..ba94a37 100644 --- a/backend/src/analytics_agent/agent/streaming.py +++ b/backend/src/analytics_agent/agent/streaming.py @@ -67,6 +67,18 @@ async def stream_graph_events( final_text_parts: list[str] = [] final_state: dict[str, Any] = {} chart_emitted = False # guard against double-emitting CHART + # Once report_proposal_results emits, the PROPOSAL_RESULTS card IS the summary + # for the remainder of the turn — suppress any trailing model text so the user + # doesn't see the card AND a redundant narrative explanation underneath. + suppress_trailing_text = False + + # Track active tool runs by run_id -> tool_name so we can detect when a + # wrapped tool (e.g. MCP UI wrapper) invokes an inner tool of the same name. 
+ # LangGraph v2 fires on_tool_start/on_tool_end for both the outer StructuredTool + # and the inner delegated tool, which would otherwise double-record TOOL_CALL + # and emit a redundant TOOL_RESULT alongside the MCP_APP bubble. + active_tool_runs: dict[str, str] = {} + suppressed_tool_runs: set[str] = set() try: from analytics_agent.config import settings as _settings @@ -78,22 +90,29 @@ async def stream_graph_events( data: dict[str, Any] = event.get("data", {}) name: str = event.get("name", "") run_id: str = event.get("run_id", "") + parent_ids: list[str] = event.get("parent_ids", []) or [] node: str = event.get("metadata", {}).get("langgraph_node", "") # ── TEXT ── if event_type == "on_chat_model_stream" and node not in ("chart", ""): + if suppress_trailing_text: + continue chunk = data.get("chunk") if chunk is None: continue if getattr(chunk, "tool_call_chunks", None): continue content = chunk.content if hasattr(chunk, "content") else "" + # Each chunk gets its own unique message_id. All other event + # types already do this (uuid4). Sharing run_id across chunks + # collides on the Message.id primary key in persistence and + # silently drops every chunk after the first (see chat.py). if isinstance(content, str) and content: final_text_parts.append(content) yield { "event": "TEXT", "conversation_id": conversation_id, - "message_id": run_id, + "message_id": str(uuid.uuid4()), "payload": {"text": content}, } elif isinstance(content, list): @@ -105,17 +124,23 @@ async def stream_graph_events( yield { "event": "TEXT", "conversation_id": conversation_id, - "message_id": run_id, + "message_id": str(uuid.uuid4()), "payload": {"text": text}, } # ── TOOL_CALL ── elif event_type == "on_tool_start": tool_input = data.get("input", {}) + # Suppress inner tool calls nested under a same-named outer tool + # (MCP UI wrapper pattern) so the bubble/history isn't duplicated. 
+ if any(active_tool_runs.get(pid) == name for pid in parent_ids): + suppressed_tool_runs.add(run_id) + continue + active_tool_runs[run_id] = name if name == "execute_sql": pending_sql[run_id] = tool_input.get("sql", "") - # create_chart renders as a CHART event — don't show a tool call bubble - if name == "create_chart": + # create_chart / present_proposals / report_proposal_results render as dedicated events — no tool call bubble + if name in ("create_chart", "present_proposals", "report_proposal_results"): continue yield { "event": "TOOL_CALL", @@ -126,6 +151,11 @@ async def stream_graph_events( # ── TOOL_ERROR (unhandled exception from tool) ── elif event_type == "on_tool_error": + if run_id in suppressed_tool_runs: + suppressed_tool_runs.discard(run_id) + active_tool_runs.pop(run_id, None) + pending_sql.pop(run_id, None) + continue error_msg = str(data.get("error", "Tool failed")) yield { "event": "TOOL_RESULT", @@ -138,9 +168,16 @@ async def stream_graph_events( }, } pending_sql.pop(run_id, None) + active_tool_runs.pop(run_id, None) # ── SQL / TOOL_RESULT / CHART ── elif event_type == "on_tool_end": + if run_id in suppressed_tool_runs: + suppressed_tool_runs.discard(run_id) + active_tool_runs.pop(run_id, None) + pending_sql.pop(run_id, None) + continue + active_tool_runs.pop(run_id, None) output = data.get("output", "") if hasattr(output, "content"): output = output.content @@ -184,6 +221,57 @@ async def stream_graph_events( "is_error": True, }, } + elif output_str.startswith("MCP_APP_READY:"): + # Pop the PendingApp from the side-channel and emit an MCP_APP event. + # HTML is never included in the SSE payload — the frontend fetches + # it via GET .../mcp-app/{message_id}/ui (prefetch warms the cache). 
+ app_id = output_str.split(":", 1)[1].split()[0].strip() + from analytics_agent.agent.mcp_app_tool import _pending_apps + + pending_app = _pending_apps.pop(app_id, None) + if pending_app: + yield { + "event": "MCP_APP", + "conversation_id": conversation_id, + "message_id": str(uuid.uuid4()), + "payload": { + "app_id": app_id, + "connection_key": pending_app.connection_key, + "server_name": pending_app.server_name, + "tool_name": pending_app.tool_name, + "tool_input": pending_app.tool_input, + "tool_result": pending_app.tool_result, + "resource_uri": pending_app.resource_uri, + "csp": pending_app.csp, + "permissions": pending_app.permissions, + "allowed_tools": pending_app.allowed_tools, + }, + } + elif output_str.startswith("PROPOSALS_READY:"): + prop_id = output_str.split(":", 1)[1].split()[0].strip() + from analytics_agent.agent.proposals_tool import _pending_proposals + + pending = _pending_proposals.pop(prop_id, None) + if pending: + yield { + "event": "PROPOSALS", + "conversation_id": conversation_id, + "message_id": str(uuid.uuid4()), + "payload": pending, + } + elif output_str.startswith("RESULTS_READY:"): + result_id = output_str.split(":", 1)[1].split()[0].strip() + from analytics_agent.agent.results_tool import _pending_results + + pending_result = _pending_results.pop(result_id, None) + if pending_result: + yield { + "event": "PROPOSAL_RESULTS", + "conversation_id": conversation_id, + "message_id": str(uuid.uuid4()), + "payload": pending_result, + } + suppress_trailing_text = True elif name == "create_chart": # Fetch chart spec from side-channel (tool returns only a short marker) if output_str.startswith("CHART_READY:"): diff --git a/backend/src/analytics_agent/api/__init__.py b/backend/src/analytics_agent/api/__init__.py index c96a525..fdd148f 100644 --- a/backend/src/analytics_agent/api/__init__.py +++ b/backend/src/analytics_agent/api/__init__.py @@ -2,11 +2,12 @@ from fastapi import APIRouter -from analytics_agent.api import chat, conversations, 
oauth, settings +from analytics_agent.api import chat, conversations, mcp_apps, oauth, settings api_router = APIRouter() api_router.include_router(conversations.router) api_router.include_router(chat.router) +api_router.include_router(mcp_apps.router) api_router.include_router(settings.router) api_router.include_router(oauth.router) diff --git a/backend/src/analytics_agent/api/chat.py b/backend/src/analytics_agent/api/chat.py index d4dad54..5b9bff1 100644 --- a/backend/src/analytics_agent/api/chat.py +++ b/backend/src/analytics_agent/api/chat.py @@ -74,6 +74,19 @@ async def _compute_quality_background(conv_id: str, factory) -> None: class ChatMessageRequest(BaseModel): text: str + # Phase 2: MCP App iframe-originated turns include these for auditing and + # frontend rendering (source="mcp_app" rows render as SelectionChip, not a + # user bubble; origin_message_id anchors the chip under its selector iframe). + source: str | None = None # e.g. "mcp_app" + app_id: str | None = None + origin_message_id: str | None = None + # Short human-readable label shown on the SelectionChip; `text` may be a + # richer agent-facing message (e.g. includes URN + instruction to prevent + # redundant disambiguation tool calls). + display_text: str | None = None + # Audit field for proposal_select turns: the raw proposal IDs the user checked + # before submitting. Stored in the user message payload for traceability. 
+ selected_ids: list[str] | None = None async def _persist_message( @@ -83,9 +96,10 @@ async def _persist_message( role: str, payload: dict, sequence: int, + message_id: str | None = None, ) -> None: msg = Message( - id=str(uuid.uuid4()), + id=message_id or str(uuid.uuid4()), conversation_id=conversation_id, event_type=event_type, role=role, @@ -103,6 +117,10 @@ async def _run_and_broadcast( user_text: str, engine_name: str, keepalive_interval: int, + source: str | None = None, + app_id: str | None = None, + origin_message_id: str | None = None, + display_text: str | None = None, ) -> None: """ Background task: runs the full agent pipeline independently of the HTTP @@ -123,8 +141,17 @@ def _broadcast(evt: dict) -> None: msg_repo = MessageRepo(session) sequence = await msg_repo.next_sequence(conversation_id) + user_payload: dict = {"text": user_text} + if source: + user_payload["source"] = source + if app_id: + user_payload["app_id"] = app_id + if origin_message_id: + user_payload["origin_message_id"] = origin_message_id + if display_text: + user_payload["display_text"] = display_text await _persist_message( - session, conversation_id, "TEXT", "user", {"text": user_text}, sequence + session, conversation_id, "TEXT", "user", user_payload, sequence ) await session.commit() sequence += 1 @@ -238,6 +265,10 @@ def _broadcast(evt: dict) -> None: try: context_tools: list = [] include_mutations = bool(enabled_mutations) + suppress_business_context_skill = False + + # Each platform owns its disabled_tools and include_mutations state. + # build_platform() reads from DB config; get_tools() filters internally. for row in all_cp_rows: platform = build_platform( row, @@ -248,6 +279,12 @@ def _broadcast(evt: dict) -> None: continue tools = await platform.get_tools() context_tools.extend(tools) + # If a datahub-mcp server advertises `get_context`, it supersedes + # the packaged `search_business_context` skill. 
Check post-filter + # so an operator who explicitly disabled `get_context` keeps the + # packaged skill active. + if row.type == "datahub-mcp" and any(t.name == "get_context" for t in tools): + suppress_business_context_skill = True logger.info( "Total context_tools=%d for conversation %s", @@ -268,6 +305,7 @@ def _broadcast(evt: dict) -> None: enabled_mutations=enabled_mutations, context_tools=context_tools, engine_tools=engine_tools, + suppress_business_context_skill=suppress_business_context_skill, ) except Exception as exc: for _evt in cast( @@ -310,7 +348,7 @@ def _broadcast(evt: dict) -> None: history=history, ): if evt.get("event") not in (None, "KEEPALIVE"): - with contextlib.suppress(Exception): + try: await _persist_message( session, conversation_id, @@ -318,9 +356,24 @@ def _broadcast(evt: dict) -> None: "assistant", evt.get("payload", {}), sequence, + message_id=evt.get("message_id"), ) await session.commit() sequence += 1 + except Exception as persist_exc: + # A failed commit (e.g. duplicate PK) leaves the async + # session in a doomed transaction — every subsequent + # commit on it would also fail until we rollback. + # Without this, one bad insert silently drops every + # later event in the same turn (incl. COMPLETE). 
+ logger.warning( + "Persist failed for %s event in conv %s: %s", + evt.get("event"), + conversation_id, + persist_exc, + ) + with contextlib.suppress(Exception): + await session.rollback() if evt.get("event") == "TOOL_RESULT": tool_name = evt.get("payload", {}).get("tool_name", "") @@ -405,6 +458,14 @@ async def send_message( if not body.text.strip(): raise HTTPException(status_code=422, detail="Message text cannot be empty") + if body.source: + logger.info( + "Message in conv %s sourced from %s (app_id=%s)", + conversation_id, + body.source, + body.app_id, + ) + stream = ConvStream(task=None) _active_streams[conversation_id] = stream stream.task = asyncio.create_task( @@ -414,6 +475,10 @@ async def send_message( body.text.strip(), conv.engine_name, settings.sse_keepalive_interval, + source=body.source, + app_id=body.app_id, + origin_message_id=body.origin_message_id, + display_text=body.display_text, ) ) diff --git a/backend/src/analytics_agent/api/mcp_apps.py b/backend/src/analytics_agent/api/mcp_apps.py new file mode 100644 index 0000000..619fcd2 --- /dev/null +++ b/backend/src/analytics_agent/api/mcp_apps.py @@ -0,0 +1,279 @@ +"""MCP App endpoints. + +Phase 1: GET /api/conversations/{conversation_id}/mcp-app/{message_id}/ui + +Resolves the persisted {connection_key, resource_uri} from the MCP_APP message +row, fetches HTML via MCPResourceClient (cache-first, live-server fallback), and +returns {html, csp, permissions}. + +Returns HTTP 404 when both the cache and the live MCP server are unavailable. + +Phase 2: POST /api/conversations/{conversation_id}/mcp-app/{app_id}/tool-call + +Scoped MCP tool proxy. The iframe sends {tool_name, arguments}; this endpoint: + 1. Loads the PendingApp from the in-memory side-channel (fast path, live + during streaming) or rehydrates from the persisted MCP_APP message row. + 2. 
Validates that (connection_key, tool_name) is in the per-app allow-list + so the iframe can never reach engine tools, other MCP servers, or + create_chart. + 3. Dispatches tools/call on the originating MCP client session. + 4. Returns the CallToolResult as JSON. +""" + +from __future__ import annotations + +import logging +from typing import Any + +import orjson +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy.ext.asyncio import AsyncSession + +from analytics_agent.db.base import get_session +from analytics_agent.db.repository import ConversationRepo, MessageRepo + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/conversations", tags=["mcp-apps"]) + + +# ── Phase 2 helpers ─────────────────────────────────────────────────────────── + + +class _AppContext: + """Minimal info needed to authorise and dispatch a scoped tool call.""" + + def __init__(self, connection_key: str, server_name: str, allowed_tools: list[str]) -> None: + self.connection_key = connection_key + self.server_name = server_name + self.allowed_tools = allowed_tools + + +async def _load_app_context( + conversation_id: str, + app_id: str, + session: AsyncSession, +) -> _AppContext: + """Return app context from the in-memory side-channel or the DB row. + + Prefers the in-memory PendingApp (available while streaming is active or + before it was popped by on_tool_end). Falls back to scanning the + conversation's MCP_APP message rows when the in-memory entry is gone + (replay / post-stream calls). + + Raises HTTPException 404 if neither source has a record for this app_id. + """ + from analytics_agent.agent.mcp_app_tool import _pending_apps + + pending = _pending_apps.get(app_id) + if pending is not None: + return _AppContext( + connection_key=pending.connection_key, + server_name=pending.server_name, + allowed_tools=pending.allowed_tools, + ) + + # Rehydrate from persisted message rows. 
+ msg_repo = MessageRepo(session) + messages = await msg_repo.list_for_conversation(conversation_id) + for msg in messages: + if msg.event_type != "MCP_APP" or not msg.payload: + continue + try: + payload: dict = orjson.loads(msg.payload) + except Exception: + continue + if payload.get("app_id") == app_id: + connection_key = payload.get("connection_key", "") + server_name = payload.get("server_name", "") + allowed_tools: list[str] = payload.get("allowed_tools") or [] + if not connection_key: + break + return _AppContext( + connection_key=connection_key, + server_name=server_name, + allowed_tools=allowed_tools, + ) + + raise HTTPException(status_code=404, detail=f"MCP App not found: app_id={app_id!r}") + + +async def _dispatch_tool_call( + connection_key: str, + server_name: str, + tool_name: str, + arguments: dict[str, Any], +) -> dict: + """Call tool_name on the originating MCP client; return the CallToolResult dict.""" + from analytics_agent.context.mcp_resources import _mcp_clients + + if connection_key not in _mcp_clients: + raise HTTPException( + status_code=503, + detail=f"MCP client for connection {connection_key!r} is not available — " + "the server may have restarted. 
Reload the page to reconnect.", + ) + + client, _sname = _mcp_clients[connection_key] + if server_name and _sname != server_name: + # server_name mismatch — use the one from the registry to be safe + logger.warning( + "server_name mismatch for connection_key=%s: stored=%s registry=%s", + connection_key, + server_name, + _sname, + ) + server_name = _sname + + try: + async with client.session(server_name) as mcp_session: + result = await mcp_session.call_tool(tool_name, arguments) + except Exception as exc: + logger.error( + "MCP tool call failed: connection=%s server=%s tool=%s: %s", + connection_key, + server_name, + tool_name, + exc, + ) + raise HTTPException( + status_code=502, + detail=f"MCP tool call failed: {exc}", + ) from exc + + # Serialise the result into a plain dict (content blocks list). + content: list[dict] = [] + for block in result.content or []: + if hasattr(block, "model_dump"): + content.append(block.model_dump()) + elif isinstance(block, dict): + content.append(block) + else: + content.append({"type": "text", "text": str(block)}) + + return { + "content": content, + "isError": getattr(result, "isError", False) or False, + } + + +class ToolCallRequest(BaseModel): + tool_name: str + arguments: dict[str, Any] = {} + + +# ── Endpoints ───────────────────────────────────────────────────────────────── + + +@router.get("/{conversation_id}/mcp-app/{message_id}/ui") +async def get_mcp_app_ui( + conversation_id: str, + message_id: str, + session: AsyncSession = Depends(get_session), +): + """Return the HTML, CSP, and permissions for a persisted MCP App message. + + Cache-first: serves from disk cache within TTL. Falls back to a live + resources/read call when the cache is stale or missing. Returns 404 with + a structured placeholder when both the cache and the live server are + unavailable. 
+ """ + conv_repo = ConversationRepo(session) + conv = await conv_repo.get(conversation_id) + if not conv: + raise HTTPException(status_code=404, detail="Conversation not found") + + msg_repo = MessageRepo(session) + message = await msg_repo.get_by_id(message_id) + + if message is None or message.conversation_id != conversation_id: + raise HTTPException(status_code=404, detail="Message not found") + + if message.event_type != "MCP_APP": + raise HTTPException(status_code=404, detail="Message is not an MCP App event") + + try: + payload: dict = orjson.loads(message.payload) + except Exception: + raise HTTPException(status_code=500, detail="Corrupt message payload") + + connection_key: str | None = payload.get("connection_key") + resource_uri: str | None = payload.get("resource_uri") + + if not connection_key or not resource_uri: + raise HTTPException( + status_code=404, + detail="Message payload missing connection_key or resource_uri", + ) + + from analytics_agent.context.mcp_resources import resource_client + + try: + result = await resource_client.read_ui_resource(connection_key, resource_uri) + except RuntimeError as exc: + logger.warning( + "MCP app UI unavailable for message_id=%s connection_key=%s uri=%s: %s", + message_id, + connection_key, + resource_uri, + exc, + ) + raise HTTPException( + status_code=404, + detail={ + "message": "MCP app HTML unavailable — server offline and no cache", + "connection_key": connection_key, + "resource_uri": resource_uri, + }, + ) + + return { + "html": result["html"], + "csp": result.get("csp"), + "permissions": result.get("permissions", []), + "is_stale": result.get("is_stale", False), + } + + +@router.post("/{conversation_id}/mcp-app/{app_id}/tool-call") +async def mcp_app_tool_call( + conversation_id: str, + app_id: str, + body: ToolCallRequest, + session: AsyncSession = Depends(get_session), +): + """Phase 2: scoped MCP tool proxy for iframe apps. 
+ + Validates the requested (connection_key, tool_name) pair against the + per-app allow-list, then dispatches tools/call on the originating MCP + client session. Never routes to engine tools, other connections, or + create_chart. + """ + conv_repo = ConversationRepo(session) + conv = await conv_repo.get(conversation_id) + if not conv: + raise HTTPException(status_code=404, detail="Conversation not found") + + app_ctx = await _load_app_context(conversation_id, app_id, session) + + # Security: ensure the requested tool is in the per-app allow-list. + if app_ctx.allowed_tools and body.tool_name not in app_ctx.allowed_tools: + logger.warning( + "Blocked tool call from iframe: app_id=%s tool=%s allowed=%s", + app_id, + body.tool_name, + app_ctx.allowed_tools, + ) + raise HTTPException( + status_code=403, + detail=f"Tool {body.tool_name!r} is not in the allow-list for this app.", + ) + + result = await _dispatch_tool_call( + connection_key=app_ctx.connection_key, + server_name=app_ctx.server_name, + tool_name=body.tool_name, + arguments=body.arguments, + ) + return result diff --git a/backend/src/analytics_agent/context/mcp_platform.py b/backend/src/analytics_agent/context/mcp_platform.py index 81223bc..eb3d192 100644 --- a/backend/src/analytics_agent/context/mcp_platform.py +++ b/backend/src/analytics_agent/context/mcp_platform.py @@ -110,6 +110,14 @@ async def get_tools(self) -> list[BaseTool]: client = MultiServerMCPClient({self.name: conn}) # type: ignore[dict-item] all_tools = await client.get_tools() + + from analytics_agent.context.mcp_ui import wrap_tools_with_ui_resources + + connection_key = f"ctx:{self.name}" + all_tools = await wrap_tools_with_ui_resources( + connection_key, client, self.name, all_tools + ) + self._tools_cache = all_tools result = [t for t in all_tools if t.name not in self.disabled_tools] logger.info( diff --git a/backend/src/analytics_agent/context/mcp_resources.py b/backend/src/analytics_agent/context/mcp_resources.py new file mode 100644 
index 0000000..8634f04 --- /dev/null +++ b/backend/src/analytics_agent/context/mcp_resources.py @@ -0,0 +1,240 @@ +"""MCPResourceClient — reads ui:// resources from MCP servers. + +Reuses the MultiServerMCPClient instances in the _mcp_clients registry so we +open a fresh session (per resources/read call) without duplicating connection +config or spinning up a second stdio process. + +Disk cache layout:: + + ./data/mcp-app-cache/<key>.html + ./data/mcp-app-cache/<key>.meta.json + +The .meta.json sidecar holds {etag, last_modified, fetched_at, content_hash, csp, permissions}. +Cache policy: + - Within TTL (default 1 h): serve from disk without contacting the server. + - Past TTL: re-validate using resources/read; compare content_hash if the + server doesn't return ETag/Last-Modified. + - Server offline past TTL: return stale cache (HTTP 200) with + is_stale=True; 404 only when both cache and live server are unavailable. +""" + +from __future__ import annotations + +import hashlib +import logging +import time +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import orjson + +if TYPE_CHECKING: + from langchain_mcp_adapters.client import MultiServerMCPClient + +logger = logging.getLogger(__name__) + +# Process-local registry populated by mcp_ui.register_mcp_client(). +# Maps connection_key -> (MultiServerMCPClient, server_name). +_mcp_clients: dict[str, tuple[Any, str]] = {} + +# Default disk-cache TTL in seconds (1 hour). 
+_DEFAULT_TTL = 3600 + +_CACHE_DIR: Path | None = None + + +def _get_cache_dir() -> Path: + global _CACHE_DIR # noqa: PLW0603 + if _CACHE_DIR is None: + from analytics_agent.config import settings + + db_path = Path(settings.database_url.split("///")[-1]) + data_dir = db_path.parent if db_path.parent.exists() else Path("data") + _CACHE_DIR = data_dir / "mcp-app-cache" + _CACHE_DIR.mkdir(parents=True, exist_ok=True) + return _CACHE_DIR + + +def _cache_key(connection_key: str, uri: str) -> str: + return hashlib.sha256(f"{connection_key}\x00{uri}".encode()).hexdigest() + + +def _html_path(key: str) -> Path: + return _get_cache_dir() / f"{key}.html" + + +def _meta_path(key: str) -> Path: + return _get_cache_dir() / f"{key}.meta.json" + + +def _read_cache(key: str) -> tuple[str | None, dict]: + """Return (html, meta) from disk; html is None on miss.""" + html_file = _html_path(key) + meta_file = _meta_path(key) + if not html_file.exists() or not meta_file.exists(): + return None, {} + try: + html = html_file.read_text(encoding="utf-8") + meta = orjson.loads(meta_file.read_bytes()) + return html, meta + except Exception: + return None, {} + + +def _write_cache(key: str, html: str, meta: dict) -> None: + try: + _html_path(key).write_text(html, encoding="utf-8") + _meta_path(key).write_bytes(orjson.dumps(meta)) + except Exception: + logger.warning("MCP app cache write failed for key=%s", key) + + +def _is_fresh(meta: dict, ttl: int) -> bool: + fetched_at = meta.get("fetched_at", 0) + return (time.time() - fetched_at) < ttl + + +class MCPResourceClient: + """Reads ui:// resources from MCP servers with disk caching. + + Instantiate once per process; the underlying MultiServerMCPClient instances + are shared via the _mcp_clients registry. + """ + + def __init__(self, ttl: int = _DEFAULT_TTL) -> None: + self.ttl = ttl + + async def read_ui_resource( + self, + connection_key: str, + uri: str, + *, + use_cache: bool = True, + ) -> dict: + """Fetch a ui:// resource. 
+ + Returns a dict with keys: html, csp, permissions, etag, last_modified, + is_stale (True when falling back to a stale cache entry). + + Raises RuntimeError if both cache and live server are unavailable. + """ + key = _cache_key(connection_key, uri) + cached_html, cached_meta = _read_cache(key) + + if use_cache and cached_html is not None and _is_fresh(cached_meta, self.ttl): + logger.debug("MCP app cache hit (fresh) for uri=%s", uri) + return { + "html": cached_html, + "csp": cached_meta.get("csp"), + "permissions": cached_meta.get("permissions", []), + "etag": cached_meta.get("etag"), + "last_modified": cached_meta.get("last_modified"), + "is_stale": False, + } + + # Need to fetch / re-validate from the server. + try: + result = await self._fetch_from_server(connection_key, uri, cached_meta) + except Exception as exc: + if cached_html is not None: + logger.warning( + "MCP server unavailable for uri=%s; serving stale cache: %s", + uri, + exc, + ) + return { + "html": cached_html, + "csp": cached_meta.get("csp"), + "permissions": cached_meta.get("permissions", []), + "etag": cached_meta.get("etag"), + "last_modified": cached_meta.get("last_modified"), + "is_stale": True, + } + raise RuntimeError( + f"MCP resource unavailable and no cache for uri={uri!r}: {exc}" + ) from exc + + if result is None: + # Server said "not modified" (304-style). 
+ assert cached_html is not None + new_meta = {**cached_meta, "fetched_at": time.time()} + _write_cache(key, cached_html, new_meta) + return { + "html": cached_html, + "csp": cached_meta.get("csp"), + "permissions": cached_meta.get("permissions", []), + "etag": cached_meta.get("etag"), + "last_modified": cached_meta.get("last_modified"), + "is_stale": False, + } + + html, new_meta = result + _write_cache(key, html, new_meta) + return { + "html": html, + "csp": new_meta.get("csp"), + "permissions": new_meta.get("permissions", []), + "etag": new_meta.get("etag"), + "last_modified": new_meta.get("last_modified"), + "is_stale": False, + } + + async def _fetch_from_server( + self, + connection_key: str, + uri: str, + cached_meta: dict, + ) -> tuple[str, dict] | None: + """Fetch resource from the MCP server. + + Returns (html, new_meta) on success, None if content is unchanged. + Raises on failure. + """ + if connection_key not in _mcp_clients: + raise RuntimeError( + f"No MCP client registered for connection_key={connection_key!r}" + ) + + client, server_name = _mcp_clients[connection_key] + + from pydantic import AnyUrl + + async with client.session(server_name) as session: + result = await session.read_resource(AnyUrl(uri)) + + if not result.contents: + raise RuntimeError(f"Empty resource response for uri={uri!r}") + + first = result.contents[0] + if not hasattr(first, "text"): + raise RuntimeError( + f"Resource {uri!r} returned non-text content (type={type(first).__name__})" + ) + html: str = first.text + + # Compare content hash for change detection when server doesn't send ETags. + content_hash = hashlib.sha256(html.encode()).hexdigest() + if cached_meta.get("content_hash") == content_hash: + return None # unchanged + + new_meta: dict = { + "fetched_at": time.time(), + "content_hash": content_hash, + "etag": None, + "last_modified": None, + "csp": None, + "permissions": [], + } + + # Extract optional csp/permissions from resource metadata (if exposed). 
+ meta_obj = getattr(result, "meta", None) or {} + if isinstance(meta_obj, dict): + ui = meta_obj.get("ui") or {} + new_meta["csp"] = ui.get("csp") + new_meta["permissions"] = ui.get("permissions") or [] + + return html, new_meta + + +# Module-level singleton shared across call sites. +resource_client = MCPResourceClient() diff --git a/backend/src/analytics_agent/context/mcp_ui.py b/backend/src/analytics_agent/context/mcp_ui.py new file mode 100644 index 0000000..f1912b6 --- /dev/null +++ b/backend/src/analytics_agent/context/mcp_ui.py @@ -0,0 +1,220 @@ +"""MCP UI helpers — client registry + tool wrapping. + +Two responsibilities: +1. _mcp_clients registry: populated when a platform/engine is built; provides + the MultiServerMCPClient reference that MCPResourceClient uses for + resources/read calls. +2. wrap_tools_with_ui_resources(): called from both MCPContextPlatform.get_tools() + and MCPQueryEngine.get_tools_async(). For every tool that advertises a + _meta.ui.resourceUri, it: + a. Kicks off an asyncio.create_task() prefetch of the resource into the disk + cache so the cache is warm by the time the agent calls the tool. + b. Replaces the tool with a wrapper that, after the underlying MCP tools/call + returns, stuffs {tool_result, connection_key, tool_name, tool_input, + resource_uri, csp, permissions} into _pending_apps and returns the short + marker MCP_APP_READY:<app_id>. +""" + +from __future__ import annotations + +import asyncio +import logging +import uuid +from typing import TYPE_CHECKING, Any + +import orjson + +if TYPE_CHECKING: + from langchain_core.tools import BaseTool + +logger = logging.getLogger(__name__) + + +_DEFAULT_UI_AGENT_INSTRUCTIONS = ( + "An interactive UI card has been rendered inline in the chat for the user to " + "act on. Do not restate the options, do not call any other tools, and do not " + "answer the user's question yet. Stop and wait for the user's next message." 
+) + + +def _extract_agent_instructions(content_blocks: list[dict]) -> str | None: + """Return the `_agent_instructions` string from any JSON-bearing text block. + + MCP servers that render UI can embed `_agent_instructions` in their tool + output so the host model knows to stop and wait for user interaction. Our + wrapper previously swallowed the entire payload by returning only a marker, + which meant the LLM never saw this directive and would happily call more + tools. Surface it back into the tool's return value so it lands in the + ToolMessage the agent reads. + """ + for block in content_blocks: + if not isinstance(block, dict) or block.get("type") != "text": + continue + text = block.get("text", "") + if not isinstance(text, str) or "_agent_instructions" not in text: + continue + try: + data = orjson.loads(text) + except Exception: + continue + if isinstance(data, dict): + instructions = data.get("_agent_instructions") + if isinstance(instructions, str) and instructions.strip(): + return instructions.strip() + return None + + +def register_mcp_client( + connection_key: str, + client: Any, + server_name: str, +) -> None: + """Register a MultiServerMCPClient in the shared registry (mcp_resources._mcp_clients).""" + from analytics_agent.context.mcp_resources import _mcp_clients + + _mcp_clients[connection_key] = (client, server_name) + logger.debug("Registered MCP client for connection_key=%s", connection_key) + + +def _get_ui_meta(tool: BaseTool) -> dict: + """Extract _meta.ui from a LangChain tool's metadata (may be empty).""" + meta = (tool.metadata or {}).get("_meta") or {} + if isinstance(meta, dict): + return meta.get("ui") or {} + return {} + + +async def _prefetch_resource(connection_key: str, uri: str) -> None: + """Best-effort prefetch of a ui:// resource into the disk cache.""" + try: + from analytics_agent.context.mcp_resources import resource_client + + await resource_client.read_ui_resource(connection_key, uri, use_cache=False) + 
logger.debug("Prefetched MCP ui resource: connection=%s uri=%s", connection_key, uri) + except Exception as exc: + logger.warning( + "MCP ui resource prefetch failed (connection=%s uri=%s): %s", + connection_key, + uri, + exc, + ) + + +def _make_content_blocks(raw: Any) -> list[dict]: + """Coerce an ainvoke() return value into standard MCP content blocks. + + The MCP Apps spec requires `ui/notifications/tool-result` params to be a + CallToolResult (`{content: [{type, text, ...}, ...]}`). Preserve structured + content so the iframe app can parse it; wrap plain strings as a single + text block. + """ + if isinstance(raw, list): + blocks: list[dict] = [] + for block in raw: + if isinstance(block, dict): + blocks.append(block) + elif isinstance(block, str): + blocks.append({"type": "text", "text": block}) + return blocks + if isinstance(raw, str): + return [{"type": "text", "text": raw}] + return [{"type": "text", "text": str(raw)}] + + +def _wrap_tool( + original: BaseTool, + connection_key: str, + server_name: str, + resource_uri: str, + csp: str | None, + permissions: list[str], + allowed_tools: list[str], +) -> BaseTool: + """Return a new StructuredTool that calls original then stuffs _pending_apps.""" + from langchain_core.tools import StructuredTool + + from analytics_agent.agent.mcp_app_tool import PendingApp, _pending_apps + + tool_name = original.name + + async def _wrapper(**kwargs: Any) -> str: + raw = await original.ainvoke(kwargs) + content_blocks = _make_content_blocks(raw) + + app_id = str(uuid.uuid4()) + _pending_apps[app_id] = PendingApp( + app_id=app_id, + connection_key=connection_key, + server_name=server_name, + tool_name=tool_name, + tool_input=dict(kwargs), + tool_result=content_blocks, + resource_uri=resource_uri, + csp=csp, + permissions=list(permissions), + allowed_tools=list(allowed_tools), + ) + # Preserve the server-provided `_agent_instructions` (or fall back to a + # sensible default) so the LLM actually receives a directive to stop + # 
and wait for the user. streaming.py still parses the leading + # `MCP_APP_READY:` marker on the first line. + instructions = ( + _extract_agent_instructions(content_blocks) + or _DEFAULT_UI_AGENT_INSTRUCTIONS + ) + return f"MCP_APP_READY:{app_id} ({tool_name})\n{instructions}" + + return StructuredTool( + name=original.name, + description=original.description, + args_schema=original.args_schema, + coroutine=_wrapper, + metadata=original.metadata, + ) + + +async def wrap_tools_with_ui_resources( + connection_key: str, + client: Any, + server_name: str, + tools: list[BaseTool], +) -> list[BaseTool]: + """Register the client and wrap any UI-bearing tools. + + For each tool whose MCP descriptor carries _meta.ui.resourceUri: + - Kick off an asyncio.create_task() prefetch into the disk cache. + - Replace the tool with a wrapper that emits MCP_APP_READY:. + + Tools without a resourceUri are returned unchanged. + """ + register_mcp_client(connection_key, client, server_name) + + # Collect all tool names on this connection for the Phase 2 allow-list. + all_tool_names = [t.name for t in tools] + + result: list[BaseTool] = [] + for tool in tools: + ui_meta = _get_ui_meta(tool) + resource_uri: str | None = ui_meta.get("resourceUri") + if not resource_uri: + result.append(tool) + continue + + csp: str | None = ui_meta.get("csp") + permissions: list[str] = ui_meta.get("permissions") or [] + + # Fire-and-forget prefetch so the cache is warm before the agent calls the tool. 
+ asyncio.create_task( + _prefetch_resource(connection_key, resource_uri), + name=f"mcp-prefetch-{connection_key}-{tool.name}", + ) + + wrapped = _wrap_tool( + tool, connection_key, server_name, resource_uri, csp, permissions, all_tool_names + ) + result.append(wrapped) + logger.info( + "MCP tool '%s' wrapped with UI resource: %s", tool.name, resource_uri + ) + + return result diff --git a/backend/src/analytics_agent/db/repository.py b/backend/src/analytics_agent/db/repository.py index 3487f5f..9439b36 100644 --- a/backend/src/analytics_agent/db/repository.py +++ b/backend/src/analytics_agent/db/repository.py @@ -90,6 +90,9 @@ async def create(self, message: Message) -> Message: await self._session.commit() return message + async def get_by_id(self, message_id: str) -> Message | None: + return await self._session.get(Message, message_id) + async def list_for_conversation(self, conversation_id: str) -> list[Message]: result = await self._session.execute( select(Message) diff --git a/backend/src/analytics_agent/engines/mcp/engine.py b/backend/src/analytics_agent/engines/mcp/engine.py index b0ab318..9ba25a4 100644 --- a/backend/src/analytics_agent/engines/mcp/engine.py +++ b/backend/src/analytics_agent/engines/mcp/engine.py @@ -70,6 +70,15 @@ async def get_tools_async(self) -> list[BaseTool]: client = MultiServerMCPClient({"engine": conn}) # type: ignore[dict-item] tools = await client.get_tools() + + from analytics_agent.context.mcp_ui import wrap_tools_with_ui_resources + + # Use a stable key derived from the MCP config so multiple MCP engines + # can coexist in the same process without registry collisions. 
+ engine_label = self._mcp_cfg.get("name") or self._mcp_cfg.get("url", "engine") + connection_key = f"engine:{engine_label}" + tools = await wrap_tools_with_ui_resources(connection_key, client, "engine", tools) + logger.info("MCP engine provided %d tools", len(tools)) return tools diff --git a/backend/src/analytics_agent/prompts/system.py b/backend/src/analytics_agent/prompts/system.py index c925791..7c194fe 100644 --- a/backend/src/analytics_agent/prompts/system.py +++ b/backend/src/analytics_agent/prompts/system.py @@ -11,6 +11,7 @@ def get_prompt_template() -> str: def build_system_prompt( engine_name: str, enabled_skills: set[str] | None = None, + include_business_context: bool = True, ) -> str: from analytics_agent.skills.loader import ( get_improve_context_prompt_section, @@ -21,8 +22,10 @@ def build_system_prompt( today = date.today().strftime("%B %d, %Y") base = get_prompt_template().format(engine_name=engine_name, today=today) - # Always inject always-on meta-skills - base = base + get_search_business_context_section() + # Always inject always-on meta-skills. `search_business_context` is skipped + # when a richer `get_context` tool is in play (see build_graph). 
+ if include_business_context: + base = base + get_search_business_context_section() base = base + get_improve_context_prompt_section() if enabled_skills: diff --git a/backend/src/analytics_agent/prompts/system_prompt.md b/backend/src/analytics_agent/prompts/system_prompt.md index 02e6af7..2927fe8 100644 --- a/backend/src/analytics_agent/prompts/system_prompt.md +++ b/backend/src/analytics_agent/prompts/system_prompt.md @@ -11,7 +11,7 @@ Your goal is to answer the user's data questions by: ## Available tool groups **DataHub (catalog & context)** -- search_documents: search business documentation, metric definitions, and domain knowledge — USE THIS FIRST +- search_documents: search business documentation, metric definitions, and domain knowledge - grep_documents: search document content for specific terms or patterns - search: find datasets, dashboards, and other data assets by keyword - get_entities: get detailed metadata (schema, description, owners) for specific assets — takes a LIST OF URNs only (no filters); call search() first to get URNs, then pass them here @@ -124,7 +124,7 @@ me about whether I'm querying the right table?* ## Workflow -For every data question, follow this order: +For every data question, follow this order only if "get_context" tool is unavailable: **Step 1 — Understand the business context first** Always call search_documents before doing anything else. Search for: diff --git a/backend/src/analytics_agent/skills/improve-context/SKILL.md b/backend/src/analytics_agent/skills/improve-context/SKILL.md index 98f9a69..1b5a23b 100644 --- a/backend/src/analytics_agent/skills/improve-context/SKILL.md +++ b/backend/src/analytics_agent/skills/improve-context/SKILL.md @@ -44,25 +44,53 @@ Format each proposal clearly, like this: Keep each proposal to 1–2 sentences. Be specific about what to add or change. 
-### Step 4 — Ask for approval +### Step 4 — Present proposals via tool -Present the numbered list and ask: **"Which of these would you like me to publish? Reply with the numbers, 'all', or 'none'."** +Call `present_proposals` with a short framing `prompt` and the `proposals` list you drafted in Step 3. The UI will render a review card with checkboxes — do **not** print a markdown numbered list yourself. -Do NOT call any write-back tools until the user explicitly approves. +Each proposal must include: +- `id`: a string like `"1"`, `"2"`, `"3"` matching the Step 3 draft order +- `kind`: one of `"new_doc"`, `"update_doc"`, `"fix_description"` +- `title`: short label (e.g. `"Revenue Metrics Guide"`) +- `detail`: 1–2 sentence description of what to add or change +- `target` (optional): `{"urn": "...", "field_path": "..."}` for fix_description proposals that target a known entity -### Step 5 — Execute approved changes +Example call: +``` +present_proposals( + prompt="I found 3 documentation improvements based on our conversation:", + proposals=[ + {"id": "1", "kind": "new_doc", "title": "Revenue Metrics Guide", + "detail": "Define net ARR vs gross ARR and specify the revenue table as source of truth."}, + {"id": "2", "kind": "update_doc", "title": "Orders FAQ", + "detail": "Add: deleted_at IS NULL means the order is active; non-null means soft-deleted."}, + {"id": "3", "kind": "fix_description", "title": "orders.status column", + "detail": "Current description is empty. Values: pending, confirmed, shipped, cancelled, refunded.", + "target": {"urn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,orders,PROD)", "field_path": "status"}}, + ] +) +``` -For each approved proposal, follow the `save_correction` skill instructions to -confirm the change with the user before writing: +Do **not** call any write-back tools until the user explicitly selects proposals and submits. 
-- **New doc** → Use `save_correction` Mode 3: find the right parent folder first - via `search_documents`, then call with `doc_title`, `doc_body`, `parent_doc_urn` -- **Update existing doc** → Use `save_correction` Mode 2: call with `doc_urn`, - `doc_title`, `doc_body` (full updated body) -- **Fix description** → Use `save_correction` Mode 1: call with `entity_urn`, - `corrected_description`, and `field_path` if field-level +### Step 5 — Execute approved changes directly -After each write, report the URN and location so the user can find it in DataHub. +> **Note: the user has already approved these changes via the proposals card; do NOT ask for another confirmation.** + +The user's submission of the proposals card is the final confirmation. Do **NOT** ask for any further confirmation. Do **NOT** print the doc body for review before writing. + +For each approved proposal (and only the approved ones, in order): + +1. **Draft the full content now:** + - `new_doc` → write the full markdown body using the template from `save_correction` Mode 3 Step 2. Find the parent folder via `search_documents`. + - `update_doc` → fetch the existing doc with `search_documents` or `get_entities`, produce the full updated body. + - `fix_description` → compose the corrected description text. + +2. **Call `save_correction` immediately** with the drafted content. **Skip the in-skill confirmation sub-steps** (Mode 1 Step 3, Mode 2 Step 2, Mode 3 Step 3). Collect the result (success + URN, or error message). + +3. Do not write any interim "I'm working on it" text between `save_correction` calls. Proceed silently through all selected proposals. + +After all writes complete, call `report_proposal_results` **once** with the full list of outcomes. This is the **final action of the turn**. Do **NOT** output any text after calling `report_proposal_results` — no narrative summary, no restating of what succeeded/failed, no "Here's what happened", no bullet list of results. 
The card rendered from the tool call IS the complete summary; any additional text is redundant noise that will be stripped. ### Step 6 — Graceful degradation (write-back not available) diff --git a/backend/src/analytics_agent/skills/save-correction/SKILL.md b/backend/src/analytics_agent/skills/save-correction/SKILL.md index 4991d0f..366ffbc 100644 --- a/backend/src/analytics_agent/skills/save-correction/SKILL.md +++ b/backend/src/analytics_agent/skills/save-correction/SKILL.md @@ -17,6 +17,10 @@ metadata: # save_correction +## When called from /improve-context + +When this skill is invoked during `/improve-context` execution (after the user has submitted the proposals card), skip the confirmation sub-steps in each Mode (Mode 1 Step 3, Mode 2 Step 2, Mode 3 Step 3). The user has already approved the change via the proposals card; proceed directly to the save call without showing diffs or asking "Shall I apply this?". + ## Overview This skill writes correct knowledge back to DataHub. It handles three cases: diff --git a/frontend/src/api/conversations.ts b/frontend/src/api/conversations.ts index 6c60251..f858047 100644 --- a/frontend/src/api/conversations.ts +++ b/frontend/src/api/conversations.ts @@ -1,4 +1,5 @@ -import type { ConversationDetail, ConversationSummary, Engine } from "@/types"; +import type { ConversationDetail, ConversationSummary, Engine, ProposalItem } from "@/types"; +import { streamMessage } from "@/api/stream"; const BASE = "/api"; @@ -59,3 +60,31 @@ export async function getContextQuality(conversationId: string): Promise selectedIds.includes(p.id)); + const agentText = + selectedItems.length === 0 + ? "Skip all proposals — make no changes." + : `Publish the following proposals: ${selectedItems.map((p) => `"${p.title}"`).join(", ")}.`; + const displayText = + selectedItems.length === 0 + ? "Skipped proposals" + : `Selected ${selectedItems.length} proposal${selectedItems.length === 1 ? 
"" : "s"}`; + + return streamMessage(conversationId, agentText, undefined, { + source: "proposal_select", + origin_message_id: originMessageId, + display_text: displayText, + selected_ids: selectedIds, + }); +} diff --git a/frontend/src/api/mcpApp.ts b/frontend/src/api/mcpApp.ts new file mode 100644 index 0000000..ec8e667 --- /dev/null +++ b/frontend/src/api/mcpApp.ts @@ -0,0 +1,62 @@ +const BASE = "/api"; + +export interface McpAppUi { + html: string; + csp: string | null; + permissions: string[]; +} + +/** + * Fetch the HTML for an MCP App by message ID. + * + * The backend serves this from the disk cache (warmed by the prefetch + * at tool-discovery time) or falls back to a live resources/read. + * Returns null if both cache and server are unavailable (404). + */ +export async function fetchMcpAppUi( + conversationId: string, + messageId: string +): Promise { + const res = await fetch( + `${BASE}/conversations/${conversationId}/mcp-app/${messageId}/ui` + ); + if (res.status === 404) return null; + if (!res.ok) throw new Error(`Failed to fetch MCP App UI: ${res.status}`); + return res.json(); +} + +export interface McpToolCallResult { + content: Array<{ type: string; text?: string; [k: string]: unknown }>; + isError?: boolean; +} + +/** + * Phase 2: scoped MCP tool proxy. + * + * Called by useMcpAppBridge when the iframe sends a `tools/call` JSON-RPC + * request. The backend validates the (connection_key, tool_name) allow-list + * before dispatching to the originating MCP server. + * + * Throws on network / HTTP errors; the hook re-raises as a JSON-RPC error + * reply to the iframe. 
+ */ +export async function callMcpAppTool( + conversationId: string, + appId: string, + toolName: string, + args: Record = {} +): Promise { + const res = await fetch( + `${BASE}/conversations/${conversationId}/mcp-app/${appId}/tool-call`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ tool_name: toolName, arguments: args }), + } + ); + if (!res.ok) { + const detail = await res.text().catch(() => res.statusText); + throw new Error(`MCP tool call failed (${res.status}): ${detail}`); + } + return res.json(); +} diff --git a/frontend/src/api/stream.ts b/frontend/src/api/stream.ts index 9bda768..e103f3d 100644 --- a/frontend/src/api/stream.ts +++ b/frontend/src/api/stream.ts @@ -33,12 +33,14 @@ export async function* reattachStream( export async function* streamMessage( conversationId: string, text: string, - signal?: AbortSignal + signal?: AbortSignal, + /** Extra body fields forwarded verbatim (e.g. source, app_id for auditing). */ + extraBody?: Record ): AsyncIterator { const res = await fetch(`/api/conversations/${conversationId}/messages`, { method: "POST", headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ text }), + body: JSON.stringify({ text, ...extraBody }), signal, }); diff --git a/frontend/src/components/Chat/ChatView.tsx b/frontend/src/components/Chat/ChatView.tsx index f911085..56dceac 100644 --- a/frontend/src/components/Chat/ChatView.tsx +++ b/frontend/src/components/Chat/ChatView.tsx @@ -198,26 +198,12 @@ export function ChatView() { setExportModalOpen(false); }, [activeConv?.title]); - const handleSend = async (text: string) => { - if (!activeId || isStreaming) return; - - // Append user message immediately - appendMessage({ - id: crypto.randomUUID(), - event_type: "TEXT", - role: "user", - payload: { text }, - }); - + const consumeStream = async (stream: AsyncIterator, convId: string) => { setStreaming(true); resetStreamingText(); // new turn — reset so TEXT goes to a fresh 
message - const conversationId = activeId; let aborted = false; try { - const controller = new AbortController(); - streamAbortRef.current = controller; - const stream = streamMessage(conversationId, text, controller.signal); let result = await stream.next(); const sendTurnUsage: TurnUsage = { input_tokens: 0, output_tokens: 0, total_tokens: 0, cache_read_tokens: 0, cache_creation_tokens: 0, calls: 0 }; while (!result.done) { @@ -225,7 +211,6 @@ export function ChatView() { if (event.event === "TEXT") { appendStreamingText((event.payload as { text: string }).text); } else if (event.event === "TOOL_CALL") { - // Text before this tool call was reasoning — mark it as a thinking block markCurrentAsThinking(); appendMessage({ id: event.message_id, @@ -279,13 +264,30 @@ export function ChatView() { setStreaming(false); if (!aborted) { // Fire-and-forget title generation after the turn completes - generateTitle(conversationId).then((r) => { - if (r.updated) updateConversationTitle(conversationId, r.title); + generateTitle(convId).then((r) => { + if (r.updated) updateConversationTitle(convId, r.title); }).catch(() => {}); } } }; + const handleSend = async (text: string) => { + if (!activeId || isStreaming) return; + + // Append user message immediately + appendMessage({ + id: crypto.randomUUID(), + event_type: "TEXT", + role: "user", + payload: { text }, + }); + + const controller = new AbortController(); + streamAbortRef.current = controller; + const stream = streamMessage(activeId, text, controller.signal); + await consumeStream(stream, activeId); + }; + const handleWelcomeSend = async (text: string, engineName: string) => { pendingFirstMessage.current = text; const conv = await createConversation(engineName); @@ -351,7 +353,26 @@ export function ChatView() { { + if (!activeId || isStreaming) return; + // Append the selection chip as a user message immediately so it + // appears anchored under the proposals card without waiting for reload. 
+ appendMessage({ + id: crypto.randomUUID(), + event_type: "TEXT", + role: "user", + payload: { + text: userPayload.text, + display_text: userPayload.display_text, + source: "proposal_select", + origin_message_id: userPayload.origin_message_id, + selected_ids: userPayload.selected_ids, + }, + }); + await consumeStream(stream, activeId); + }} onChartError={(error) => { if (chartErrorRetried.current || isStreaming) return; chartErrorRetried.current = true; diff --git a/frontend/src/components/Chat/MessageList.tsx b/frontend/src/components/Chat/MessageList.tsx index 072f438..b7cee2b 100644 --- a/frontend/src/components/Chat/MessageList.tsx +++ b/frontend/src/components/Chat/MessageList.tsx @@ -1,17 +1,44 @@ import { Fragment, useEffect, useRef } from "react"; -import type { UIMessage } from "@/types"; +import type { + UIMessage, + MCPAppPayload, + ProposalsPayload, + ProposalResultsPayload, + SSEEvent, +} from "@/types"; import { TextMessage } from "./messages/TextMessage"; import { AgentWorkBlock } from "./messages/AgentWorkBlock"; +import { SelectionChip, type SelectionChipPayload } from "./messages/SelectionChip"; +import { MCPAppMessage } from "./messages/MCPAppMessage"; +import { ProposalsMessage } from "./messages/ProposalsMessage"; +import { ProposalResultsMessage } from "./messages/ProposalResultsMessage"; import { groupIntoTurns } from "@/lib/groupMessages"; interface Props { messages: UIMessage[]; isStreaming?: boolean; + conversationId?: string; showReasoning?: boolean; onChartError?: (error: string) => void; + onProposalStream?: ( + stream: AsyncIterator, + userPayload: { + text: string; + display_text: string; + origin_message_id: string; + selected_ids: string[]; + } + ) => void; } -export function MessageList({ messages, isStreaming = false, showReasoning = true, onChartError }: Props) { +export function MessageList({ + messages, + isStreaming = false, + conversationId, + showReasoning = true, + onChartError, + onProposalStream, +}: Props) { const 
bottomRef = useRef(null); useEffect(() => { @@ -30,42 +57,101 @@ export function MessageList({ messages, isStreaming = false, showReasoning = tru return (
- {groups.map((group) => ( - - {/* User message */} - {group.userMsg && ( -
- { + // MCP App / proposal-select user turns render as a compact selection + // chip (flavor C) — not a full user bubble. + const userSource = (group.userMsg?.payload as Record | undefined)?.source; + const isMcpAppSelection = + group.userMsg?.event_type === "TEXT" && + (userSource === "mcp_app" || userSource === "proposal_select"); + return ( + + {/* User message */} + {group.userMsg && isMcpAppSelection && ( +
+ +
+ )} + {group.userMsg && !isMcpAppSelection && ( +
+ +
+ )} + + {/* Agent work block — only shown when there are intermediate steps */} + {group.workMsgs.length > 0 && ( + -
- )} + )} - {/* Agent work block — only shown when there are intermediate steps */} - {group.workMsgs.length > 0 && ( - - )} + {/* Interactive cards (MCP apps / proposals / results) — sibling to work block, never collapses */} + {group.interactiveMsgs.map((msg) => { + if (msg.event_type === "MCP_APP" && conversationId) { + return ( +
+ +
+ ); + } + if (msg.event_type === "PROPOSALS" && conversationId) { + const submitted = messages.some( + (m) => + m.role === "user" && + (m.payload as Record).source === "proposal_select" && + (m.payload as Record).origin_message_id === msg.id + ); + return ( +
+ + onProposalStream?.(stream, userPayload) + } + /> +
+ ); + } + if (msg.event_type === "PROPOSAL_RESULTS") { + return ( +
+ +
+ ); + } + return null; + })} - {/* Final visible response */} - {group.finalMsg && (group.finalMsg.payload as { text?: string }).text?.trim() && ( -
- -
- )} -
- ))} + {/* Final visible response */} + {group.finalMsg && (group.finalMsg.payload as { text?: string }).text?.trim() && ( +
+ +
+ )} + + ); + })}
); diff --git a/frontend/src/components/Chat/messages/MCPAppMessage.tsx b/frontend/src/components/Chat/messages/MCPAppMessage.tsx new file mode 100644 index 0000000..e5f7056 --- /dev/null +++ b/frontend/src/components/Chat/messages/MCPAppMessage.tsx @@ -0,0 +1,123 @@ +/** + * MCPAppMessage — adapter between an MCP_APP SSE payload and MCPAppFrame. + * + * On mount, always fetches HTML via GET .../mcp-app/{message_id}/ui. + * The prefetched disk cache on the backend makes this fast for first render; + * the same code path handles replay when the conversation is reloaded. + * + * Shows a skeleton while the fetch is in flight, a graceful placeholder on + * 404 (both cache and server unavailable), and the sandboxed iframe once ready. + */ + +import { useEffect, useState } from "react"; +import { Loader2, AppWindow } from "lucide-react"; +import type { MCPAppPayload } from "@/types"; +import { fetchMcpAppUi } from "@/api/mcpApp"; +import { MCPAppFrame } from "@/lib/mcpApps/MCPAppFrame"; + +interface Props { + messageId: string; + conversationId: string; + payload: MCPAppPayload; +} + +type FetchState = + | { status: "loading" } + | { status: "ready"; html: string; csp: string | null; permissions: string[] } + | { status: "unavailable" } + | { status: "error"; message: string }; + +export function MCPAppMessage({ messageId, conversationId, payload }: Props) { + const [fetchState, setFetchState] = useState({ status: "loading" }); + const [appReady, setAppReady] = useState(false); + + useEffect(() => { + let cancelled = false; + setFetchState({ status: "loading" }); + setAppReady(false); + + fetchMcpAppUi(conversationId, messageId) + .then((ui) => { + if (cancelled) return; + if (ui === null) { + setFetchState({ status: "unavailable" }); + } else { + setFetchState({ + status: "ready", + html: ui.html, + csp: ui.csp, + permissions: ui.permissions, + }); + } + }) + .catch((err: unknown) => { + if (cancelled) return; + setFetchState({ + status: "error", + message: err 
instanceof Error ? err.message : String(err), + }); + }); + + return () => { + cancelled = true; + }; + }, [conversationId, messageId]); + + return ( +
+ {/* Header strip */} +
+ + {payload.tool_name} + {payload.server_name && ( + {payload.server_name} + )} +
+ + {/* Content area */} + {fetchState.status === "loading" && ( +
+ + Loading app… +
+ )} + + {fetchState.status === "unavailable" && ( +
+

App unavailable

+

+ The MCP server is offline and no cached version is available. +

+
+ )} + + {fetchState.status === "error" && ( +
+ {fetchState.message} +
+ )} + + {fetchState.status === "ready" && ( +
+ {!appReady && ( +
+ +
+ )} + setAppReady(true)} + className="w-full border-0 block" + conversationId={conversationId} + appId={payload.app_id} + originMessageId={messageId} + /> +
+ )} +
+ ); +} diff --git a/frontend/src/components/Chat/messages/ProposalResultsMessage.tsx b/frontend/src/components/Chat/messages/ProposalResultsMessage.tsx new file mode 100644 index 0000000..f1ece83 --- /dev/null +++ b/frontend/src/components/Chat/messages/ProposalResultsMessage.tsx @@ -0,0 +1,157 @@ +/** + * ProposalResultsMessage — renders the outcomes of a /improve-context write + * run as a compact status card. Each row shows a green check (success) or + * red X (error), the kind badge, the title, the written URN, and an + * "Open in DataHub" link for successes. + */ + +import { useEffect, useState } from "react"; +import { CheckCircle2, XCircle, FilePlus, FileText, Tag, ExternalLink, ClipboardList } from "lucide-react"; +import type { ProposalResultItem, ProposalResultsPayload } from "@/types"; +import { listConnections } from "@/api/settings"; + +interface Props { + payload: ProposalResultsPayload; +} + +const KIND_META: Record< + ProposalResultItem["kind"], + { label: string; className: string; icon: React.ReactNode } +> = { + new_doc: { + label: "New doc", + className: "bg-emerald-100 text-emerald-700 dark:bg-emerald-900/40 dark:text-emerald-400", + icon: , + }, + update_doc: { + label: "Update doc", + className: "bg-blue-100 text-blue-700 dark:bg-blue-900/40 dark:text-blue-400", + icon: , + }, + fix_description: { + label: "Fix description", + className: "bg-amber-100 text-amber-700 dark:bg-amber-900/40 dark:text-amber-400", + icon: , + }, +}; + +/** Derive the DataHub UI base URL from the GMS URL stored in the connection config. + * e.g. 
"https://instance.acryl.io/gms" → "https://instance.acryl.io" */ +function useDataHubBaseUrl(): string | null { + const [baseUrl, setBaseUrl] = useState(null); + + useEffect(() => { + listConnections() + .then((connections) => { + const dhConn = connections.find( + (c) => c.type === "datahub" || c.type === "datahub_mcp" + ); + if (!dhConn) return; + const urlField = dhConn.fields.find((f) => f.key === "url"); + if (!urlField?.value) return; + // Strip trailing /gms to get the UI base + const raw = urlField.value.replace(/\/gms\/?$/, "").replace(/\/$/, ""); + setBaseUrl(raw); + }) + .catch(() => {}); + }, []); + + return baseUrl; +} + +function buildEntityUrl(baseUrl: string, urn: string): string { + return `${baseUrl}/entity/${encodeURIComponent(urn)}`; +} + +export function ProposalResultsMessage({ payload }: Props) { + const { results } = payload; + const baseUrl = useDataHubBaseUrl(); + + const successCount = results.filter((r) => r.status === "success").length; + const total = results.length; + + return ( +
+ {/* Header strip */} +
+ + Write results + + {successCount} of {total} succeeded + +
+ + {/* Results list */} +
    + {results.map((item) => { + const meta = KIND_META[item.kind]; + const isSuccess = item.status === "success"; + + return ( +
  • + {/* Status icon */} +
    + {isSuccess ? ( + + ) : ( + + )} +
    + + {/* Content */} +
    +
    + {/* Kind badge */} + + {meta.icon} + {meta.label} + + + {item.title} + +
    + + {isSuccess && item.urn && ( +
    + + {item.urn} + + {baseUrl && ( + e.stopPropagation()} + > + + Open + + )} +
    + )} + + {!isSuccess && item.error && ( +

    + {item.error} +

    + )} +
    +
  • + ); + })} +
+ + {/* Footer summary */} +
+ {successCount === total + ? `All ${total} change${total !== 1 ? "s" : ""} applied successfully.` + : successCount === 0 + ? `All ${total} change${total !== 1 ? "s" : ""} failed.` + : `${successCount} of ${total} changes applied; ${total - successCount} failed.`} +
+
+ ); +} diff --git a/frontend/src/components/Chat/messages/ProposalsMessage.tsx b/frontend/src/components/Chat/messages/ProposalsMessage.tsx new file mode 100644 index 0000000..2c5c7e6 --- /dev/null +++ b/frontend/src/components/Chat/messages/ProposalsMessage.tsx @@ -0,0 +1,227 @@ +/** + * ProposalsMessage — renders a set of agent-proposed documentation changes + * as an interactive card. The user selects which proposals to apply and + * clicks Submit (or Skip to bypass all). Once submitted, the card + * transitions to a disabled read-only state. + */ + +import { useState, useEffect } from "react"; +import { FileText, FilePlus, Tag, CheckSquare, Square, Loader2 } from "lucide-react"; +import ReactMarkdown from "react-markdown"; +import remarkGfm from "remark-gfm"; +import type { ProposalItem, ProposalsPayload } from "@/types"; +import { sendProposalSelection } from "@/api/conversations"; + +interface Props { + messageId: string; + conversationId: string; + payload: ProposalsPayload; + submitted: boolean; + onStream: ( + stream: AsyncIterator, + userPayload: { + text: string; + display_text: string; + origin_message_id: string; + selected_ids: string[]; + } + ) => void; +} + +const KIND_META: Record< + ProposalItem["kind"], + { label: string; className: string; icon: React.ReactNode } +> = { + new_doc: { + label: "New doc", + className: "bg-emerald-100 text-emerald-700 dark:bg-emerald-900/40 dark:text-emerald-400", + icon: , + }, + update_doc: { + label: "Update doc", + className: "bg-blue-100 text-blue-700 dark:bg-blue-900/40 dark:text-blue-400", + icon: , + }, + fix_description: { + label: "Fix description", + className: "bg-amber-100 text-amber-700 dark:bg-amber-900/40 dark:text-amber-400", + icon: , + }, +}; + +export function ProposalsMessage({ messageId, conversationId, payload, submitted, onStream }: Props) { + const { proposals } = payload; + const [selected, setSelected] = useState>(() => new Set()); + const [isSubmitting, setIsSubmitting] = 
useState(false); + + // Clear the spinner once the parent confirms submission (selection chip appended) + useEffect(() => { + if (submitted) setIsSubmitting(false); + }, [submitted]); + + const toggle = (id: string) => { + if (submitted || isSubmitting) return; + setSelected((prev) => { + const next = new Set(prev); + if (next.has(id)) next.delete(id); + else next.add(id); + return next; + }); + }; + + const selectAll = () => { + if (submitted || isSubmitting) return; + setSelected(new Set(proposals.map((p) => p.id))); + }; + + const selectNone = () => { + if (submitted || isSubmitting) return; + setSelected(new Set()); + }; + + const handleSubmit = async (skipAll = false) => { + if (submitted || isSubmitting) return; + setIsSubmitting(true); + try { + const ids = skipAll ? [] : Array.from(selected); + const selectedItems = proposals.filter((p) => ids.includes(p.id)); + const agentText = + selectedItems.length === 0 + ? "Skip all proposals — make no changes." + : `Publish the following proposals: ${selectedItems.map((p) => `"${p.title}"`).join(", ")}.`; + const displayText = + selectedItems.length === 0 + ? "Skipped proposals" + : `Selected ${selectedItems.length} proposal${selectedItems.length === 1 ? "" : "s"}`; + + const stream = sendProposalSelection(conversationId, messageId, ids, proposals); + onStream(stream, { + text: agentText, + display_text: displayText, + origin_message_id: messageId, + selected_ids: ids, + }); + } catch { + setIsSubmitting(false); + } + }; + + const disabled = submitted || isSubmitting; + + return ( +
+ {/* Header strip */} +
+ + Proposed changes + + {proposals.length} proposal{proposals.length !== 1 ? "s" : ""} + +
+ + {/* Prompt / description */} + {payload.prompt && ( +
+ {payload.prompt} +
+ )} + + {/* Proposal list */} +
    + {proposals.map((proposal) => { + const meta = KIND_META[proposal.kind]; + const isChecked = selected.has(proposal.id); + + return ( +
  • toggle(proposal.id)} + > + {/* Checkbox */} +
    + {isChecked ? ( + + ) : ( + + )} +
    + + {/* Content */} +
    +
    + {/* Kind badge */} + + {meta.icon} + {meta.label} + + + {proposal.title} + +
    + {proposal.detail && ( +
    + {proposal.detail} +
    + )} + {proposal.target?.field_path && ( +

    + {proposal.target.field_path} +

    + )} +
    +
  • + ); + })} +
+ + {/* Footer */} +
+ {/* Select all / none */} + {!disabled && ( +
+ + +
+ )} + {disabled &&
} + + {/* Action buttons */} +
+ {!disabled && ( + + )} + +
+
+
+ ); +} diff --git a/frontend/src/components/Chat/messages/SelectionChip.tsx b/frontend/src/components/Chat/messages/SelectionChip.tsx new file mode 100644 index 0000000..11127bd --- /dev/null +++ b/frontend/src/components/Chat/messages/SelectionChip.tsx @@ -0,0 +1,39 @@ +/** + * SelectionChip — compact pill rendered when an MCP App iframe posts a + * `ui/message` on behalf of the user (flavor C), or a proposals card is submitted. + * + * Appears anchored under the selector iframe that originated the turn + * (referenced by origin_message_id in the persisted payload) instead of + * rendering a full user-bubble. Keeps the transcript auditable without a + * phantom "I picked X" user turn cluttering the conversation. + */ + +import { MousePointerClick } from "lucide-react"; + +export interface SelectionChipPayload { + /** Agent-facing text (may be verbose to prevent LLM re-disambiguation). */ + text: string; + /** Optional short label for display; falls back to `text` if absent. */ + display_text?: string; + source: "mcp_app" | "proposal_select"; + app_id?: string; + origin_message_id?: string; +} + +interface Props { + payload: SelectionChipPayload; +} + +export function SelectionChip({ payload }: Props) { + const label = payload.display_text?.trim() || payload.text; + return ( +
+ + {label} +
+ ); +} diff --git a/frontend/src/lib/buildUiMessages.ts b/frontend/src/lib/buildUiMessages.ts index 3de3426..0d4d7f3 100644 --- a/frontend/src/lib/buildUiMessages.ts +++ b/frontend/src/lib/buildUiMessages.ts @@ -91,6 +91,9 @@ export function buildUiMessages(records: MessageRecord[]): { case "TOOL_RESULT": case "SQL": case "CHART": + case "MCP_APP": + case "PROPOSALS": + case "PROPOSAL_RESULTS": case "ERROR": result.push({ id: m.id, event_type: m.event_type, role: "assistant", payload: m.payload, created_at: m.created_at }); break; diff --git a/frontend/src/lib/groupMessages.ts b/frontend/src/lib/groupMessages.ts index aa3a76f..7d5698b 100644 --- a/frontend/src/lib/groupMessages.ts +++ b/frontend/src/lib/groupMessages.ts @@ -6,24 +6,31 @@ export interface TurnGroup { key: string; userMsg?: UIMessage; workMsgs: UIMessage[]; + // Interactive cards (MCP apps, proposals, proposal results) — rendered outside the + // collapsible work block since users need to see and act on them. + interactiveMsgs: UIMessage[]; finalMsg?: UIMessage; isActivelyStreaming: boolean; } +const INTERACTIVE_TYPES = new Set(["PROPOSALS", "PROPOSAL_RESULTS", "MCP_APP"]); + export function groupIntoTurns(messages: UIMessage[], globalStreaming: boolean): TurnGroup[] { const groups: TurnGroup[] = []; - let current: TurnGroup = { key: "init", workMsgs: [], isActivelyStreaming: false }; + let current: TurnGroup = { key: "init", workMsgs: [], interactiveMsgs: [], isActivelyStreaming: false }; for (const msg of messages) { if (msg.role === "user") { - if (current.userMsg || current.workMsgs.length > 0 || current.finalMsg) { + if (current.userMsg || current.workMsgs.length > 0 || current.interactiveMsgs.length > 0 || current.finalMsg) { groups.push(current); } - current = { key: msg.id, userMsg: msg, workMsgs: [], isActivelyStreaming: false }; continue; } - if (msg.event_type === "TEXT" && !msg.isThinking) { 
+ if (INTERACTIVE_TYPES.has(msg.event_type)) { + current.interactiveMsgs.push(msg); + } else if (msg.event_type === "TEXT" && !msg.isThinking) { if (msg.isStreaming) current.isActivelyStreaming = true; current.finalMsg = msg; } else if (WORK_TYPES.has(msg.event_type) || (msg.event_type === "TEXT" && msg.isThinking)) { @@ -31,7 +38,7 @@ export function groupIntoTurns(messages: UIMessage[], globalStreaming: boolean): } } - if (current.userMsg || current.workMsgs.length > 0 || current.finalMsg) { + if (current.userMsg || current.workMsgs.length > 0 || current.interactiveMsgs.length > 0 || current.finalMsg) { if (globalStreaming && !current.finalMsg?.isStreaming) { current.isActivelyStreaming = globalStreaming; } diff --git a/frontend/src/lib/mcpApps/MCPAppFrame.tsx b/frontend/src/lib/mcpApps/MCPAppFrame.tsx new file mode 100644 index 0000000..c87d4bd --- /dev/null +++ b/frontend/src/lib/mcpApps/MCPAppFrame.tsx @@ -0,0 +1,137 @@ +/** + * MCPAppFrame — sandboxed iframe wrapper for MCP App HTML. + * + * Security posture: + * - sandbox="allow-scripts" (no allow-same-origin, no allow-forms, etc.) + * - csp attribute on the iframe element (W3C, Chromium-enforced before any + * script runs; stronger than meta CSP because it supports frame-ancestors). + * - When csp is provided and the browser doesn't support the iframe csp + * attribute (non-Chromium), we inject a into + * the HTML before setting srcDoc as a documented weaker fallback. + * + * Wires up useMcpAppBridge so the app receives ui/initialize ack + + * ui/toolResult push as soon as it loads. + * + * Height: starts at MIN_FRAME_HEIGHT_PX and grows via + * `ui/notifications/size-changed` from the app. We never shrink below + * the min, and we ignore non-finite or non-positive values from the + * app (defensive — a buggy app shouldn't collapse the frame to 0). 
+ */ + +import { useCallback, useEffect, useRef, useState } from "react"; +import { useMcpAppBridge } from "./useMcpAppBridge"; + +const MIN_FRAME_HEIGHT_PX = 120; + +export interface MCPAppFrameProps { + /** Raw HTML to render inside the iframe. */ + html: string; + /** Tool input arguments (what the agent called the tool with). */ + toolInput: Record; + /** Tool result to push to the app after ui/initialize. */ + toolResult: unknown; + /** CSP string from the MCP server's _meta.ui.csp field. */ + csp?: string | null; + /** permissions from the MCP server's _meta.ui.permissions field. */ + permissions?: string[]; + /** Called when the app sends ui/notifications/initialized. */ + onReady?: () => void; + className?: string; + /** + * Phase 2: conversation the frame lives in. Passed to useMcpAppBridge so + * tools/call requests can be routed to the correct backend endpoint. + */ + conversationId?: string; + /** + * Phase 2: the app_id for this MCP App instance — used as the path segment + * in POST .../mcp-app/{app_id}/tool-call. + */ + appId?: string; + /** + * Phase 2: the message ID of the MCP_APP message that created this frame. + * Forwarded to useMcpAppBridge so `ui/message` posts include it as + * `origin_message_id`, enabling the SelectionChip to be anchored under this + * selector on replay. + */ + originMessageId?: string; +} + +/** Returns true if the browser supports the iframe `csp` attribute. */ +function supportsCspAttribute(): boolean { + return "csp" in HTMLIFrameElement.prototype; +} + +/** Inject a as a documented weaker fallback. 
*/ +function injectMetaCsp(html: string, csp: string): string { + const metaTag = ``; + if (/]*>/i.test(html)) { + return html.replace(/(]*>)/i, `$1\n ${metaTag}`); + } + // No — prepend before everything + return `${metaTag}\n${html}`; +} + +export function MCPAppFrame({ + html, + toolInput, + toolResult, + csp, + onReady, + className, + conversationId, + appId, + originMessageId, +}: MCPAppFrameProps) { + const iframeRef = useRef(null); + const [iframeWindow, setIframeWindow] = useState(null); + const [frameHeight, setFrameHeight] = useState(MIN_FRAME_HEIGHT_PX); + + // Resolve srcDoc: inject meta CSP fallback for non-Chromium if needed + const srcDoc = (() => { + if (csp && !supportsCspAttribute()) { + return injectMetaCsp(html, csp); + } + return html; + })(); + + const handleSizeChanged = useCallback((height: number) => { + setFrameHeight(Math.max(MIN_FRAME_HEIGHT_PX, Math.ceil(height))); + }, []); + + useMcpAppBridge({ + iframeWindow, + toolInput, + toolResult, + onReady, + onSizeChanged: handleSizeChanged, + conversationId, + appId, + originMessageId, + }); + + const handleLoad = useCallback(() => { + setIframeWindow(iframeRef.current?.contentWindow ?? null); + }, []); + + useEffect(() => { + setIframeWindow(null); + setFrameHeight(MIN_FRAME_HEIGHT_PX); + }, [srcDoc]); + + return ( +