Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 351 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ petgraph = "0.8.3"
postcard = { version = "1", features = ["alloc"] }
prost = "0.14.1"
pyo3 = "0.29"
rmcp = { version = "1.7.0", default-features = false }
rmp-serde = "1"
rustc-hash = "2"
schemars = "1"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
smallvec = {version = "1.15.1", features = ["serde"] }
Expand Down
15 changes: 15 additions & 0 deletions domains/query_engine/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ publish.workspace = true
[features]
swagger = ["dep:utoipa", "dep:utoipa-swagger-ui"]
ui = ["dep:mime_guess", "dep:rust-embed"]
mcp = [
"dep:rmcp",
"dep:schemars",
"dep:serde_json",
"tokio/io-std",
"tokio/macros",
]

[dependencies]
axum = { version = "0.8.7" }
Expand All @@ -19,7 +26,15 @@ quent-query-engine-analyzer = { path = "../../../domains/query_engine/analyzer"
quent-query-engine-ui = { path = "../../../domains/query_engine/ui" }
quent-time = { path = "../../../crates/time" }
quent-ui = { path = "../../../crates/ui" }
rmcp = { workspace = true, features = [
"server",
"macros",
"transport-io",
"transport-streamable-http-server",
], optional = true }
schemars = { workspace = true, optional = true }
serde.workspace = true
serde_json = { workspace = true, optional = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "sync"]}
tonic.workspace = true
Expand Down
35 changes: 34 additions & 1 deletion domains/query_engine/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use tower_http::cors::CorsLayer;

pub mod analyzer_cache;
pub mod error;
#[cfg(feature = "mcp")]
pub mod mcp;
mod state;
mod timeline_cache;
mod ui;
Expand Down Expand Up @@ -66,8 +68,16 @@ where
timelines: TimelineCache::new(),
};

#[cfg(feature = "mcp")]
let mcp_service = mcp::http_service(state.clone());

let mut http_routes = axum::Router::new().nest("/api/engines", ui::routes(state));

#[cfg(feature = "mcp")]
{
http_routes = http_routes.nest_service("/mcp", mcp_service);
}

#[cfg(feature = "swagger")]
{
use utoipa::OpenApi;
Expand All @@ -78,14 +88,37 @@ where
}

if let Some(cors) = cors {
let cors = CorsLayer::new()
let mut cors = CorsLayer::new()
.allow_origin(cors.parse::<axum::http::HeaderValue>().unwrap())
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::OPTIONS,
])
.allow_headers([axum::http::header::CONTENT_TYPE]);

// The streamable HTTP MCP transport adds session/protocol headers, uses
// DELETE to tear down a session, and returns the session id to the
// client, so browser-based MCP clients need these allowed and exposed.
#[cfg(feature = "mcp")]
{
use axum::http::{HeaderName, Method, header::CONTENT_TYPE};
const MCP_SESSION_ID: HeaderName = HeaderName::from_static("mcp-session-id");
const MCP_PROTOCOL_VERSION: HeaderName =
HeaderName::from_static("mcp-protocol-version");
// Sent when a browser client resumes an interrupted SSE stream.
const LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
cors = cors
.allow_methods([Method::GET, Method::POST, Method::OPTIONS, Method::DELETE])
.allow_headers([
CONTENT_TYPE,
MCP_SESSION_ID,
MCP_PROTOCOL_VERSION,
LAST_EVENT_ID,
])
.expose_headers([MCP_SESSION_ID]);
}

http_routes = http_routes.layer(cors);
}

Expand Down
Loading
Loading