Skip to content

Commit 30e643d

Browse files
authored
feat(executor): normalize flatten errors for the final response (#454)
1 parent 6244e07 commit 30e643d

File tree

4 files changed

+374
-29
lines changed

4 files changed

+374
-29
lines changed

lib/executor/src/context.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ use hive_router_query_planner::planner::plan_nodes::{FetchNode, FetchRewrite, Qu
44

55
use crate::{
66
headers::plan::ResponseHeaderAggregator,
7-
response::{graphql_error::GraphQLError, storage::ResponsesStorage, value::Value},
7+
response::{
8+
graphql_error::{GraphQLError, GraphQLErrorPath},
9+
storage::ResponsesStorage,
10+
value::Value,
11+
},
812
};
913

1014
pub struct ExecutionContext<'a> {
@@ -38,10 +42,20 @@ impl<'a> ExecutionContext<'a> {
3842
}
3943
}
4044

41-
pub fn handle_errors(&mut self, errors: Option<Vec<GraphQLError>>) {
42-
if let Some(errors) = errors {
43-
for error in errors {
44-
self.errors.push(error);
45+
pub fn handle_errors(
46+
&mut self,
47+
errors: Option<Vec<GraphQLError>>,
48+
entity_index_error_map: Option<HashMap<&usize, Vec<GraphQLErrorPath>>>,
49+
) {
50+
if let Some(response_errors) = errors {
51+
for response_error in response_errors {
52+
if let Some(entity_index_error_map) = &entity_index_error_map {
53+
let normalized_errors =
54+
response_error.normalize_entity_error(entity_index_error_map);
55+
self.errors.extend(normalized_errors);
56+
} else {
57+
self.errors.push(response_error);
58+
}
4559
}
4660
}
4761
}

lib/executor/src/execution/plan.rs

Lines changed: 95 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ use crate::{
3636
response::project_by_operation,
3737
},
3838
response::{
39-
graphql_error::GraphQLError, merge::deep_merge, subgraph_response::SubgraphResponse,
39+
graphql_error::{GraphQLError, GraphQLErrorPath},
40+
merge::deep_merge,
41+
subgraph_response::SubgraphResponse,
4042
value::Value,
4143
},
4244
utils::{
@@ -420,7 +422,7 @@ impl<'exec> Executor<'exec> {
420422
ctx: &mut ExecutionContext<'exec>,
421423
response_bytes: Bytes,
422424
fetch_node_id: i64,
423-
) -> Option<(Value<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
425+
) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
424426
let idx = ctx.response_storage.add_response(response_bytes);
425427
// SAFETY: The `bytes` are transmuted to the lifetime `'a` of the `ExecutionContext`.
426428
// This is safe because the `response_storage` is part of the `ExecutionContext` (`ctx`)
@@ -452,9 +454,7 @@ impl<'exec> Executor<'exec> {
452454
}
453455
};
454456

455-
ctx.handle_errors(response.errors);
456-
457-
Some((response.data, output_rewrites))
457+
Some((response, output_rewrites))
458458
}
459459

460460
fn process_job_result(
@@ -472,16 +472,18 @@ impl<'exec> Executor<'exec> {
472472
&mut ctx.response_headers_aggregator,
473473
)?;
474474

475-
if let Some((mut data, output_rewrites)) =
475+
if let Some((mut response, output_rewrites)) =
476476
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
477477
{
478+
ctx.handle_errors(response.errors, None);
478479
if let Some(output_rewrites) = output_rewrites {
479480
for output_rewrite in output_rewrites {
480-
output_rewrite.rewrite(&self.schema_metadata.possible_types, &mut data);
481+
output_rewrite
482+
.rewrite(&self.schema_metadata.possible_types, &mut response.data);
481483
}
482484
}
483485

484-
deep_merge(&mut ctx.final_response, data);
486+
deep_merge(&mut ctx.final_response, response.data);
485487
}
486488
}
487489
ExecutionJob::FlattenFetch(job) => {
@@ -493,10 +495,10 @@ impl<'exec> Executor<'exec> {
493495
&mut ctx.response_headers_aggregator,
494496
)?;
495497

496-
if let Some((mut data, output_rewrites)) =
498+
if let Some((mut response, output_rewrites)) =
497499
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
498500
{
499-
if let Some(mut entities) = data.take_entities() {
501+
if let Some(mut entities) = response.data.take_entities() {
500502
if let Some(output_rewrites) = output_rewrites {
501503
for output_rewrite in output_rewrites {
502504
for entity in &mut entities {
@@ -508,15 +510,33 @@ impl<'exec> Executor<'exec> {
508510

509511
let mut index = 0;
510512
let normalized_path = job.flatten_node_path.as_slice();
513+
// If there is an error in the response, then collect the paths for normalizing the error
514+
let initial_error_path = response
515+
.errors
516+
.as_ref()
517+
.map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
518+
let mut entity_index_error_map = response
519+
.errors
520+
.as_ref()
521+
.map(|_| HashMap::with_capacity(entities.len()));
511522
traverse_and_callback_mut(
512523
&mut ctx.final_response,
513524
normalized_path,
514525
self.schema_metadata,
515-
&mut |target| {
526+
initial_error_path,
527+
&mut |target, error_path| {
516528
let hash = job.representation_hashes[index];
517529
if let Some(entity_index) =
518530
job.representation_hash_to_index.get(&hash)
519531
{
532+
if let (Some(error_path), Some(entity_index_error_map)) =
533+
(error_path, entity_index_error_map.as_mut())
534+
{
535+
let error_paths = entity_index_error_map
536+
.entry(entity_index)
537+
.or_insert_with(Vec::new);
538+
error_paths.push(error_path);
539+
}
520540
if let Some(entity) = entities.get(*entity_index) {
521541
// SAFETY: `new_val` is a clone of an entity that lives for `'a`.
522542
// The transmute is to satisfy the compiler, but the lifetime
@@ -529,6 +549,7 @@ impl<'exec> Executor<'exec> {
529549
index += 1;
530550
},
531551
);
552+
ctx.handle_errors(response.errors, entity_index_error_map);
532553
}
533554
}
534555
}
@@ -714,6 +735,8 @@ fn select_fetch_variables<'a>(
714735

715736
#[cfg(test)]
716737
mod tests {
738+
use crate::{context::ExecutionContext, response::graphql_error::GraphQLErrorPath};
739+
717740
use super::select_fetch_variables;
718741
use sonic_rs::Value;
719742
use std::collections::{BTreeSet, HashMap};
@@ -768,4 +791,65 @@ mod tests {
768791

769792
assert!(selected.is_none());
770793
}
794+
#[test]
795+
/**
796+
* We have the same entity in two different paths ["a", 0] and ["b", 1],
797+
* and the subgraph response has an error for this entity.
798+
* So we should duplicate the error for both paths.
799+
*/
800+
fn normalize_entity_errors_correctly() {
801+
use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
802+
use std::collections::HashMap;
803+
let mut ctx = ExecutionContext::default();
804+
let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
805+
entity_index_error_map.insert(
806+
&0,
807+
vec![
808+
GraphQLErrorPath {
809+
segments: vec![
810+
GraphQLErrorPathSegment::String("a".to_string()),
811+
GraphQLErrorPathSegment::Index(0),
812+
],
813+
},
814+
GraphQLErrorPath {
815+
segments: vec![
816+
GraphQLErrorPathSegment::String("b".to_string()),
817+
GraphQLErrorPathSegment::Index(1),
818+
],
819+
},
820+
],
821+
);
822+
let response_errors = vec![GraphQLError {
823+
message: "Error 1".to_string(),
824+
locations: None,
825+
path: Some(GraphQLErrorPath {
826+
segments: vec![
827+
GraphQLErrorPathSegment::String("_entities".to_string()),
828+
GraphQLErrorPathSegment::Index(0),
829+
GraphQLErrorPathSegment::String("field1".to_string()),
830+
],
831+
}),
832+
extensions: None,
833+
}];
834+
ctx.handle_errors(Some(response_errors), Some(entity_index_error_map));
835+
assert_eq!(ctx.errors.len(), 2);
836+
assert_eq!(ctx.errors[0].message, "Error 1");
837+
assert_eq!(
838+
ctx.errors[0].path.as_ref().unwrap().segments,
839+
vec![
840+
GraphQLErrorPathSegment::String("a".to_string()),
841+
GraphQLErrorPathSegment::Index(0),
842+
GraphQLErrorPathSegment::String("field1".to_string())
843+
]
844+
);
845+
assert_eq!(ctx.errors[1].message, "Error 1");
846+
assert_eq!(
847+
ctx.errors[1].path.as_ref().unwrap().segments,
848+
vec![
849+
GraphQLErrorPathSegment::String("b".to_string()),
850+
GraphQLErrorPathSegment::Index(1),
851+
GraphQLErrorPathSegment::String("field1".to_string())
852+
]
853+
);
854+
}
771855
}

lib/executor/src/response/graphql_error.rs

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use graphql_parser::Pos;
22
use graphql_tools::validation::utils::ValidationError;
33
use serde::{de, Deserialize, Deserializer, Serialize};
44
use sonic_rs::Value;
5-
use std::fmt;
5+
use std::{collections::HashMap, fmt};
66

77
#[derive(Clone, Debug, Deserialize, Serialize)]
88
#[serde(rename_all = "camelCase")]
@@ -11,7 +11,7 @@ pub struct GraphQLError {
1111
#[serde(default, skip_serializing_if = "Option::is_none")]
1212
pub locations: Option<Vec<GraphQLErrorLocation>>,
1313
#[serde(default, skip_serializing_if = "Option::is_none")]
14-
pub path: Option<Vec<GraphQLErrorPathSegment>>,
14+
pub path: Option<GraphQLErrorPath>,
1515
pub extensions: Option<Value>,
1616
}
1717

@@ -46,13 +46,43 @@ impl From<&Pos> for GraphQLErrorLocation {
4646
}
4747
}
4848

49+
impl GraphQLError {
50+
pub fn entity_index_and_path<'a>(&'a self) -> Option<EntityIndexAndPath<'a>> {
51+
self.path.as_ref().and_then(|p| p.entity_index_and_path())
52+
}
53+
54+
pub fn normalize_entity_error(
55+
self,
56+
entity_index_error_map: &HashMap<&usize, Vec<GraphQLErrorPath>>,
57+
) -> Vec<GraphQLError> {
58+
if let Some(entity_index_and_path) = &self.entity_index_and_path() {
59+
if let Some(entity_error_paths) =
60+
entity_index_error_map.get(&entity_index_and_path.entity_index)
61+
{
62+
return entity_error_paths
63+
.iter()
64+
.map(|error_path| {
65+
let mut new_error_path = error_path.clone();
66+
new_error_path.extend_from_slice(entity_index_and_path.rest_of_path);
67+
GraphQLError {
68+
path: Some(new_error_path),
69+
..self.clone()
70+
}
71+
})
72+
.collect();
73+
}
74+
}
75+
vec![self]
76+
}
77+
}
78+
4979
#[derive(Clone, Debug, Deserialize, Serialize)]
5080
pub struct GraphQLErrorLocation {
5181
pub line: usize,
5282
pub column: usize,
5383
}
5484

55-
#[derive(Clone, Debug, Serialize)]
85+
#[derive(Clone, Debug, Serialize, PartialEq)]
5686
pub enum GraphQLErrorPathSegment {
5787
String(String),
5888
Index(usize),
@@ -110,3 +140,53 @@ impl<'de> Deserialize<'de> for GraphQLErrorPathSegment {
110140
deserializer.deserialize_any(PathSegmentVisitor)
111141
}
112142
}
143+
144+
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
145+
pub struct GraphQLErrorPath {
146+
#[serde(flatten)]
147+
pub segments: Vec<GraphQLErrorPathSegment>,
148+
}
149+
150+
pub struct EntityIndexAndPath<'a> {
151+
pub entity_index: usize,
152+
pub rest_of_path: &'a [GraphQLErrorPathSegment],
153+
}
154+
155+
impl GraphQLErrorPath {
156+
pub fn with_capacity(capacity: usize) -> Self {
157+
GraphQLErrorPath {
158+
segments: Vec::with_capacity(capacity),
159+
}
160+
}
161+
pub fn concat(&self, segment: GraphQLErrorPathSegment) -> Self {
162+
let mut new_path = self.segments.clone();
163+
new_path.push(segment);
164+
GraphQLErrorPath { segments: new_path }
165+
}
166+
167+
pub fn concat_index(&self, index: usize) -> Self {
168+
self.concat(GraphQLErrorPathSegment::Index(index))
169+
}
170+
171+
pub fn concat_str(&self, field: String) -> Self {
172+
self.concat(GraphQLErrorPathSegment::String(field))
173+
}
174+
175+
pub fn extend_from_slice(&mut self, other: &[GraphQLErrorPathSegment]) {
176+
self.segments.extend_from_slice(other);
177+
}
178+
179+
pub fn entity_index_and_path<'a>(&'a self) -> Option<EntityIndexAndPath<'a>> {
180+
match &self.segments.as_slice() {
181+
[GraphQLErrorPathSegment::String(maybe_entities), GraphQLErrorPathSegment::Index(entity_index), rest_of_path @ ..]
182+
if maybe_entities == "_entities" =>
183+
{
184+
Some(EntityIndexAndPath {
185+
entity_index: *entity_index,
186+
rest_of_path,
187+
})
188+
}
189+
_ => None,
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)