Skip to content

Commit a40dfcf

Browse files
feat(router): Advanced Header Management (#438)
Co-authored-by: Dotan Simha <[email protected]>
1 parent 50f4bbc commit a40dfcf

File tree

28 files changed

+6126
-272
lines changed

28 files changed

+6126
-272
lines changed

Cargo.lock

Lines changed: 2328 additions & 78 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench/subgraphs/main.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ async fn delay_middleware(req: Request, next: Next) -> Response {
3030
next.run(req).await
3131
}
3232

33+
async fn add_subgraph_header(req: Request, next: Next) -> Response {
34+
let path = req.uri().path();
35+
let subgraph_name = path.trim_start_matches('/').to_string();
36+
37+
let mut response = next.run(req).await;
38+
39+
if !subgraph_name.is_empty() && subgraph_name != "health" {
40+
if let Ok(header_value) = subgraph_name.parse() {
41+
response.headers_mut().insert("x-subgraph", header_value);
42+
}
43+
}
44+
45+
response
46+
}
47+
3348
async fn health_check_handler() -> impl IntoResponse {
3449
StatusCode::OK
3550
}
@@ -57,6 +72,7 @@ async fn main() {
5772
post_service(GraphQL::new(reviews::get_subgraph())),
5873
)
5974
.route("/health", get(health_check_handler))
75+
.route_layer(middleware::from_fn(add_subgraph_header))
6076
.route_layer(middleware::from_fn(delay_middleware));
6177

6278
println!("Starting server on http://localhost:4200");

bin/router/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub async fn router_entrypoint() -> Result<(), Box<dyn std::error::Error>> {
3636
let supergraph_sdl = router_config.supergraph.load().await?;
3737
let parsed_schema = parse_schema(&supergraph_sdl);
3838
let addr = router_config.http.address();
39-
let shared_state = RouterSharedState::new(parsed_schema, router_config);
39+
let shared_state = RouterSharedState::new(parsed_schema, router_config)?;
4040

4141
web::HttpServer::new(move || {
4242
web::App::new()

bin/router/src/pipeline/execution.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::borrow::Cow;
12
use std::collections::HashMap;
23
use std::sync::Arc;
34

@@ -6,11 +7,13 @@ use crate::pipeline::error::{PipelineError, PipelineErrorFromAcceptHeader, Pipel
67
use crate::pipeline::normalize::GraphQLNormalizationPayload;
78
use crate::shared_state::RouterSharedState;
89
use hive_router_plan_executor::execute_query_plan;
9-
use hive_router_plan_executor::execution::plan::QueryPlanExecutionContext;
10+
use hive_router_plan_executor::execution::plan::{
11+
ClientRequestDetails, OperationDetails, PlanExecutionOutput, QueryPlanExecutionContext,
12+
};
1013
use hive_router_plan_executor::introspection::resolve::IntrospectionContext;
1114
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
15+
use hive_router_query_planner::state::supergraph_state::OperationKind;
1216
use http::HeaderName;
13-
use ntex::util::Bytes;
1417
use ntex::web::HttpRequest;
1518

1619
static EXPOSE_QUERY_PLAN_HEADER: HeaderName = HeaderName::from_static("hive-expose-query-plan");
@@ -23,13 +26,14 @@ enum ExposeQueryPlanMode {
2326
}
2427

2528
#[inline]
26-
pub async fn execute_plan(
29+
pub async fn execute_plan<'a>(
2730
req: &mut HttpRequest,
31+
query: Cow<'a, str>,
2832
app_state: &Arc<RouterSharedState>,
2933
normalized_payload: &Arc<GraphQLNormalizationPayload>,
3034
query_plan_payload: &Arc<QueryPlan>,
3135
variable_payload: &CoerceVariablesPayload,
32-
) -> Result<Bytes, PipelineError> {
36+
) -> Result<PlanExecutionOutput, PipelineError> {
3337
let mut expose_query_plan = ExposeQueryPlanMode::No;
3438

3539
if app_state.router_config.query_planner.allow_expose {
@@ -64,14 +68,29 @@ pub async fn execute_plan(
6468
execute_query_plan(QueryPlanExecutionContext {
6569
query_plan: query_plan_payload,
6670
projection_plan: &normalized_payload.projection_plan,
71+
headers_plan: &app_state.headers_plan,
6772
variable_values: &variable_payload.variables_map,
6873
extensions,
74+
client_request: ClientRequestDetails {
75+
method: req.method().clone(),
76+
url: req.uri().clone(),
77+
headers: req.headers(),
78+
operation: OperationDetails {
79+
name: normalized_payload.operation_for_plan.name.clone(),
80+
kind: match normalized_payload.operation_for_plan.operation_kind {
81+
Some(OperationKind::Query) => "query",
82+
Some(OperationKind::Mutation) => "mutation",
83+
Some(OperationKind::Subscription) => "subscription",
84+
None => "query",
85+
},
86+
query,
87+
},
88+
},
6989
introspection_context: &introspection_context,
7090
operation_type_name: normalized_payload.root_type_name,
7191
executors: &app_state.subgraph_executor_map,
7292
})
7393
.await
74-
.map(Bytes::from)
7594
.map_err(|err| {
7695
tracing::error!("Failed to execute query plan: {}", err);
7796
req.new_pipeline_error(PipelineErrorVariant::PlanExecutionError(err))

bin/router/src/pipeline/mod.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use std::sync::Arc;
1+
use std::{borrow::Cow, sync::Arc};
22

3+
use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
34
use hive_router_query_planner::utils::cancellation::CancellationToken;
45
use http::{header::CONTENT_TYPE, HeaderValue, Method};
56
use ntex::{
@@ -52,15 +53,25 @@ pub async fn graphql_request_handler(
5253
}
5354

5455
match execute_pipeline(req, body_bytes, state).await {
55-
Ok(response_bytes) => {
56+
Ok(response) => {
57+
let response_bytes = Bytes::from(response.body);
58+
let response_headers = response.headers;
59+
5660
let response_content_type: &'static HeaderValue =
5761
if req.accepts_content_type(*APPLICATION_GRAPHQL_RESPONSE_JSON_STR) {
5862
&APPLICATION_GRAPHQL_RESPONSE_JSON
5963
} else {
6064
&APPLICATION_JSON
6165
};
6266

63-
web::HttpResponse::Ok()
67+
let mut response_builder = web::HttpResponse::Ok();
68+
for (header_name, header_value) in response_headers {
69+
if let Some(header_name) = header_name {
70+
response_builder.header(header_name, header_value);
71+
}
72+
}
73+
74+
response_builder
6475
.header(http::header::CONTENT_TYPE, response_content_type)
6576
.body(response_bytes)
6677
}
@@ -73,14 +84,15 @@ pub async fn execute_pipeline(
7384
req: &mut HttpRequest,
7485
body_bytes: Bytes,
7586
state: &Arc<RouterSharedState>,
76-
) -> Result<Bytes, PipelineError> {
87+
) -> Result<PlanExecutionOutput, PipelineError> {
7788
let execution_request = get_execution_request(req, body_bytes).await?;
7889
let parser_payload = parse_operation_with_cache(req, state, &execution_request).await?;
7990
validate_operation_with_cache(req, state, &parser_payload).await?;
8091

8192
let progressive_override_ctx = request_override_context()?;
8293
let normalize_payload =
8394
normalize_request_with_cache(req, state, &execution_request, &parser_payload).await?;
95+
let query = Cow::Owned(execution_request.query.clone());
8496
let variable_payload =
8597
coerce_request_variables(req, state, execution_request, &normalize_payload)?;
8698

@@ -98,6 +110,7 @@ pub async fn execute_pipeline(
98110

99111
let execution_result = execute_plan(
100112
req,
113+
query,
101114
state,
102115
&normalize_payload,
103116
&query_plan_payload,

bin/router/src/shared_state.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ use graphql_parser::schema::Document;
44
use graphql_tools::validation::{utils::ValidationError, validate::ValidationPlan};
55
use hive_router_config::HiveRouterConfig;
66
use hive_router_plan_executor::{
7+
headers::{
8+
compile::compile_headers_plan, errors::HeaderRuleCompileError, plan::HeaderRulesPlan,
9+
},
710
introspection::schema::{SchemaMetadata, SchemaWithMetadata},
811
SubgraphExecutorMap,
912
};
@@ -25,13 +28,14 @@ pub struct RouterSharedState {
2528
pub parse_cache: Cache<u64, Arc<graphql_parser::query::Document<'static, String>>>,
2629
pub normalize_cache: Cache<u64, Arc<GraphQLNormalizationPayload>>,
2730
pub router_config: HiveRouterConfig,
31+
pub headers_plan: HeaderRulesPlan,
2832
}
2933

3034
impl RouterSharedState {
3135
pub fn new(
3236
parsed_supergraph_sdl: Document<'static, String>,
3337
router_config: HiveRouterConfig,
34-
) -> Arc<Self> {
38+
) -> Result<Arc<Self>, SharedStateError> {
3539
let supergraph_state = SupergraphState::new(&parsed_supergraph_sdl);
3640
let planner =
3741
Planner::new_from_supergraph(&parsed_supergraph_sdl).expect("failed to create planner");
@@ -43,16 +47,23 @@ impl RouterSharedState {
4347
)
4448
.expect("Failed to create subgraph executor map");
4549

46-
Arc::new(Self {
50+
Ok(Arc::new(Self {
4751
schema_metadata,
4852
planner,
4953
validation_plan: graphql_tools::validation::rules::default_rules_validation_plan(),
54+
headers_plan: compile_headers_plan(&router_config.headers).map_err(Box::new)?,
5055
subgraph_executor_map,
5156
plan_cache: moka::future::Cache::new(1000),
5257
validate_cache: moka::future::Cache::new(1000),
5358
parse_cache: moka::future::Cache::new(1000),
5459
normalize_cache: moka::future::Cache::new(1000),
5560
router_config,
56-
})
61+
}))
5762
}
5863
}
64+
65+
#[derive(thiserror::Error, Debug)]
66+
pub enum SharedStateError {
67+
#[error("invalid headers config: {0}")]
68+
HeaderRuleCompileError(#[from] Box<HeaderRuleCompileError>),
69+
}

0 commit comments

Comments
 (0)