2,406 changes: 2,328 additions & 78 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions bench/subgraphs/main.rs
@@ -30,6 +30,21 @@ async fn delay_middleware(req: Request, next: Next) -> Response {
    next.run(req).await
}

+async fn add_subgraph_header(req: Request, next: Next) -> Response {
+    let path = req.uri().path();
+    let subgraph_name = path.trim_start_matches('/').to_string();
+
+    let mut response = next.run(req).await;
+
+    if !subgraph_name.is_empty() && subgraph_name != "health" {
+        if let Ok(header_value) = subgraph_name.parse() {
+            response.headers_mut().insert("x-subgraph", header_value);
+        }
+    }
+
+    response
+}
+

async fn health_check_handler() -> impl IntoResponse {
    StatusCode::OK
}
@@ -57,6 +72,7 @@ async fn main() {
            post_service(GraphQL::new(reviews::get_subgraph())),
        )
        .route("/health", get(health_check_handler))
+        .route_layer(middleware::from_fn(add_subgraph_header))
        .route_layer(middleware::from_fn(delay_middleware));

    println!("Starting server on http://localhost:4200");
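The added middleware tags every non-root, non-/health response with an x-subgraph header naming the route that served it, so tests against the bench subgraphs can tell which subgraph answered. A minimal sketch of the same path-to-header mapping as a pure function; the helper and test names are illustrative, not part of this PR:

fn subgraph_header_for(path: &str) -> Option<&str> {
    // Mirrors add_subgraph_header: trim leading slashes, then skip the
    // root path and the health-check route.
    let name = path.trim_start_matches('/');
    (!name.is_empty() && name != "health").then_some(name)
}

#[cfg(test)]
mod tests {
    use super::subgraph_header_for;

    #[test]
    fn maps_paths_to_subgraph_headers() {
        assert_eq!(subgraph_header_for("/reviews"), Some("reviews"));
        assert_eq!(subgraph_header_for("/health"), None);
        assert_eq!(subgraph_header_for("/"), None);
    }
}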
2 changes: 1 addition & 1 deletion bin/router/src/lib.rs
@@ -36,7 +36,7 @@ pub async fn router_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
    let supergraph_sdl = router_config.supergraph.load().await?;
    let parsed_schema = parse_schema(&supergraph_sdl);
    let addr = router_config.http.address();
-    let shared_state = RouterSharedState::new(parsed_schema, router_config);
+    let shared_state = RouterSharedState::new(parsed_schema, router_config)?;

    web::HttpServer::new(move || {
        web::App::new()
29 changes: 24 additions & 5 deletions bin/router/src/pipeline/execution.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

@@ -6,11 +7,13 @@ use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, PipelineErrorVariant};
use crate::pipeline::normalize::GraphQLNormalizationPayload;
use crate::shared_state::RouterSharedState;
use hive_router_plan_executor::execute_query_plan;
-use hive_router_plan_executor::execution::plan::QueryPlanExecutionContext;
+use hive_router_plan_executor::execution::plan::{
+    ClientRequestDetails, OperationDetails, PlanExecutionOutput, QueryPlanExecutionContext,
+};
use hive_router_plan_executor::introspection::resolve::IntrospectionContext;
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
use hive_router_query_planner::state::supergraph_state::OperationKind;
use http::HeaderName;
use ntex::util::Bytes;
use ntex::web::HttpRequest;

static EXPOSE_QUERY_PLAN_HEADER: HeaderName = HeaderName::from_static("hive-expose-query-plan");
@@ -23,13 +26,14 @@ enum ExposeQueryPlanMode {
}

#[inline]
-pub async fn execute_plan(
+pub async fn execute_plan<'a>(
    req: &mut HttpRequest,
+    query: Cow<'a, str>,
    app_state: &Arc<RouterSharedState>,
    normalized_payload: &Arc<GraphQLNormalizationPayload>,
    query_plan_payload: &Arc<QueryPlan>,
    variable_payload: &CoerceVariablesPayload,
-) -> Result<Bytes, PipelineError> {
+) -> Result<PlanExecutionOutput, PipelineError> {
    let mut expose_query_plan = ExposeQueryPlanMode::No;

    if app_state.router_config.query_planner.allow_expose {
@@ -64,14 +68,29 @@ pub async fn execute_plan(
    execute_query_plan(QueryPlanExecutionContext {
        query_plan: query_plan_payload,
        projection_plan: &normalized_payload.projection_plan,
+        headers_plan: &app_state.headers_plan,
        variable_values: &variable_payload.variables_map,
        extensions,
+        client_request: ClientRequestDetails {
+            method: req.method().clone(),
+            url: req.uri().clone(),
+            headers: req.headers(),
+            operation: OperationDetails {
+                name: normalized_payload.operation_for_plan.name.clone(),
+                kind: match normalized_payload.operation_for_plan.operation_kind {
+                    Some(OperationKind::Query) => "query",
+                    Some(OperationKind::Mutation) => "mutation",
+                    Some(OperationKind::Subscription) => "subscription",
+                    None => "query",
+                },
+                query,
+            },
+        },
        introspection_context: &introspection_context,
        operation_type_name: normalized_payload.root_type_name,
        executors: &app_state.subgraph_executor_map,
    })
    .await
-    .map(Bytes::from)
    .map_err(|err| {
        tracing::error!("Failed to execute query plan: {}", err);
        req.new_pipeline_error(PipelineErrorVariant::PlanExecutionError(err))
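execute_plan now receives the query text and hands the client's request details (method, URL, headers, operation name, kind, and source) to the executor, which the new headers_plan can presumably consult when deciding what to propagate to subgraphs. The operation kind is lowered to a static string; the same mapping written as a total function, assuming only the OperationKind import above (the helper name is hypothetical):

use hive_router_query_planner::state::supergraph_state::OperationKind;

// An operation with no explicit kind is a query (the GraphQL shorthand
// form), so `None` falls through to "query", as in the match above.
fn kind_str(kind: Option<OperationKind>) -> &'static str {
    match kind {
        Some(OperationKind::Mutation) => "mutation",
        Some(OperationKind::Subscription) => "subscription",
        Some(OperationKind::Query) | None => "query",
    }
}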
21 changes: 17 additions & 4 deletions bin/router/src/pipeline/mod.rs
@@ -1,5 +1,6 @@
-use std::sync::Arc;
+use std::{borrow::Cow, sync::Arc};

+use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
use hive_router_query_planner::utils::cancellation::CancellationToken;
use http::{header::CONTENT_TYPE, HeaderValue, Method};
use ntex::{
@@ -52,15 +53,25 @@ pub async fn graphql_request_handler(
    }

    match execute_pipeline(req, body_bytes, state).await {
-        Ok(response_bytes) => {
+        Ok(response) => {
+            let response_bytes = Bytes::from(response.body);
+            let response_headers = response.headers;
+
            let response_content_type: &'static HeaderValue =
                if req.accepts_content_type(*APPLICATION_GRAPHQL_RESPONSE_JSON_STR) {
                    &APPLICATION_GRAPHQL_RESPONSE_JSON
                } else {
                    &APPLICATION_JSON
                };

-            web::HttpResponse::Ok()
+            let mut response_builder = web::HttpResponse::Ok();
+            for (header_name, header_value) in response_headers {
+                if let Some(header_name) = header_name {
+                    response_builder.header(header_name, header_value);
+                }
+            }
+
+            response_builder
                .header(http::header::CONTENT_TYPE, response_content_type)
                .body(response_bytes)
        }
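For context on the forwarding loop: http::HeaderMap's by-value iterator yields (Option<HeaderName>, HeaderValue) pairs, and the name is None for the second and later values of a multi-valued header, so the loop above forwards only the first value under each name. A variant that preserves repeated values would remember the last seen name; a self-contained sketch against the http crate:

use http::{HeaderMap, HeaderValue};

fn main() {
    let mut headers = HeaderMap::new();
    headers.append("set-cookie", HeaderValue::from_static("a=1"));
    headers.append("set-cookie", HeaderValue::from_static("b=2"));

    // `into_iter` hands out each name once; remembering it keeps the
    // second set-cookie value from being dropped.
    let mut last_name = None;
    for (name, value) in headers {
        if name.is_some() {
            last_name = name;
        }
        if let Some(name) = &last_name {
            println!("{}: {:?}", name, value);
        }
    }
}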
@@ -73,14 +84,15 @@ pub async fn execute_pipeline(
    req: &mut HttpRequest,
    body_bytes: Bytes,
    state: &Arc<RouterSharedState>,
-) -> Result<Bytes, PipelineError> {
+) -> Result<PlanExecutionOutput, PipelineError> {
    let execution_request = get_execution_request(req, body_bytes).await?;
    let parser_payload = parse_operation_with_cache(req, state, &execution_request).await?;
    validate_operation_with_cache(req, state, &parser_payload).await?;

    let progressive_override_ctx = request_override_context()?;
    let normalize_payload =
        normalize_request_with_cache(req, state, &execution_request, &parser_payload).await?;
+    let query = Cow::Owned(execution_request.query.clone());
    let variable_payload =
        coerce_request_variables(req, state, execution_request, &normalize_payload)?;

@@ -98,6 +110,7 @@

    let execution_result = execute_plan(
        req,
+        query,
        state,
        &normalize_payload,
        &query_plan_payload,
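One note on the new query binding: the query text is cloned into an owned Cow before execution_request is moved into coerce_request_variables, while the Cow<'a, str> parameter on execute_plan still lets a borrowing caller avoid that copy. A small illustration (the function name is hypothetical):

use std::borrow::Cow;

fn query_len(query: Cow<'_, str>) -> usize {
    query.len()
}

fn main() {
    let owned = String::from("{ __typename }");
    // A caller that keeps its string can lend it; one that is done
    // with it can hand over ownership. Both satisfy Cow<str>.
    assert_eq!(query_len(Cow::Borrowed(&owned)), 14);
    assert_eq!(query_len(Cow::Owned(owned)), 14);
}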
17 changes: 14 additions & 3 deletions bin/router/src/shared_state.rs
@@ -4,6 +4,9 @@ use graphql_parser::schema::Document;
use graphql_tools::validation::{utils::ValidationError, validate::ValidationPlan};
use hive_router_config::HiveRouterConfig;
use hive_router_plan_executor::{
+    headers::{
+        compile::compile_headers_plan, errors::HeaderRuleCompileError, plan::HeaderRulesPlan,
+    },
    introspection::schema::{SchemaMetadata, SchemaWithMetadata},
    SubgraphExecutorMap,
};
@@ -25,13 +28,14 @@ pub struct RouterSharedState {
    pub parse_cache: Cache<u64, Arc<graphql_parser::query::Document<'static, String>>>,
    pub normalize_cache: Cache<u64, Arc<GraphQLNormalizationPayload>>,
    pub router_config: HiveRouterConfig,
+    pub headers_plan: HeaderRulesPlan,
}

impl RouterSharedState {
    pub fn new(
        parsed_supergraph_sdl: Document<'static, String>,
        router_config: HiveRouterConfig,
-    ) -> Arc<Self> {
+    ) -> Result<Arc<Self>, SharedStateError> {
        let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
        let planner =
            Planner::new_from_supergraph(&parsed_supergraph_sdl).expect("failed to create planner");
@@ -43,16 +47,23 @@ impl RouterSharedState {
        )
        .expect("Failed to create subgraph executor map");

-        Arc::new(Self {
+        Ok(Arc::new(Self {
            schema_metadata,
            planner,
            validation_plan: graphql_tools::validation::rules::default_rules_validation_plan(),
+            headers_plan: compile_headers_plan(&router_config.headers).map_err(Box::new)?,
            subgraph_executor_map,
            plan_cache: moka::future::Cache::new(1000),
            validate_cache: moka::future::Cache::new(1000),
            parse_cache: moka::future::Cache::new(1000),
            normalize_cache: moka::future::Cache::new(1000),
            router_config,
-        })
+        }))
    }
}
+
+#[derive(thiserror::Error, Debug)]
+pub enum SharedStateError {
+    #[error("invalid headers config: {0}")]
+    HeaderRuleCompileError(#[from] Box<HeaderRuleCompileError>),
+}
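The headers plan is compiled once at startup and stored on the shared state, and the compile error is boxed so the SharedStateError variant stays small; #[from] on Box<HeaderRuleCompileError> generates the From impl that map_err(Box::new)? relies on. A minimal sketch of that conversion pattern; RuleError and StateError are hypothetical stand-ins, not this crate's types:

use thiserror::Error;

#[derive(Error, Debug)]
#[error("bad header rule: {0}")]
struct RuleError(String); // stand-in for HeaderRuleCompileError

#[derive(Error, Debug)]
enum StateError {
    #[error("invalid headers config: {0}")]
    Rule(#[from] Box<RuleError>),
}

fn compile(rule: &str) -> Result<(), RuleError> {
    Err(RuleError(rule.to_string()))
}

fn build() -> Result<(), StateError> {
    // `map_err(Box::new)` boxes the error; `?` then applies the
    // `From<Box<RuleError>>` impl generated by `#[from]`.
    compile("x-bad").map_err(Box::new)?;
    Ok(())
}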