Skip to content

Commit d6f0284

Browse files
committed
enhance(executor): normalize flatten errors for the final response
1 parent ef6c8dd commit d6f0284

File tree

4 files changed

+306
-26
lines changed

4 files changed

+306
-26
lines changed

lib/executor/src/context.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use hive_router_query_planner::planner::plan_nodes::{FetchNode, FetchRewrite, Qu
44

55
use crate::{
66
headers::plan::ResponseHeaderAggregator,
7-
response::{graphql_error::GraphQLError, storage::ResponsesStorage, value::Value},
7+
response::{
8+
graphql_error::{GraphQLError, GraphQLErrorPathSegment},
9+
storage::ResponsesStorage,
10+
value::Value,
11+
},
812
};
913

1014
pub struct ExecutionContext<'a> {
@@ -38,10 +42,44 @@ impl<'a> ExecutionContext<'a> {
3842
}
3943
}
4044

41-
pub fn handle_errors(&mut self, errors: Option<Vec<GraphQLError>>) {
42-
if let Some(errors) = errors {
43-
for error in errors {
44-
self.errors.push(error);
45+
pub fn handle_errors(
46+
&mut self,
47+
errors: Option<Vec<GraphQLError>>,
48+
entity_index_error_map: Option<HashMap<&usize, Vec<Vec<GraphQLErrorPathSegment>>>>,
49+
) {
50+
if let Some(response_errors) = errors {
51+
for response_error in response_errors {
52+
match &response_error.path.as_deref() {
53+
Some(
54+
[GraphQLErrorPathSegment::String(maybe_entities), GraphQLErrorPathSegment::Index(entity_index), rest_of_path @ ..],
55+
) if maybe_entities == "_entities" => {
56+
if let Some(entity_error_paths) = entity_index_error_map
57+
.as_ref()
58+
.and_then(|m| m.get(entity_index))
59+
{
60+
for entity_error_path in entity_error_paths {
61+
let mut new_error_path = entity_error_path.clone();
62+
if !rest_of_path.is_empty() {
63+
new_error_path.extend_from_slice(rest_of_path);
64+
}
65+
self.errors.push(GraphQLError {
66+
path: Some(new_error_path),
67+
..response_error.clone()
68+
});
69+
}
70+
} else {
71+
// If we don't have the entity index in the map, just push the original error
72+
// without any path, because `_entities` is not a valid path in the final response.
73+
self.errors.push(GraphQLError {
74+
path: None,
75+
..response_error
76+
});
77+
}
78+
}
79+
_ => {
80+
self.errors.push(response_error);
81+
}
82+
}
4583
}
4684
}
4785
}

lib/executor/src/execution/plan.rs

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ impl<'exec> Executor<'exec> {
420420
ctx: &mut ExecutionContext<'exec>,
421421
response_bytes: Bytes,
422422
fetch_node_id: i64,
423-
) -> Option<(Value<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
423+
) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
424424
let idx = ctx.response_storage.add_response(response_bytes);
425425
// SAFETY: The `bytes` are transmuted to the lifetime `'a` of the `ExecutionContext`.
426426
// This is safe because the `response_storage` is part of the `ExecutionContext` (`ctx`)
@@ -452,9 +452,7 @@ impl<'exec> Executor<'exec> {
452452
}
453453
};
454454

455-
ctx.handle_errors(response.errors);
456-
457-
Some((response.data, output_rewrites))
455+
Some((response, output_rewrites))
458456
}
459457

460458
fn process_job_result(
@@ -472,16 +470,18 @@ impl<'exec> Executor<'exec> {
472470
&mut ctx.response_headers_aggregator,
473471
)?;
474472

475-
if let Some((mut data, output_rewrites)) =
473+
if let Some((mut response, output_rewrites)) =
476474
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
477475
{
476+
ctx.handle_errors(response.errors, None);
478477
if let Some(output_rewrites) = output_rewrites {
479478
for output_rewrite in output_rewrites {
480-
output_rewrite.rewrite(&self.schema_metadata.possible_types, &mut data);
479+
output_rewrite
480+
.rewrite(&self.schema_metadata.possible_types, &mut response.data);
481481
}
482482
}
483483

484-
deep_merge(&mut ctx.final_response, data);
484+
deep_merge(&mut ctx.final_response, response.data);
485485
}
486486
}
487487
ExecutionJob::FlattenFetch(job) => {
@@ -493,10 +493,10 @@ impl<'exec> Executor<'exec> {
493493
&mut ctx.response_headers_aggregator,
494494
)?;
495495

496-
if let Some((mut data, output_rewrites)) =
496+
if let Some((mut response, output_rewrites)) =
497497
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
498498
{
499-
if let Some(mut entities) = data.take_entities() {
499+
if let Some(mut entities) = response.data.take_entities() {
500500
if let Some(output_rewrites) = output_rewrites {
501501
for output_rewrite in output_rewrites {
502502
for entity in &mut entities {
@@ -508,15 +508,34 @@ impl<'exec> Executor<'exec> {
508508

509509
let mut index = 0;
510510
let normalized_path = job.flatten_node_path.as_slice();
511+
// If there is an error in the response, then collect the paths for normalizing the error
512+
let (initial_error_path_arr, mut entity_index_error_map) =
513+
if response.errors.is_some() {
514+
(
515+
Some(Vec::with_capacity(normalized_path.len() + 2)),
516+
Some(HashMap::with_capacity(entities.len())),
517+
)
518+
} else {
519+
(None, None)
520+
};
511521
traverse_and_callback_mut(
512522
&mut ctx.final_response,
513523
normalized_path,
514524
self.schema_metadata,
515-
&mut |target| {
525+
initial_error_path_arr,
526+
&mut |target, error_path| {
516527
let hash = job.representation_hashes[index];
517528
if let Some(entity_index) =
518529
job.representation_hash_to_index.get(&hash)
519530
{
531+
if let (Some(error_path), Some(entity_index_error_map)) =
532+
(error_path, entity_index_error_map.as_mut())
533+
{
534+
let error_paths = entity_index_error_map
535+
.entry(entity_index)
536+
.or_insert_with(Vec::new);
537+
error_paths.push(error_path);
538+
}
520539
if let Some(entity) = entities.get(*entity_index) {
521540
// SAFETY: `new_val` is a clone of an entity that lives for `'a`.
522541
// The transmute is to satisfy the compiler, but the lifetime
@@ -529,6 +548,7 @@ impl<'exec> Executor<'exec> {
529548
index += 1;
530549
},
531550
);
551+
ctx.handle_errors(response.errors, entity_index_error_map);
532552
}
533553
}
534554
}
@@ -714,6 +734,8 @@ fn select_fetch_variables<'a>(
714734

715735
#[cfg(test)]
716736
mod tests {
737+
use crate::context::ExecutionContext;
738+
717739
use super::select_fetch_variables;
718740
use sonic_rs::Value;
719741
use std::collections::{BTreeSet, HashMap};
@@ -768,4 +790,60 @@ mod tests {
768790

769791
assert!(selected.is_none());
770792
}
793+
#[test]
794+
/**
795+
* We have the same entity in two different paths ["a", 0] and ["b", 1],
796+
* and the subgraph response has an error for this entity.
797+
* So we should duplicate the error for both paths.
798+
*/
799+
fn normalize_entity_errors_correctly() {
800+
use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
801+
use std::collections::HashMap;
802+
let mut ctx = ExecutionContext::default();
803+
let mut entity_index_error_map: HashMap<&usize, Vec<Vec<GraphQLErrorPathSegment>>> =
804+
HashMap::new();
805+
entity_index_error_map.insert(
806+
&0,
807+
vec![
808+
vec![
809+
GraphQLErrorPathSegment::String("a".to_string()),
810+
GraphQLErrorPathSegment::Index(0),
811+
],
812+
vec![
813+
GraphQLErrorPathSegment::String("b".to_string()),
814+
GraphQLErrorPathSegment::Index(1),
815+
],
816+
],
817+
);
818+
let response_errors = vec![GraphQLError {
819+
message: "Error 1".to_string(),
820+
locations: None,
821+
path: Some(vec![
822+
GraphQLErrorPathSegment::String("_entities".to_string()),
823+
GraphQLErrorPathSegment::Index(0),
824+
GraphQLErrorPathSegment::String("field1".to_string()),
825+
]),
826+
extensions: None,
827+
}];
828+
ctx.handle_errors(Some(response_errors), Some(entity_index_error_map));
829+
assert_eq!(ctx.errors.len(), 2);
830+
assert_eq!(ctx.errors[0].message, "Error 1");
831+
assert_eq!(
832+
ctx.errors[0].path.as_ref().unwrap(),
833+
&vec![
834+
GraphQLErrorPathSegment::String("a".to_string()),
835+
GraphQLErrorPathSegment::Index(0),
836+
GraphQLErrorPathSegment::String("field1".to_string())
837+
]
838+
);
839+
assert_eq!(ctx.errors[1].message, "Error 1");
840+
assert_eq!(
841+
ctx.errors[1].path.as_ref().unwrap(),
842+
&vec![
843+
GraphQLErrorPathSegment::String("b".to_string()),
844+
GraphQLErrorPathSegment::Index(1),
845+
GraphQLErrorPathSegment::String("field1".to_string())
846+
]
847+
);
848+
}
771849
}

lib/executor/src/response/graphql_error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub struct GraphQLErrorLocation {
5252
pub column: usize,
5353
}
5454

55-
#[derive(Clone, Debug, Serialize)]
55+
#[derive(Clone, Debug, Serialize, PartialEq)]
5656
pub enum GraphQLErrorPathSegment {
5757
String(String),
5858
Index(usize),

0 commit comments

Comments
 (0)