From 5a23f144eb1664fdae670b9855b0063eced8e01b Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 13:03:42 +0100 Subject: [PATCH 01/20] Move Query to its own file --- crates/apollo-router-core/src/lib.rs | 2 + crates/apollo-router-core/src/query.rs | 329 +++++++++++++++++++++++ crates/apollo-router-core/src/request.rs | 320 ---------------------- 3 files changed, 331 insertions(+), 320 deletions(-) create mode 100644 crates/apollo-router-core/src/query.rs diff --git a/crates/apollo-router-core/src/lib.rs b/crates/apollo-router-core/src/lib.rs index 954e97265a..b2c300b8f0 100644 --- a/crates/apollo-router-core/src/lib.rs +++ b/crates/apollo-router-core/src/lib.rs @@ -25,6 +25,7 @@ macro_rules! failfast_error { mod error; mod json_ext; mod naive_introspection; +mod query; mod query_planner; mod request; mod response; @@ -34,6 +35,7 @@ mod traits; pub use error::*; pub use json_ext::*; pub use naive_introspection::*; +pub use query::*; pub use query_planner::*; pub use request::*; pub use response::*; diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs new file mode 100644 index 0000000000..204c96479e --- /dev/null +++ b/crates/apollo-router-core/src/query.rs @@ -0,0 +1,329 @@ +use crate::prelude::graphql::*; +use apollo_parser::ast; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)] +#[serde(transparent)] +pub struct Query { + string: String, +} + +impl Query { + /// Returns a reference to the underlying query string. + pub fn as_str(&self) -> &str { + self.string.as_str() + } + + /// Re-format the response value to match this query. + /// + /// This will discard unrequested fields and re-order the output to match the order of the + /// query. + #[tracing::instrument] + pub fn format_response(&self, response: &mut Response) { + fn apply_selection_set( + selection_set: &ast::SelectionSet, + input: &mut Object, + output: &mut Object, + fragments: &HashMap, + ) { + for selection in selection_set.selections() { + match selection { + // Spec: https://spec.graphql.org/draft/#Field + ast::Selection::Field(field) => { + let name = field + .name() + .expect("the node Name is not optional in the spec; qed") + .text() + .to_string(); + let alias = field.alias().map(|x| x.name().unwrap().text().to_string()); + let name = alias.unwrap_or(name); + + if let Some(input_value) = input.remove(&name) { + if let Some(selection_set) = field.selection_set() { + match input_value { + Value::Object(mut input_object) => { + let mut output_object = Object::default(); + apply_selection_set( + &selection_set, + &mut input_object, + &mut output_object, + fragments, + ); + output.insert(name, output_object.into()); + } + Value::Array(input_array) => { + let output_array = input_array + .into_iter() + .enumerate() + .map(|(i, mut element)| { + if let Some(input_object) = element.as_object_mut() + { + let mut output_object = Object::default(); + apply_selection_set( + &selection_set, + input_object, + &mut output_object, + fragments, + ); + output_object.into() + } else { + failfast_debug!( + "Array element is not an object: {}[{}]", + name, + i, + ); + element + } + }) + .collect::(); + output.insert(name, output_array); + } + _ => { + output.insert(name.clone(), input_value); + failfast_debug!( + "Field is not an object nor an array of object: {}", + name, + ); + } + } + } else { + output.insert(name, input_value); + } + } else { + failfast_debug!("Missing field: {}", name); + } + } + // Spec: https://spec.graphql.org/draft/#InlineFragment + ast::Selection::InlineFragment(inline_fragment) => { + let selection_set = inline_fragment + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed"); + + apply_selection_set(&selection_set, input, output, fragments); + } + // Spec: https://spec.graphql.org/draft/#FragmentSpread + ast::Selection::FragmentSpread(fragment_spread) => { + let name = fragment_spread + .fragment_name() + .expect("the node FragmentName is not optional in the spec; qed") + .name() + .unwrap() + .text() + .to_string(); + + if let Some(selection_set) = fragments.get(&name) { + apply_selection_set(selection_set, input, output, fragments); + } else { + failfast_debug!("Missing fragment named: {}", name); + } + } + } + } + } + + fn fragments(document: &ast::Document) -> HashMap { + document + .definitions() + .filter_map(|definition| match definition { + // Spec: https://spec.graphql.org/draft/#FragmentDefinition + ast::Definition::FragmentDefinition(fragment_definition) => { + let name = fragment_definition + .fragment_name() + .expect("the node FragmentName is not optional in the spec; qed") + .name() + .unwrap() + .text() + .to_string(); + let selection_set = fragment_definition + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed"); + + Some((name, selection_set)) + } + _ => None, + }) + .collect() + } + + let parser = apollo_parser::Parser::new(self.as_str()); + let tree = parser.parse(); + + if !tree.errors().is_empty() { + let errors = tree + .errors() + .iter() + .map(|err| format!("{:?}", err)) + .collect::>(); + failfast_debug!("Parsing error(s): {}", errors.join(", ")); + return; + } + + let document = tree.document(); + let fragments = fragments(&document); + + for definition in document.definitions() { + // Spec: https://spec.graphql.org/draft/#sec-Language.Operations + if let ast::Definition::OperationDefinition(operation) = definition { + let selection_set = operation + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed"); + if let Some(data) = response.data.as_object_mut() { + let mut output = Object::default(); + apply_selection_set(&selection_set, data, &mut output, &fragments); + response.data = output.into(); + return; + } else { + failfast_debug!("Invalid type for data in response."); + } + } + } + + failfast_debug!("No suitable definition found. This is a bug."); + } +} + +impl> From for Query { + fn from(string: T) -> Self { + Query { + string: string.into(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use test_log::test; + + macro_rules! assert_eq_and_ordered { + ($a:expr, $b:expr $(,)?) => { + assert_eq!($a, $b,); + assert!( + $a.eq_and_ordered(&$b), + "assertion failed: objects are not ordered the same:\ + \n left: `{:?}`\n right: `{:?}`", + $a, + $b, + ); + }; + } + + #[test] + fn reformat_response_data_field() { + let query = Query::from( + r#"{ + foo + stuff{bar} + array{bar} + baz + alias:baz + alias_obj:baz_obj{bar} + alias_array:baz_array{bar} + }"#, + ); + let mut response = Response::builder() + .data(json! {{ + "foo": "1", + "stuff": {"bar": "2"}, + "array": [{"bar": "3", "baz": "4"}, {"bar": "5", "baz": "6"}], + "baz": "7", + "alias": "7", + "alias_obj": {"bar": "8"}, + "alias_array": [{"bar": "9", "baz": "10"}, {"bar": "11", "baz": "12"}], + "other": "13", + }}) + .build(); + query.format_response(&mut response); + assert_eq_and_ordered!( + response.data, + json! {{ + "foo": "1", + "stuff": { + "bar": "2", + }, + "array": [ + {"bar": "3"}, + {"bar": "5"}, + ], + "baz": "7", + "alias": "7", + "alias_obj": { + "bar": "8", + }, + "alias_array": [ + {"bar": "9"}, + {"bar": "11"}, + ], + }}, + ); + } + + #[test] + fn reformat_response_data_inline_fragment() { + let query = Query::from(r#"{... on Stuff { stuff{bar}}}"#); + let mut response = Response::builder() + .data(json! {{"stuff": {"bar": "2"}}}) + .build(); + query.format_response(&mut response); + assert_eq_and_ordered!( + response.data, + json! {{ + "stuff": { + "bar": "2", + }, + }}, + ); + } + + #[test] + fn reformat_response_data_fragment_spread() { + let query = + Query::from(r#"{...foo ...bar} fragment foo on Foo {foo} fragment bar on Bar {bar}"#); + let mut response = Response::builder() + .data(json! {{"foo": "1", "bar": "2"}}) + .build(); + query.format_response(&mut response); + assert_eq_and_ordered!( + response.data, + json! {{ + "foo": "1", + "bar": "2", + }}, + ); + } + + #[test] + fn reformat_response_data_best_effort() { + let query = Query::from(r#"{foo stuff{bar baz} ...fragment array{bar baz} other{bar}}"#); + let mut response = Response::builder() + .data(json! {{ + "foo": "1", + "stuff": {"baz": "2"}, + "array": [ + {"baz": "3"}, + "4", + {"bar": "5"}, + ], + "other": "6", + }}) + .build(); + query.format_response(&mut response); + assert_eq_and_ordered!( + response.data, + json! {{ + "foo": "1", + "stuff": { + "baz": "2", + }, + "array": [ + {"baz": "3"}, + "4", + {"bar": "5"}, + ], + "other": "6", + }}, + ); + } +} diff --git a/crates/apollo-router-core/src/request.rs b/crates/apollo-router-core/src/request.rs index 915f0aa687..51952a446a 100644 --- a/crates/apollo-router-core/src/request.rs +++ b/crates/apollo-router-core/src/request.rs @@ -1,8 +1,6 @@ use crate::prelude::graphql::*; -use apollo_parser::ast; use derivative::Derivative; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::sync::Arc; use typed_builder::TypedBuilder; @@ -32,194 +30,6 @@ pub struct Request { pub extensions: Object, } -#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)] -#[serde(transparent)] -pub struct Query { - string: String, -} - -impl Query { - /// Returns a reference to the underlying query string. - pub fn as_str(&self) -> &str { - self.string.as_str() - } - - /// Re-format the response value to match this query. - /// - /// This will discard unrequested fields and re-order the output to match the order of the - /// query. - #[tracing::instrument] - pub fn format_response(&self, response: &mut Response) { - fn apply_selection_set( - selection_set: &ast::SelectionSet, - input: &mut Object, - output: &mut Object, - fragments: &HashMap, - ) { - for selection in selection_set.selections() { - match selection { - // Spec: https://spec.graphql.org/draft/#Field - ast::Selection::Field(field) => { - let name = field - .name() - .expect("the node Name is not optional in the spec; qed") - .text() - .to_string(); - let alias = field.alias().map(|x| x.name().unwrap().text().to_string()); - let name = alias.unwrap_or(name); - - if let Some(input_value) = input.remove(&name) { - if let Some(selection_set) = field.selection_set() { - match input_value { - Value::Object(mut input_object) => { - let mut output_object = Object::default(); - apply_selection_set( - &selection_set, - &mut input_object, - &mut output_object, - fragments, - ); - output.insert(name, output_object.into()); - } - Value::Array(input_array) => { - let output_array = input_array - .into_iter() - .enumerate() - .map(|(i, mut element)| { - if let Some(input_object) = element.as_object_mut() - { - let mut output_object = Object::default(); - apply_selection_set( - &selection_set, - input_object, - &mut output_object, - fragments, - ); - output_object.into() - } else { - failfast_debug!( - "Array element is not an object: {}[{}]", - name, - i, - ); - element - } - }) - .collect::(); - output.insert(name, output_array); - } - _ => { - output.insert(name.clone(), input_value); - failfast_debug!( - "Field is not an object nor an array of object: {}", - name, - ); - } - } - } else { - output.insert(name, input_value); - } - } else { - failfast_debug!("Missing field: {}", name); - } - } - // Spec: https://spec.graphql.org/draft/#InlineFragment - ast::Selection::InlineFragment(inline_fragment) => { - let selection_set = inline_fragment - .selection_set() - .expect("the node SelectionSet is not optional in the spec; qed"); - - apply_selection_set(&selection_set, input, output, fragments); - } - // Spec: https://spec.graphql.org/draft/#FragmentSpread - ast::Selection::FragmentSpread(fragment_spread) => { - let name = fragment_spread - .fragment_name() - .expect("the node FragmentName is not optional in the spec; qed") - .name() - .unwrap() - .text() - .to_string(); - - if let Some(selection_set) = fragments.get(&name) { - apply_selection_set(selection_set, input, output, fragments); - } else { - failfast_debug!("Missing fragment named: {}", name); - } - } - } - } - } - - fn fragments(document: &ast::Document) -> HashMap { - document - .definitions() - .filter_map(|definition| match definition { - // Spec: https://spec.graphql.org/draft/#FragmentDefinition - ast::Definition::FragmentDefinition(fragment_definition) => { - let name = fragment_definition - .fragment_name() - .expect("the node FragmentName is not optional in the spec; qed") - .name() - .unwrap() - .text() - .to_string(); - let selection_set = fragment_definition - .selection_set() - .expect("the node SelectionSet is not optional in the spec; qed"); - - Some((name, selection_set)) - } - _ => None, - }) - .collect() - } - - let parser = apollo_parser::Parser::new(self.as_str()); - let tree = parser.parse(); - - if !tree.errors().is_empty() { - let errors = tree - .errors() - .iter() - .map(|err| format!("{:?}", err)) - .collect::>(); - failfast_debug!("Parsing error(s): {}", errors.join(", ")); - return; - } - - let document = tree.document(); - let fragments = fragments(&document); - - for definition in document.definitions() { - // Spec: https://spec.graphql.org/draft/#sec-Language.Operations - if let ast::Definition::OperationDefinition(operation) = definition { - let selection_set = operation - .selection_set() - .expect("the node SelectionSet is not optional in the spec; qed"); - if let Some(data) = response.data.as_object_mut() { - let mut output = Object::default(); - apply_selection_set(&selection_set, data, &mut output, &fragments); - response.data = output.into(); - return; - } else { - failfast_debug!("Invalid type for data in response."); - } - } - } - - failfast_debug!("No suitable definition found. This is a bug."); - } -} - -impl> From for Query { - fn from(string: T) -> Self { - Query { - string: string.into(), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -298,134 +108,4 @@ mod tests { .build() ); } - - macro_rules! assert_eq_and_ordered { - ($a:expr, $b:expr $(,)?) => { - assert_eq!($a, $b,); - assert!( - $a.eq_and_ordered(&$b), - "assertion failed: objects are not ordered the same:\ - \n left: `{:?}`\n right: `{:?}`", - $a, - $b, - ); - }; - } - - #[test] - fn reformat_response_data_field() { - let query = Query::from( - r#"{ - foo - stuff{bar} - array{bar} - baz - alias:baz - alias_obj:baz_obj{bar} - alias_array:baz_array{bar} - }"#, - ); - let mut response = Response::builder() - .data(json! {{ - "foo": "1", - "stuff": {"bar": "2"}, - "array": [{"bar": "3", "baz": "4"}, {"bar": "5", "baz": "6"}], - "baz": "7", - "alias": "7", - "alias_obj": {"bar": "8"}, - "alias_array": [{"bar": "9", "baz": "10"}, {"bar": "11", "baz": "12"}], - "other": "13", - }}) - .build(); - query.format_response(&mut response); - assert_eq_and_ordered!( - response.data, - json! {{ - "foo": "1", - "stuff": { - "bar": "2", - }, - "array": [ - {"bar": "3"}, - {"bar": "5"}, - ], - "baz": "7", - "alias": "7", - "alias_obj": { - "bar": "8", - }, - "alias_array": [ - {"bar": "9"}, - {"bar": "11"}, - ], - }}, - ); - } - - #[test] - fn reformat_response_data_inline_fragment() { - let query = Query::from(r#"{... on Stuff { stuff{bar}}}"#); - let mut response = Response::builder() - .data(json! {{"stuff": {"bar": "2"}}}) - .build(); - query.format_response(&mut response); - assert_eq_and_ordered!( - response.data, - json! {{ - "stuff": { - "bar": "2", - }, - }}, - ); - } - - #[test] - fn reformat_response_data_fragment_spread() { - let query = - Query::from(r#"{...foo ...bar} fragment foo on Foo {foo} fragment bar on Bar {bar}"#); - let mut response = Response::builder() - .data(json! {{"foo": "1", "bar": "2"}}) - .build(); - query.format_response(&mut response); - assert_eq_and_ordered!( - response.data, - json! {{ - "foo": "1", - "bar": "2", - }}, - ); - } - - #[test] - fn reformat_response_data_best_effort() { - let query = Query::from(r#"{foo stuff{bar baz} ...fragment array{bar baz} other{bar}}"#); - let mut response = Response::builder() - .data(json! {{ - "foo": "1", - "stuff": {"baz": "2"}, - "array": [ - {"baz": "3"}, - "4", - {"bar": "5"}, - ], - "other": "6", - }}) - .build(); - query.format_response(&mut response); - assert_eq_and_ordered!( - response.data, - json! {{ - "foo": "1", - "stuff": { - "baz": "2", - }, - "array": [ - {"baz": "3"}, - "4", - {"bar": "5"}, - ], - "other": "6", - }}, - ); - } } From 46ece7dcd86a841d47936f0972fc10b2193fb0e4 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 13:19:06 +0100 Subject: [PATCH 02/20] Move query parsing to object creation I don't store any object coming from apollo_parser here because they are !Send. This prevents the whole object to be moved to another thread. This is why I need to create my own objects (enum Selection) which won't have this limitation. --- crates/apollo-router-core/src/query.rs | 93 ++++++++++++++++++-------- 1 file changed, 65 insertions(+), 28 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 204c96479e..f6e6a01de7 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -1,12 +1,17 @@ use crate::prelude::graphql::*; use apollo_parser::ast; +use derivative::Derivative; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -#[derive(Debug, Clone, Hash, PartialEq, Eq, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, Derivative)] +#[derivative(PartialEq, Hash, Eq)] #[serde(transparent)] pub struct Query { string: String, + #[derivative(PartialEq = "ignore", Hash = "ignore")] + #[serde(skip)] + fragments: HashMap>, } impl Query { @@ -122,30 +127,6 @@ impl Query { } } - fn fragments(document: &ast::Document) -> HashMap { - document - .definitions() - .filter_map(|definition| match definition { - // Spec: https://spec.graphql.org/draft/#FragmentDefinition - ast::Definition::FragmentDefinition(fragment_definition) => { - let name = fragment_definition - .fragment_name() - .expect("the node FragmentName is not optional in the spec; qed") - .name() - .unwrap() - .text() - .to_string(); - let selection_set = fragment_definition - .selection_set() - .expect("the node SelectionSet is not optional in the spec; qed"); - - Some((name, selection_set)) - } - _ => None, - }) - .collect() - } - let parser = apollo_parser::Parser::new(self.as_str()); let tree = parser.parse(); @@ -160,7 +141,7 @@ impl Query { } let document = tree.document(); - let fragments = fragments(&document); + let fragments = todo!(); //fragments(&document); for definition in document.definitions() { // Spec: https://spec.graphql.org/draft/#sec-Language.Operations @@ -181,16 +162,72 @@ impl Query { failfast_debug!("No suitable definition found. This is a bug."); } + + fn fragments(document: &ast::Document) -> HashMap> { + /* + document + .definitions() + .filter_map(|definition| match definition { + // Spec: https://spec.graphql.org/draft/#FragmentDefinition + ast::Definition::FragmentDefinition(fragment_definition) => { + let name = fragment_definition + .fragment_name() + .expect("the node FragmentName is not optional in the spec; qed") + .name() + .unwrap() + .text() + .to_string(); + let selection_set = fragment_definition + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed"); + + Some((name, selection_set)) + } + _ => None, + }) + .collect() + */ + todo!() + } } impl> From for Query { fn from(string: T) -> Self { - Query { - string: string.into(), + let string = string.into(); + let parser = apollo_parser::Parser::new(string.as_str()); + let tree = parser.parse(); + + if !tree.errors().is_empty() { + let errors = tree + .errors() + .iter() + .map(|err| format!("{:?}", err)) + .collect::>(); + failfast_debug!("Parsing error(s): {}", errors.join(", ")); + todo!(); } + + let document = tree.document(); + let fragments = Self::fragments(&document); + + Query { string, fragments } } } +#[derive(Debug, Clone)] +enum Selection { + Field { + name: String, + selection_set: Vec, + }, + InlineFragment { + selection_set: Vec, + }, + FragmentSpread { + name: String, + }, +} + #[cfg(test)] mod tests { use super::*; From 40b12afd7c5d94b718d19886f8047669cdf25595 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 13:21:33 +0100 Subject: [PATCH 03/20] Remove unnecessary derives We won't serialize or deserialize on object creation because this process is costly. So Request's `query` field stays a simple String and Query becomes a new object more aware of the actual content of the query. --- crates/apollo-router-core/src/query.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index f6e6a01de7..aca17351ea 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -1,16 +1,13 @@ use crate::prelude::graphql::*; use apollo_parser::ast; use derivative::Derivative; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; -#[derive(Debug, Clone, Deserialize, Serialize, Derivative)] +#[derive(Debug, Derivative)] #[derivative(PartialEq, Hash, Eq)] -#[serde(transparent)] pub struct Query { string: String, #[derivative(PartialEq = "ignore", Hash = "ignore")] - #[serde(skip)] fragments: HashMap>, } @@ -214,7 +211,7 @@ impl> From for Query { } } -#[derive(Debug, Clone)] +#[derive(Debug)] enum Selection { Field { name: String, From fad02c665312a5620c34972a5fb7116011d9c94d Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 13:31:41 +0100 Subject: [PATCH 04/20] Convert ast::Selection to Selection --- crates/apollo-router-core/src/query.rs | 58 ++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index aca17351ea..9633d14c6b 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -161,7 +161,6 @@ impl Query { } fn fragments(document: &ast::Document) -> HashMap> { - /* document .definitions() .filter_map(|definition| match definition { @@ -178,13 +177,11 @@ impl Query { .selection_set() .expect("the node SelectionSet is not optional in the spec; qed"); - Some((name, selection_set)) + Some((name, selection_set.selections().map(Into::into).collect())) } _ => None, }) .collect() - */ - todo!() } } @@ -225,6 +222,59 @@ enum Selection { }, } +impl From for Selection { + fn from(selection: ast::Selection) -> Self { + match selection { + // Spec: https://spec.graphql.org/draft/#Field + ast::Selection::Field(field) => { + let name = field + .name() + .expect("the node Name is not optional in the spec; qed") + .text() + .to_string(); + let alias = field.alias().map(|x| x.name().unwrap().text().to_string()); + let name = alias.unwrap_or(name); + let selection_set = field + .selection_set() + .into_iter() + .flat_map(|x| x.selections()) + .into_iter() + .map(Into::into) + .collect(); + + Self::Field { + name, + selection_set, + } + } + // Spec: https://spec.graphql.org/draft/#InlineFragment + ast::Selection::InlineFragment(inline_fragment) => { + let selection_set = inline_fragment + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed") + .selections() + .into_iter() + .map(Into::into) + .collect(); + + Self::InlineFragment { selection_set } + } + // Spec: https://spec.graphql.org/draft/#FragmentSpread + ast::Selection::FragmentSpread(fragment_spread) => { + let name = fragment_spread + .fragment_name() + .expect("the node FragmentName is not optional in the spec; qed") + .name() + .unwrap() + .text() + .to_string(); + + Self::FragmentSpread { name } + } + } + } +} + #[cfg(test)] mod tests { use super::*; From 8e913b11f6697f924c3dd3277bfe273b448ad2d8 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 13:44:39 +0100 Subject: [PATCH 05/20] Convert ast::OperationDefinition to Operation --- crates/apollo-router-core/src/query.rs | 245 ++++++++++++++----------- 1 file changed, 141 insertions(+), 104 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 9633d14c6b..fccda568bc 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -9,6 +9,8 @@ pub struct Query { string: String, #[derivative(PartialEq = "ignore", Hash = "ignore")] fragments: HashMap>, + #[derivative(PartialEq = "ignore", Hash = "ignore")] + operations: Vec, } impl Query { @@ -23,107 +25,6 @@ impl Query { /// query. #[tracing::instrument] pub fn format_response(&self, response: &mut Response) { - fn apply_selection_set( - selection_set: &ast::SelectionSet, - input: &mut Object, - output: &mut Object, - fragments: &HashMap, - ) { - for selection in selection_set.selections() { - match selection { - // Spec: https://spec.graphql.org/draft/#Field - ast::Selection::Field(field) => { - let name = field - .name() - .expect("the node Name is not optional in the spec; qed") - .text() - .to_string(); - let alias = field.alias().map(|x| x.name().unwrap().text().to_string()); - let name = alias.unwrap_or(name); - - if let Some(input_value) = input.remove(&name) { - if let Some(selection_set) = field.selection_set() { - match input_value { - Value::Object(mut input_object) => { - let mut output_object = Object::default(); - apply_selection_set( - &selection_set, - &mut input_object, - &mut output_object, - fragments, - ); - output.insert(name, output_object.into()); - } - Value::Array(input_array) => { - let output_array = input_array - .into_iter() - .enumerate() - .map(|(i, mut element)| { - if let Some(input_object) = element.as_object_mut() - { - let mut output_object = Object::default(); - apply_selection_set( - &selection_set, - input_object, - &mut output_object, - fragments, - ); - output_object.into() - } else { - failfast_debug!( - "Array element is not an object: {}[{}]", - name, - i, - ); - element - } - }) - .collect::(); - output.insert(name, output_array); - } - _ => { - output.insert(name.clone(), input_value); - failfast_debug!( - "Field is not an object nor an array of object: {}", - name, - ); - } - } - } else { - output.insert(name, input_value); - } - } else { - failfast_debug!("Missing field: {}", name); - } - } - // Spec: https://spec.graphql.org/draft/#InlineFragment - ast::Selection::InlineFragment(inline_fragment) => { - let selection_set = inline_fragment - .selection_set() - .expect("the node SelectionSet is not optional in the spec; qed"); - - apply_selection_set(&selection_set, input, output, fragments); - } - // Spec: https://spec.graphql.org/draft/#FragmentSpread - ast::Selection::FragmentSpread(fragment_spread) => { - let name = fragment_spread - .fragment_name() - .expect("the node FragmentName is not optional in the spec; qed") - .name() - .unwrap() - .text() - .to_string(); - - if let Some(selection_set) = fragments.get(&name) { - apply_selection_set(selection_set, input, output, fragments); - } else { - failfast_debug!("Missing fragment named: {}", name); - } - } - } - } - } - let parser = apollo_parser::Parser::new(self.as_str()); let tree = parser.parse(); @@ -138,7 +39,6 @@ impl Query { } let document = tree.document(); - let fragments = todo!(); //fragments(&document); for definition in document.definitions() { // Spec: https://spec.graphql.org/draft/#sec-Language.Operations @@ -148,7 +48,7 @@ impl Query { .expect("the node SelectionSet is not optional in the spec; qed"); if let Some(data) = response.data.as_object_mut() { let mut output = Object::default(); - apply_selection_set(&selection_set, data, &mut output, &fragments); + self.apply_selection_set(&selection_set, data, &mut output); response.data = output.into(); return; } else { @@ -183,6 +83,109 @@ impl Query { }) .collect() } + + fn apply_selection_set( + &self, + selection_set: &ast::SelectionSet, + input: &mut Object, + output: &mut Object, + ) { + /* + for selection in selection_set.selections() { + match selection { + // Spec: https://spec.graphql.org/draft/#Field + ast::Selection::Field(field) => { + let name = field + .name() + .expect("the node Name is not optional in the spec; qed") + .text() + .to_string(); + let alias = field.alias().map(|x| x.name().unwrap().text().to_string()); + let name = alias.unwrap_or(name); + + if let Some(input_value) = input.remove(&name) { + if let Some(selection_set) = field.selection_set() { + match input_value { + Value::Object(mut input_object) => { + let mut output_object = Object::default(); + apply_selection_set( + &selection_set, + &mut input_object, + &mut output_object, + fragments, + ); + output.insert(name, output_object.into()); + } + Value::Array(input_array) => { + let output_array = input_array + .into_iter() + .enumerate() + .map(|(i, mut element)| { + if let Some(input_object) = element.as_object_mut() { + let mut output_object = Object::default(); + apply_selection_set( + &selection_set, + input_object, + &mut output_object, + fragments, + ); + output_object.into() + } else { + failfast_debug!( + "Array element is not an object: {}[{}]", + name, + i, + ); + element + } + }) + .collect::(); + output.insert(name, output_array); + } + _ => { + output.insert(name.clone(), input_value); + failfast_debug!( + "Field is not an object nor an array of object: {}", + name, + ); + } + } + } else { + output.insert(name, input_value); + } + } else { + failfast_debug!("Missing field: {}", name); + } + } + // Spec: https://spec.graphql.org/draft/#InlineFragment + ast::Selection::InlineFragment(inline_fragment) => { + let selection_set = inline_fragment + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed"); + + apply_selection_set(&selection_set, input, output, fragments); + } + // Spec: https://spec.graphql.org/draft/#FragmentSpread + ast::Selection::FragmentSpread(fragment_spread) => { + let name = fragment_spread + .fragment_name() + .expect("the node FragmentName is not optional in the spec; qed") + .name() + .unwrap() + .text() + .to_string(); + + if let Some(selection_set) = fragments.get(&name) { + apply_selection_set(selection_set, input, output, fragments); + } else { + failfast_debug!("Missing fragment named: {}", name); + } + } + } + } + */ + todo!() + } } impl> From for Query { @@ -204,7 +207,23 @@ impl> From for Query { let document = tree.document(); let fragments = Self::fragments(&document); - Query { string, fragments } + let operations = document + .definitions() + .filter_map(|definition| { + // Spec: https://spec.graphql.org/draft/#sec-Language.Operations + if let ast::Definition::OperationDefinition(operation) = definition { + Some(operation.into()) + } else { + None + } + }) + .collect(); + + Query { + string, + fragments, + operations, + } } } @@ -275,6 +294,24 @@ impl From for Selection { } } +#[derive(Debug)] +struct Operation { + selection_set: Vec, +} + +impl From for Operation { + fn from(operation: ast::OperationDefinition) -> Self { + let selection_set = operation + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed") + .selections() + .map(Into::into) + .collect(); + + Operation { selection_set } + } +} + #[cfg(test)] mod tests { use super::*; From 0e01fd401fd2ed7b7d4899742beeb0fad919165b Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 14:28:41 +0100 Subject: [PATCH 06/20] Update apply_selection_set --- crates/apollo-router-core/src/query.rs | 108 +++++++------------------ 1 file changed, 29 insertions(+), 79 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index fccda568bc..9b07b10f89 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -25,35 +25,14 @@ impl Query { /// query. #[tracing::instrument] pub fn format_response(&self, response: &mut Response) { - let parser = apollo_parser::Parser::new(self.as_str()); - let tree = parser.parse(); - - if !tree.errors().is_empty() { - let errors = tree - .errors() - .iter() - .map(|err| format!("{:?}", err)) - .collect::>(); - failfast_debug!("Parsing error(s): {}", errors.join(", ")); - return; - } - - let document = tree.document(); - - for definition in document.definitions() { - // Spec: https://spec.graphql.org/draft/#sec-Language.Operations - if let ast::Definition::OperationDefinition(operation) = definition { - let selection_set = operation - .selection_set() - .expect("the node SelectionSet is not optional in the spec; qed"); - if let Some(data) = response.data.as_object_mut() { - let mut output = Object::default(); - self.apply_selection_set(&selection_set, data, &mut output); - response.data = output.into(); - return; - } else { - failfast_debug!("Invalid type for data in response."); - } + for operation in self.operations.iter() { + if let Some(data) = response.data.as_object_mut() { + let mut output = Object::default(); + self.apply_selection_set(&operation.selection_set, data, &mut output); + response.data = output.into(); + return; + } else { + failfast_debug!("Invalid type for data in response."); } } @@ -86,35 +65,27 @@ impl Query { fn apply_selection_set( &self, - selection_set: &ast::SelectionSet, + selection_set: &[Selection], input: &mut Object, output: &mut Object, ) { - /* - for selection in selection_set.selections() { + for selection in selection_set { match selection { - // Spec: https://spec.graphql.org/draft/#Field - ast::Selection::Field(field) => { - let name = field - .name() - .expect("the node Name is not optional in the spec; qed") - .text() - .to_string(); - let alias = field.alias().map(|x| x.name().unwrap().text().to_string()); - let name = alias.unwrap_or(name); - - if let Some(input_value) = input.remove(&name) { - if let Some(selection_set) = field.selection_set() { + Selection::Field { + name, + selection_set, + } => { + if let Some(input_value) = input.remove(name) { + if let Some(selection_set) = selection_set { match input_value { Value::Object(mut input_object) => { let mut output_object = Object::default(); - apply_selection_set( + self.apply_selection_set( &selection_set, &mut input_object, &mut output_object, - fragments, ); - output.insert(name, output_object.into()); + output.insert(name.to_string(), output_object.into()); } Value::Array(input_array) => { let output_array = input_array @@ -123,11 +94,10 @@ impl Query { .map(|(i, mut element)| { if let Some(input_object) = element.as_object_mut() { let mut output_object = Object::default(); - apply_selection_set( + self.apply_selection_set( &selection_set, input_object, &mut output_object, - fragments, ); output_object.into() } else { @@ -140,7 +110,7 @@ impl Query { } }) .collect::(); - output.insert(name, output_array); + output.insert(name.to_string(), output_array); } _ => { output.insert(name.clone(), input_value); @@ -151,40 +121,24 @@ impl Query { } } } else { - output.insert(name, input_value); + output.insert(name.to_string(), input_value); } } else { failfast_debug!("Missing field: {}", name); } } - // Spec: https://spec.graphql.org/draft/#InlineFragment - ast::Selection::InlineFragment(inline_fragment) => { - let selection_set = inline_fragment - .selection_set() - .expect("the node SelectionSet is not optional in the spec; qed"); - - apply_selection_set(&selection_set, input, output, fragments); + Selection::InlineFragment { selection_set } => { + self.apply_selection_set(&selection_set, input, output); } - // Spec: https://spec.graphql.org/draft/#FragmentSpread - ast::Selection::FragmentSpread(fragment_spread) => { - let name = fragment_spread - .fragment_name() - .expect("the node FragmentName is not optional in the spec; qed") - .name() - .unwrap() - .text() - .to_string(); - - if let Some(selection_set) = fragments.get(&name) { - apply_selection_set(selection_set, input, output, fragments); + Selection::FragmentSpread { name } => { + if let Some(selection_set) = self.fragments.get(name) { + self.apply_selection_set(selection_set, input, output); } else { failfast_debug!("Missing fragment named: {}", name); } } } } - */ - todo!() } } @@ -210,7 +164,6 @@ impl> From for Query { let operations = document .definitions() .filter_map(|definition| { - // Spec: https://spec.graphql.org/draft/#sec-Language.Operations if let ast::Definition::OperationDefinition(operation) = definition { Some(operation.into()) } else { @@ -231,7 +184,7 @@ impl> From for Query { enum Selection { Field { name: String, - selection_set: Vec, + selection_set: Option>, }, InlineFragment { selection_set: Vec, @@ -255,11 +208,7 @@ impl From for Selection { let name = alias.unwrap_or(name); let selection_set = field .selection_set() - .into_iter() - .flat_map(|x| x.selections()) - .into_iter() - .map(Into::into) - .collect(); + .map(|x| x.selections().into_iter().map(Into::into).collect()); Self::Field { name, @@ -301,6 +250,7 @@ struct Operation { impl From for Operation { fn from(operation: ast::OperationDefinition) -> Self { + // Spec: https://spec.graphql.org/draft/#sec-Language.Operations let selection_set = operation .selection_set() .expect("the node SelectionSet is not optional in the spec; qed") From 86f6cb028cc81019697156230c59ebbd48e9f5d7 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 14:37:29 +0100 Subject: [PATCH 07/20] Do the parsing in a blocking thread --- crates/apollo-router-core/src/query.rs | 76 +++++++++++------------ crates/apollo-router/src/apollo_router.rs | 2 +- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 9b07b10f89..927af00683 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -39,6 +39,44 @@ impl Query { failfast_debug!("No suitable definition found. This is a bug."); } + pub fn parse(query: impl Into) -> tokio::task::JoinHandle { + let string = query.into(); + tokio::task::spawn_blocking(move || { + let parser = apollo_parser::Parser::new(string.as_str()); + let tree = parser.parse(); + + if !tree.errors().is_empty() { + let errors = tree + .errors() + .iter() + .map(|err| format!("{:?}", err)) + .collect::>(); + failfast_debug!("Parsing error(s): {}", errors.join(", ")); + todo!(); + } + + let document = tree.document(); + let fragments = Self::fragments(&document); + + let operations = document + .definitions() + .filter_map(|definition| { + if let ast::Definition::OperationDefinition(operation) = definition { + Some(operation.into()) + } else { + None + } + }) + .collect(); + + Query { + string, + fragments, + operations, + } + }) + } + fn fragments(document: &ast::Document) -> HashMap> { document .definitions() @@ -142,44 +180,6 @@ impl Query { } } -impl> From for Query { - fn from(string: T) -> Self { - let string = string.into(); - let parser = apollo_parser::Parser::new(string.as_str()); - let tree = parser.parse(); - - if !tree.errors().is_empty() { - let errors = tree - .errors() - .iter() - .map(|err| format!("{:?}", err)) - .collect::>(); - failfast_debug!("Parsing error(s): {}", errors.join(", ")); - todo!(); - } - - let document = tree.document(); - let fragments = Self::fragments(&document); - - let operations = document - .definitions() - .filter_map(|definition| { - if let ast::Definition::OperationDefinition(operation) = definition { - Some(operation.into()) - } else { - None - } - }) - .collect(); - - Query { - string, - fragments, - operations, - } - } -} - #[derive(Debug)] enum Selection { Field { diff --git a/crates/apollo-router/src/apollo_router.rs b/crates/apollo-router/src/apollo_router.rs index dcf2f6251e..c7c994bffa 100644 --- a/crates/apollo-router/src/apollo_router.rs +++ b/crates/apollo-router/src/apollo_router.rs @@ -64,7 +64,7 @@ impl Router for ApolloRouter { } // TODO query caching - let query = Arc::new(Query::from(&request.query)); + let query = Arc::new(Query::parse(&request.query).await.expect("todo")); Ok(ApolloPreparedQuery { query_plan, From 230980141f79319f0e63989460a8131df41c49c7 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 14:45:23 +0100 Subject: [PATCH 08/20] Return parsing errors --- crates/apollo-router-core/src/query.rs | 24 +++++++++++++---------- crates/apollo-router/src/apollo_router.rs | 7 ++++++- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 927af00683..d69ca0133d 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -39,20 +39,24 @@ impl Query { failfast_debug!("No suitable definition found. This is a bug."); } - pub fn parse(query: impl Into) -> tokio::task::JoinHandle { + pub fn parse( + query: impl Into, + ) -> tokio::task::JoinHandle>> { let string = query.into(); tokio::task::spawn_blocking(move || { let parser = apollo_parser::Parser::new(string.as_str()); let tree = parser.parse(); if !tree.errors().is_empty() { - let errors = tree - .errors() - .iter() - .map(|err| format!("{:?}", err)) - .collect::>(); - failfast_debug!("Parsing error(s): {}", errors.join(", ")); - todo!(); + failfast_debug!( + "Parsing error(s): {}", + tree.errors() + .iter() + .map(|err| format!("{:?}", err)) + .collect::>() + .join(", "), + ); + return Err(tree.errors().iter().cloned().collect()); } let document = tree.document(); @@ -69,11 +73,11 @@ impl Query { }) .collect(); - Query { + Ok(Query { string, fragments, operations, - } + }) }) } diff --git a/crates/apollo-router/src/apollo_router.rs b/crates/apollo-router/src/apollo_router.rs index c7c994bffa..6e7e085bb2 100644 --- a/crates/apollo-router/src/apollo_router.rs +++ b/crates/apollo-router/src/apollo_router.rs @@ -64,7 +64,12 @@ impl Router for ApolloRouter { } // TODO query caching - let query = Arc::new(Query::parse(&request.query).await.expect("todo")); + let query = Arc::new( + Query::parse(&request.query) + .await + .expect("todo") + .expect("todo"), + ); Ok(ApolloPreparedQuery { query_plan, From 78d61961c177fb3d7b7f5259640736327533995a Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 15:24:31 +0100 Subject: [PATCH 09/20] Shamelessly copy-pasting caching from query planner Maybe I should have refactor that caching first... --- crates/apollo-router-core/src/lib.rs | 2 + crates/apollo-router-core/src/query.rs | 8 +- crates/apollo-router-core/src/query_cache.rs | 104 +++++++++++++++++++ crates/apollo-router/src/apollo_router.rs | 3 + crates/apollo-router/src/lib.rs | 6 +- crates/apollo-router/src/main.rs | 5 + crates/apollo-router/src/router_factory.rs | 20 +++- crates/apollo-router/src/state_machine.rs | 3 + 8 files changed, 142 insertions(+), 9 deletions(-) create mode 100644 crates/apollo-router-core/src/query_cache.rs diff --git a/crates/apollo-router-core/src/lib.rs b/crates/apollo-router-core/src/lib.rs index b2c300b8f0..d497a8894d 100644 --- a/crates/apollo-router-core/src/lib.rs +++ b/crates/apollo-router-core/src/lib.rs @@ -26,6 +26,7 @@ mod error; mod json_ext; mod naive_introspection; mod query; +mod query_cache; mod query_planner; mod request; mod response; @@ -36,6 +37,7 @@ pub use error::*; pub use json_ext::*; pub use naive_introspection::*; pub use query::*; +pub use query_cache::*; pub use query_planner::*; pub use request::*; pub use response::*; diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index d69ca0133d..c4edbafb32 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -39,9 +39,7 @@ impl Query { failfast_debug!("No suitable definition found. This is a bug."); } - pub fn parse( - query: impl Into, - ) -> tokio::task::JoinHandle>> { + pub fn parse(query: impl Into) -> tokio::task::JoinHandle> { let string = query.into(); tokio::task::spawn_blocking(move || { let parser = apollo_parser::Parser::new(string.as_str()); @@ -56,7 +54,7 @@ impl Query { .collect::>() .join(", "), ); - return Err(tree.errors().iter().cloned().collect()); + return None; } let document = tree.document(); @@ -73,7 +71,7 @@ impl Query { }) .collect(); - Ok(Query { + Some(Query { string, fragments, operations, diff --git a/crates/apollo-router-core/src/query_cache.rs b/crates/apollo-router-core/src/query_cache.rs new file mode 100644 index 0000000000..03fa9b34c4 --- /dev/null +++ b/crates/apollo-router-core/src/query_cache.rs @@ -0,0 +1,104 @@ +use crate::prelude::graphql::*; +use futures::lock::Mutex; +use lru::LruCache; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::broadcast; + +/// TODO +#[derive(Debug)] +pub struct QueryCache { + cached: Mutex>>>, + wait_map: Mutex>)>>>, + cache_limit: usize, +} + +impl QueryCache { + /// TODO + pub fn new(cache_limit: usize) -> Self { + Self { + cached: Mutex::new(LruCache::new(cache_limit)), + wait_map: Mutex::new(HashMap::new()), + cache_limit, + } + } + + /// TODO + pub async fn get_query(&self, query: impl AsRef) -> Option> { + let mut locked_cache = self.cached.lock().await; + let key = query.as_ref().to_string(); + if let Some(value) = locked_cache.get(&key).cloned() { + return value; + } + + // Holding a lock across the query parsing tasks is a bad idea because this would block all + // other get() requests for a potentially long time. + // + // Alternatively, if we don't hold the lock, there is a risk that we will do the work + // multiple times. This is also sub-optimal. + + // To work around this, we keep a list of keys we are currently processing. If we try to + // get a key on this list, we block and wait for it to complete and then retry. + // + // This is more complex than either of the two simple alternatives but succeeds in + // providing a mechanism where each client only waits for uncached Query they are going to + // use AND avoids generating the query multiple times. + + let mut locked_wait_map = self.wait_map.lock().await; + + // We must only drop the locked cache after we have locked the + // wait map. Otherwise,we might get a race that causes us to + // miss a broadcast. + drop(locked_cache); + + match locked_wait_map.get_mut(&key) { + Some(waiter) => { + // Register interest in key + let mut receiver = waiter.subscribe(); + drop(locked_wait_map); + let (recv_key, recv_plan) = receiver.recv().await.expect( + "the sender won't ever be dropped before all the receivers finish; qed", + ); + debug_assert_eq!(recv_key, key); + recv_plan + } + None => { + let (tx, _rx) = broadcast::channel(1); + locked_wait_map.insert(key.clone(), tx.clone()); + drop(locked_wait_map); + // This is the potentially high duration operation + // No locks are held here + let parsed_query = match Query::parse(query.as_ref()).await { + Ok(res) => res.map(Arc::new), + // Silently ignore cancelled tasks (never happen for blocking tasks). + Err(err) if err.is_cancelled() => None, + Err(err) => { + failfast_debug!("Parsing query task failed: {}", err); + None + } + }; + // Update our cache + let mut locked_cache = self.cached.lock().await; + locked_cache.put(key.clone(), parsed_query.clone()); + // Update our wait list + let mut locked_wait_map = self.wait_map.lock().await; + locked_wait_map.remove(&key); + // Let our waiters know + let broadcast_value = parsed_query.clone(); + match tokio::task::spawn_blocking(move || { + let _ = tx + .send((key, broadcast_value)) + .expect("there is always at least one receiver alive, the _rx guard; qed"); + }) + .await + { + Ok(()) => parsed_query, + Err(err) => { + failfast_debug!("Parsing query task failed: {}", err); + None + } + } + } + } + } +} diff --git a/crates/apollo-router/src/apollo_router.rs b/crates/apollo-router/src/apollo_router.rs index 6e7e085bb2..201c90f856 100644 --- a/crates/apollo-router/src/apollo_router.rs +++ b/crates/apollo-router/src/apollo_router.rs @@ -13,6 +13,7 @@ pub struct ApolloRouter { query_planner: Arc, service_registry: Arc, schema: Arc, + query_cache: QueryCache, } impl ApolloRouter { @@ -21,12 +22,14 @@ impl ApolloRouter { query_planner: Arc, service_registry: Arc, schema: Arc, + query_cache_limit: usize, ) -> Self { Self { naive_introspection: NaiveIntrospection::from_schema(&schema), query_planner, service_registry, schema, + query_cache: QueryCache::new(query_cache_limit), } } diff --git a/crates/apollo-router/src/lib.rs b/crates/apollo-router/src/lib.rs index 1f7c91a169..59f73f4273 100644 --- a/crates/apollo-router/src/lib.rs +++ b/crates/apollo-router/src/lib.rs @@ -428,6 +428,10 @@ pub struct FederatedServer { #[builder(default = 100)] plan_cache_limit: usize, + /// Limit query cache entries. + #[builder(default = 100)] + query_cache_limit: usize, + /// The Schema that the server will use. This can be static or a stream for hot reloading. schema: SchemaKind, @@ -557,7 +561,7 @@ impl FederatedServer { let state_machine = StateMachine::new( server_factory, Some(state_listener), - ApolloRouterFactory::new(self.plan_cache_limit), + ApolloRouterFactory::new(self.plan_cache_limit, self.query_cache_limit), ); let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); let result = spawn(async { diff --git a/crates/apollo-router/src/main.rs b/crates/apollo-router/src/main.rs index 26c7e0c7c8..a31337056d 100644 --- a/crates/apollo-router/src/main.rs +++ b/crates/apollo-router/src/main.rs @@ -35,6 +35,10 @@ struct Opt { /// Query Plan cache size (number of entries). #[structopt(long, default_value = "100")] plan_cache_limit: usize, + + /// Query parser cache size (number of entries). + #[structopt(long, default_value = "100")] + query_cache_limit: usize, } /// Wrapper so that structop can display the default config path in the help message. @@ -160,6 +164,7 @@ async fn main() -> Result<()> { .configuration(configuration) .schema(schema) .plan_cache_limit(opt.plan_cache_limit) + .query_cache_limit(opt.query_cache_limit) .shutdown(ShutdownKind::CtrlC) .build(); let mut server_handle = server.serve(); diff --git a/crates/apollo-router/src/router_factory.rs b/crates/apollo-router/src/router_factory.rs index 39407d4d3d..cd947dbbc2 100644 --- a/crates/apollo-router/src/router_factory.rs +++ b/crates/apollo-router/src/router_factory.rs @@ -20,6 +20,7 @@ where configuration: &Configuration, schema: Arc, plan_cache_limit: usize, + query_cache_limit: usize, ) -> future::BoxFuture<'static, Router>; fn recreate( &self, @@ -27,17 +28,23 @@ where configuration: &Configuration, schema: Arc, plan_cache_limit: usize, + query_cache_limit: usize, ) -> future::BoxFuture<'static, Router>; fn get_plan_cache_limit(&self) -> usize; + fn get_query_cache_limit(&self) -> usize; } #[derive(Default)] pub(crate) struct ApolloRouterFactory { plan_cache_limit: usize, + query_cache_limit: usize, } impl ApolloRouterFactory { - pub fn new(plan_cache_limit: usize) -> Self { - Self { plan_cache_limit } + pub fn new(plan_cache_limit: usize, query_cache_limit: usize) -> Self { + Self { + plan_cache_limit, + query_cache_limit, + } } } @@ -47,6 +54,7 @@ impl RouterFactory for ApolloRouterFactory { configuration: &Configuration, schema: Arc, plan_cache_limit: usize, + query_cache_limit: usize, ) -> future::BoxFuture<'static, ApolloRouter> { let service_registry = HttpServiceRegistry::new(configuration); tokio::task::spawn_blocking(move || { @@ -57,6 +65,7 @@ impl RouterFactory for ApolloRouterFactory { ), Arc::new(service_registry), schema, + query_cache_limit, ) }) .map(|res| res.expect("ApolloRouter::new() is infallible; qed")) @@ -69,8 +78,9 @@ impl RouterFactory for ApolloRouterFactory { configuration: &Configuration, schema: Arc, plan_cache_limit: usize, + query_cache_limit: usize, ) -> future::BoxFuture<'static, ApolloRouter> { - let factory = self.create(configuration, schema, plan_cache_limit); + let factory = self.create(configuration, schema, plan_cache_limit, query_cache_limit); Box::pin(async move { // Use the "hot" entries in the supplied router to pre-populate @@ -96,4 +106,8 @@ impl RouterFactory for ApolloRouterFactory { fn get_plan_cache_limit(&self) -> usize { self.plan_cache_limit } + + fn get_query_cache_limit(&self) -> usize { + self.query_cache_limit + } } diff --git a/crates/apollo-router/src/state_machine.rs b/crates/apollo-router/src/state_machine.rs index 7fa7604de7..1e93e78e8c 100644 --- a/crates/apollo-router/src/state_machine.rs +++ b/crates/apollo-router/src/state_machine.rs @@ -197,6 +197,7 @@ where &derived_configuration, Arc::clone(&schema), self.router_factory.get_plan_cache_limit(), + self.router_factory.get_query_cache_limit(), ) .await, ); @@ -257,6 +258,7 @@ where &derived_configuration, Arc::clone(&schema), self.router_factory.get_plan_cache_limit(), + self.router_factory.get_query_cache_limit(), ) .await, ); @@ -358,6 +360,7 @@ where &derived_configuration, Arc::clone(&schema), self.router_factory.get_plan_cache_limit(), + self.router_factory.get_query_cache_limit(), ) .await, ); From dd9a1752a16fd19b5583621d420a7137a1d4d894 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 15:43:19 +0100 Subject: [PATCH 10/20] Use the query cache & remove feat post-processing --- Cargo.lock | 1 - crates/apollo-router-benchmarks/Cargo.toml | 2 +- crates/apollo-router/Cargo.toml | 7 ----- crates/apollo-router/src/apollo_router.rs | 31 ++++++++-------------- 4 files changed, 12 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 214bd2fe9b..bdc41eba09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,7 +80,6 @@ name = "apollo-router" version = "0.1.0-alpha.1" dependencies = [ "anyhow", - "apollo-router", "apollo-router-core", "async-trait", "atty", diff --git a/crates/apollo-router-benchmarks/Cargo.toml b/crates/apollo-router-benchmarks/Cargo.toml index 1e685523a8..0adb5d2c0a 100644 --- a/crates/apollo-router-benchmarks/Cargo.toml +++ b/crates/apollo-router-benchmarks/Cargo.toml @@ -8,7 +8,7 @@ license = "LicenseRef-ELv2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dev-dependencies] -apollo-router = { path = "../apollo-router", features = ["post-processing"] } +apollo-router = { path = "../apollo-router" } apollo-router-core = { path = "../apollo-router-core" } criterion = { version = "0.3", features = ["async_tokio", "async_futures"] } futures = "0.3.18" diff --git a/crates/apollo-router/Cargo.toml b/crates/apollo-router/Cargo.toml index 876bf7b0b5..09a7648657 100644 --- a/crates/apollo-router/Cargo.toml +++ b/crates/apollo-router/Cargo.toml @@ -21,9 +21,6 @@ otlp-grpc = [ "tonic/tls", ] otlp-http = ["opentelemetry-otlp/http-proto"] -# activates the response post-processing feature. It is deactivated by default -# until we solve performance issues -post-processing = [] [dependencies] anyhow = "1.0.48" @@ -70,9 +67,6 @@ warp = { version = "0.3.2", default-features = false, features = [ ] } [dev-dependencies] -apollo-router = { path = ".", default-features = false, features = [ - "post-processing", -] } httpmock = "0.6.4" insta = "1.8.0" maplit = "1.0.2" @@ -88,4 +82,3 @@ uuid = { version = "0.8.2", features = ["serde", "v4"] } [[test]] name = "integration_tests" path = "tests/integration_tests.rs" -required-features = ["post-processing"] diff --git a/crates/apollo-router/src/apollo_router.rs b/crates/apollo-router/src/apollo_router.rs index 201c90f856..e057466095 100644 --- a/crates/apollo-router/src/apollo_router.rs +++ b/crates/apollo-router/src/apollo_router.rs @@ -13,7 +13,7 @@ pub struct ApolloRouter { query_planner: Arc, service_registry: Arc, schema: Arc, - query_cache: QueryCache, + query_cache: Arc, } impl ApolloRouter { @@ -29,7 +29,7 @@ impl ApolloRouter { query_planner, service_registry, schema, - query_cache: QueryCache::new(query_cache_limit), + query_cache: Arc::new(QueryCache::new(query_cache_limit)), } } @@ -66,19 +66,14 @@ impl Router for ApolloRouter { return Err(stream::empty().boxed()); } - // TODO query caching - let query = Arc::new( - Query::parse(&request.query) - .await - .expect("todo") - .expect("todo"), - ); + // Pre-emptively parse the query to populate caching + let _ = self.query_cache.get_query(&request.query).await; Ok(ApolloPreparedQuery { query_plan, service_registry: Arc::clone(&self.service_registry), schema: Arc::clone(&self.schema), - query, + query_cache: Arc::clone(&self.query_cache), }) } } @@ -89,9 +84,7 @@ pub struct ApolloPreparedQuery { query_plan: Arc, service_registry: Arc, schema: Arc, - // TODO - #[allow(dead_code)] - query: Arc, + query_cache: Arc, } #[async_trait::async_trait] @@ -100,23 +93,21 @@ impl PreparedQuery for ApolloPreparedQuery { async fn execute(self, request: Arc) -> ResponseStream { stream::once( async move { - // TODO - #[allow(unused_mut)] let mut response = self .query_plan .node() .expect("we already ensured that the plan is some; qed") .execute( - request, + Arc::clone(&request), Arc::clone(&self.service_registry), Arc::clone(&self.schema), ) .await; - // TODO move query parsing to query creation - #[cfg(feature = "post-processing")] - tracing::debug_span!("format_response") - .in_scope(|| self.query.format_response(&mut response)); + if let Some(query) = self.query_cache.get_query(&request.query).await { + tracing::debug_span!("format_response") + .in_scope(|| query.format_response(&mut response)); + } response } From b38d4b58733e4dbdfb2b8db302441de01dfff453 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 16:00:01 +0100 Subject: [PATCH 11/20] Fix wrong logic for parallel tasks --- crates/apollo-router/src/apollo_router.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/apollo-router/src/apollo_router.rs b/crates/apollo-router/src/apollo_router.rs index e057466095..09d4068aec 100644 --- a/crates/apollo-router/src/apollo_router.rs +++ b/crates/apollo-router/src/apollo_router.rs @@ -66,9 +66,6 @@ impl Router for ApolloRouter { return Err(stream::empty().boxed()); } - // Pre-emptively parse the query to populate caching - let _ = self.query_cache.get_query(&request.query).await; - Ok(ApolloPreparedQuery { query_plan, service_registry: Arc::clone(&self.service_registry), @@ -93,7 +90,7 @@ impl PreparedQuery for ApolloPreparedQuery { async fn execute(self, request: Arc) -> ResponseStream { stream::once( async move { - let mut response = self + let response_task = self .query_plan .node() .expect("we already ensured that the plan is some; qed") @@ -101,10 +98,12 @@ impl PreparedQuery for ApolloPreparedQuery { Arc::clone(&request), Arc::clone(&self.service_registry), Arc::clone(&self.schema), - ) - .await; + ); + let query_task = self.query_cache.get_query(&request.query); + + let (mut response, query) = tokio::join!(response_task, query_task); - if let Some(query) = self.query_cache.get_query(&request.query).await { + if let Some(query) = query { tracing::debug_span!("format_response") .in_scope(|| query.format_response(&mut response)); } From 07a9b0e094d11a46971705337b506a6d843ac33e Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 16:13:27 +0100 Subject: [PATCH 12/20] Fix tests... omg I regret not having refactored prior --- crates/apollo-router-core/src/query.rs | 38 ++++++++----- crates/apollo-router/src/state_machine.rs | 55 ++++++++++++++----- .../apollo-router/tests/integration_tests.rs | 9 ++- 3 files changed, 75 insertions(+), 27 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index c4edbafb32..1025fe7d52 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -283,9 +283,9 @@ mod tests { }; } - #[test] - fn reformat_response_data_field() { - let query = Query::from( + #[test(tokio::test)] + async fn reformat_response_data_field() { + let query = Query::parse( r#"{ foo stuff{bar} @@ -295,7 +295,10 @@ mod tests { alias_obj:baz_obj{bar} alias_array:baz_array{bar} }"#, - ); + ) + .await + .unwrap() + .unwrap(); let mut response = Response::builder() .data(json! {{ "foo": "1", @@ -333,9 +336,12 @@ mod tests { ); } - #[test] - fn reformat_response_data_inline_fragment() { - let query = Query::from(r#"{... on Stuff { stuff{bar}}}"#); + #[test(tokio::test)] + async fn reformat_response_data_inline_fragment() { + let query = Query::parse(r#"{... on Stuff { stuff{bar}}}"#) + .await + .unwrap() + .unwrap(); let mut response = Response::builder() .data(json! {{"stuff": {"bar": "2"}}}) .build(); @@ -350,10 +356,13 @@ mod tests { ); } - #[test] - fn reformat_response_data_fragment_spread() { + #[test(tokio::test)] + async fn reformat_response_data_fragment_spread() { let query = - Query::from(r#"{...foo ...bar} fragment foo on Foo {foo} fragment bar on Bar {bar}"#); + Query::parse(r#"{...foo ...bar} fragment foo on Foo {foo} fragment bar on Bar {bar}"#) + .await + .unwrap() + .unwrap(); let mut response = Response::builder() .data(json! {{"foo": "1", "bar": "2"}}) .build(); @@ -367,9 +376,12 @@ mod tests { ); } - #[test] - fn reformat_response_data_best_effort() { - let query = Query::from(r#"{foo stuff{bar baz} ...fragment array{bar baz} other{bar}}"#); + #[test(tokio::test)] + async fn reformat_response_data_best_effort() { + let query = Query::parse(r#"{foo stuff{bar baz} ...fragment array{bar baz} other{bar}}"#) + .await + .unwrap() + .unwrap(); let mut response = Response::builder() .data(json! {{ "foo": "1", diff --git a/crates/apollo-router/src/state_machine.rs b/crates/apollo-router/src/state_machine.rs index 1e93e78e8c..fea036b8f4 100644 --- a/crates/apollo-router/src/state_machine.rs +++ b/crates/apollo-router/src/state_machine.rs @@ -646,13 +646,14 @@ mod tests { .withf( |configuration: &Configuration, _schema: &Arc, - _plan_cache_limit: &usize| { + _plan_cache_limit: &usize, + _query_cache_limit: &usize| { configuration.subgraphs.get("accounts").unwrap().routing_url == "http://accounts/graphql" }, ) .times(1) - .returning(|_, _, _| future::ready(MockMyRouter::new()).boxed()); + .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed()); // second call, configuration is empty, we should take the URL from the graph router_factory .expect_recreate() @@ -660,16 +661,20 @@ mod tests { |_graph: &Arc, configuration: &Configuration, _schema: &Arc, - _plan_cache_limit: &usize| { + _plan_cache_limit: &usize, + _query_cache_limit: &usize| { configuration.subgraphs.get("accounts").unwrap().routing_url == "http://localhost:4001/graphql" }, ) .times(1) - .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed()); + .returning(|_, _, _, _, _| future::ready(MockMyRouter::new()).boxed()); router_factory .expect_get_plan_cache_limit() .return_const(10usize); + router_factory + .expect_get_query_cache_limit() + .return_const(10usize); let (server_factory, shutdown_receivers) = create_mock_server_factory(2); assert!(matches!( @@ -731,13 +736,14 @@ mod tests { .withf( |configuration: &Configuration, _schema: &Arc, - _plan_cache_limit: &usize| { + _plan_cache_limit: &usize, + _query_cache_limit: &usize| { configuration.subgraphs.get("accounts").unwrap().routing_url == "http://accounts/graphql" }, ) .times(1) - .returning(|_, _, _| future::ready(MockMyRouter::new()).boxed()); + .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed()); // second call, configuration is still empty, we should take the URL from the new supergraph router_factory .expect_recreate() @@ -745,17 +751,21 @@ mod tests { |_graph: &Arc, configuration: &Configuration, _schema: &Arc, - _plan_cache_limit: &usize| { + _plan_cache_limit: &usize, + _query_cache_limit: &usize| { println!("got configuration: {:#?}", configuration); configuration.subgraphs.get("accounts").unwrap().routing_url == "http://localhost:4001/graphql" }, ) .times(1) - .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed()); + .returning(|_, _, _, _, _| future::ready(MockMyRouter::new()).boxed()); router_factory .expect_get_plan_cache_limit() .return_const(10usize); + router_factory + .expect_get_query_cache_limit() + .return_const(10usize); let (server_factory, shutdown_receivers) = create_mock_server_factory(2); assert!(matches!( @@ -807,9 +817,22 @@ mod tests { MyRouterFactory {} impl RouterFactory for MyRouterFactory { - fn create(&self, configuration: &Configuration, schema: Arc, plan_cache_limit: usize) -> future::BoxFuture<'static, MockMyRouter>; - fn recreate(&self, router: Arc, configuration: &Configuration, schema: Arc, plan_cache_limit: usize) -> future::BoxFuture<'static, MockMyRouter>; + fn create( + &self, + configuration: &Configuration, + schema: Arc, + plan_cache_limit: usize, + query_cache_limit: usize, + ) -> future::BoxFuture<'static, MockMyRouter>; + fn recreate(&self, + router: Arc, + configuration: &Configuration, + schema: Arc, + plan_cache_limit: usize, + query_cache_limit: usize, + ) -> future::BoxFuture<'static, MockMyRouter>; fn get_plan_cache_limit(&self) -> usize; + fn get_query_cache_limit(&self) -> usize; } } @@ -908,11 +931,14 @@ mod tests { router_factory .expect_create() .times(expect_times_called) - .returning(|_, _, _| future::ready(MockMyRouter::new()).boxed()); + .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed()); router_factory .expect_get_plan_cache_limit() .return_const(10usize); router_factory + .expect_get_query_cache_limit() + .return_const(10usize); + router_factory } fn recreate_mock_router_factory(expect_times_called: usize) -> MockMyRouterFactory { @@ -920,14 +946,17 @@ mod tests { router_factory .expect_create() .times(1) - .returning(|_, _, _| future::ready(MockMyRouter::new()).boxed()); + .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed()); router_factory .expect_recreate() .times(expect_times_called - 1) - .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed()); + .returning(|_, _, _, _, _| future::ready(MockMyRouter::new()).boxed()); router_factory .expect_get_plan_cache_limit() .return_const(10usize); router_factory + .expect_get_query_cache_limit() + .return_const(10usize); + router_factory } } diff --git a/crates/apollo-router/tests/integration_tests.rs b/crates/apollo-router/tests/integration_tests.rs index 581cafce59..77ba44eaae 100644 --- a/crates/apollo-router/tests/integration_tests.rs +++ b/crates/apollo-router/tests/integration_tests.rs @@ -11,6 +11,8 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use test_log::test; +const QUERY_CACHE_LIMIT: usize = 100; + macro_rules! assert_federated_response { ($query:expr, $service_requests:expr $(,)?) => { let request = graphql::Request::builder() @@ -167,7 +169,12 @@ async fn query_rust( &config, ))); - let router = ApolloRouter::new(Arc::new(planner), registry.clone(), schema); + let router = ApolloRouter::new( + Arc::new(planner), + registry.clone(), + schema, + QUERY_CACHE_LIMIT, + ); let stream = match router.prepare_query(&request).await { Ok(route) => route.execute(Arc::new(request)).await, From 2f7fcd458d063769c6760d6fd00218745733dfe6 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 16:14:57 +0100 Subject: [PATCH 13/20] clippy fixes --- crates/apollo-router-core/src/query.rs | 6 +++--- crates/apollo-router-core/src/query_cache.rs | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 1025fe7d52..5e79374cfc 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -121,7 +121,7 @@ impl Query { Value::Object(mut input_object) => { let mut output_object = Object::default(); self.apply_selection_set( - &selection_set, + selection_set, &mut input_object, &mut output_object, ); @@ -135,7 +135,7 @@ impl Query { if let Some(input_object) = element.as_object_mut() { let mut output_object = Object::default(); self.apply_selection_set( - &selection_set, + selection_set, input_object, &mut output_object, ); @@ -168,7 +168,7 @@ impl Query { } } Selection::InlineFragment { selection_set } => { - self.apply_selection_set(&selection_set, input, output); + self.apply_selection_set(selection_set, input, output); } Selection::FragmentSpread { name } => { if let Some(selection_set) = self.fragments.get(name) { diff --git a/crates/apollo-router-core/src/query_cache.rs b/crates/apollo-router-core/src/query_cache.rs index 03fa9b34c4..eee66c881d 100644 --- a/crates/apollo-router-core/src/query_cache.rs +++ b/crates/apollo-router-core/src/query_cache.rs @@ -9,6 +9,7 @@ use tokio::sync::broadcast; #[derive(Debug)] pub struct QueryCache { cached: Mutex>>>, + #[allow(clippy::type_complexity)] wait_map: Mutex>)>>>, cache_limit: usize, } From 722de305dcec78b85bb37d287e54558b5643ff77 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 17:12:23 +0100 Subject: [PATCH 14/20] Prepend schema's fragments to parsed query --- crates/apollo-router-core/src/query.rs | 49 +++++++++++++------ crates/apollo-router-core/src/query_cache.rs | 6 ++- .../src/query_planner/mod.rs | 6 +-- crates/apollo-router-core/src/response.rs | 1 + crates/apollo-router-core/src/schema.rs | 22 +++++++++ crates/apollo-router/src/apollo_router.rs | 2 +- 6 files changed, 65 insertions(+), 21 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 5e79374cfc..8793af12f5 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -39,8 +39,16 @@ impl Query { failfast_debug!("No suitable definition found. This is a bug."); } - pub fn parse(query: impl Into) -> tokio::task::JoinHandle> { + pub fn parse( + query: impl Into, + schema: &Schema, + ) -> tokio::task::JoinHandle> { let string = query.into(); + let mut fragments = schema + .fragments() + .map(|(key, value)| (key.clone(), value.clone())) + .collect::>>(); + tokio::task::spawn_blocking(move || { let parser = apollo_parser::Parser::new(string.as_str()); let tree = parser.parse(); @@ -58,7 +66,7 @@ impl Query { } let document = tree.document(); - let fragments = Self::fragments(&document); + fragments.extend(Self::fragments(&document)); let operations = document .definitions() @@ -182,8 +190,8 @@ impl Query { } } -#[derive(Debug)] -enum Selection { +#[derive(Debug, Clone)] +pub(crate) enum Selection { Field { name: String, selection_set: Option>, @@ -285,6 +293,7 @@ mod tests { #[test(tokio::test)] async fn reformat_response_data_field() { + let schema: Schema = "".parse().unwrap(); let query = Query::parse( r#"{ foo @@ -295,6 +304,7 @@ mod tests { alias_obj:baz_obj{bar} alias_array:baz_array{bar} }"#, + &schema, ) .await .unwrap() @@ -338,7 +348,8 @@ mod tests { #[test(tokio::test)] async fn reformat_response_data_inline_fragment() { - let query = Query::parse(r#"{... on Stuff { stuff{bar}}}"#) + let schema: Schema = "".parse().unwrap(); + let query = Query::parse(r#"{... on Stuff { stuff{bar}}}"#, &schema) .await .unwrap() .unwrap(); @@ -358,13 +369,16 @@ mod tests { #[test(tokio::test)] async fn reformat_response_data_fragment_spread() { - let query = - Query::parse(r#"{...foo ...bar} fragment foo on Foo {foo} fragment bar on Bar {bar}"#) - .await - .unwrap() - .unwrap(); + let schema: Schema = "fragment baz on Baz {baz}".parse().unwrap(); + let query = Query::parse( + r#"{...foo ...bar ...baz} fragment foo on Foo {foo} fragment bar on Bar {bar}"#, + &schema, + ) + .await + .unwrap() + .unwrap(); let mut response = Response::builder() - .data(json! {{"foo": "1", "bar": "2"}}) + .data(json! {{"foo": "1", "bar": "2", "baz": "3"}}) .build(); query.format_response(&mut response); assert_eq_and_ordered!( @@ -372,16 +386,21 @@ mod tests { json! {{ "foo": "1", "bar": "2", + "baz": "3", }}, ); } #[test(tokio::test)] async fn reformat_response_data_best_effort() { - let query = Query::parse(r#"{foo stuff{bar baz} ...fragment array{bar baz} other{bar}}"#) - .await - .unwrap() - .unwrap(); + let schema: Schema = "".parse().unwrap(); + let query = Query::parse( + r#"{foo stuff{bar baz} ...fragment array{bar baz} other{bar}}"#, + &schema, + ) + .await + .unwrap() + .unwrap(); let mut response = Response::builder() .data(json! {{ "foo": "1", diff --git a/crates/apollo-router-core/src/query_cache.rs b/crates/apollo-router-core/src/query_cache.rs index eee66c881d..3895e341ab 100644 --- a/crates/apollo-router-core/src/query_cache.rs +++ b/crates/apollo-router-core/src/query_cache.rs @@ -12,15 +12,17 @@ pub struct QueryCache { #[allow(clippy::type_complexity)] wait_map: Mutex>)>>>, cache_limit: usize, + schema: Arc, } impl QueryCache { /// TODO - pub fn new(cache_limit: usize) -> Self { + pub fn new(cache_limit: usize, schema: Arc) -> Self { Self { cached: Mutex::new(LruCache::new(cache_limit)), wait_map: Mutex::new(HashMap::new()), cache_limit, + schema, } } @@ -69,7 +71,7 @@ impl QueryCache { drop(locked_wait_map); // This is the potentially high duration operation // No locks are held here - let parsed_query = match Query::parse(query.as_ref()).await { + let parsed_query = match Query::parse(query.as_ref(), &self.schema).await { Ok(res) => res.map(Arc::new), // Silently ignore cancelled tasks (never happen for blocking tasks). Err(err) if err.is_cancelled() => None, diff --git a/crates/apollo-router-core/src/query_planner/mod.rs b/crates/apollo-router-core/src/query_planner/mod.rs index eee8b3214b..2b010cde5d 100644 --- a/crates/apollo-router-core/src/query_planner/mod.rs +++ b/crates/apollo-router-core/src/query_planner/mod.rs @@ -1,6 +1,6 @@ -mod caching_query_planner; -mod model; -mod router_bridge_query_planner; +pub(crate) mod caching_query_planner; +pub(crate) mod model; +pub(crate) mod router_bridge_query_planner; pub use caching_query_planner::*; pub use model::*; diff --git a/crates/apollo-router-core/src/response.rs b/crates/apollo-router-core/src/response.rs index f76eb4ee2e..b888996044 100644 --- a/crates/apollo-router-core/src/response.rs +++ b/crates/apollo-router-core/src/response.rs @@ -1,4 +1,5 @@ use crate::prelude::graphql::*; +use crate::query_planner::model::Selection; use futures::prelude::*; use serde::{Deserialize, Serialize}; use serde_json::map::Entry; diff --git a/crates/apollo-router-core/src/schema.rs b/crates/apollo-router-core/src/schema.rs index 6411beef66..6f4e7c9761 100644 --- a/crates/apollo-router-core/src/schema.rs +++ b/crates/apollo-router-core/src/schema.rs @@ -7,6 +7,7 @@ pub struct Schema { string: String, subtype_map: HashMap>, subgraphs: HashMap, + fragments: HashMap>, } impl std::str::FromStr for Schema { @@ -23,6 +24,7 @@ impl std::str::FromStr for Schema { let document = tree.document(); let mut subtype_map: HashMap> = Default::default(); let mut subgraphs = HashMap::new(); + let mut fragments = HashMap::new(); // the logic of this algorithm is inspired from the npm package graphql: // https://github.com/graphql/graphql-js/blob/ac8f0c6b484a0d5dca2dc13c387247f96772580a/src/type/schema.ts#L302-L327 @@ -143,6 +145,21 @@ impl std::str::FromStr for Schema { } } } + // Spec: https://spec.graphql.org/draft/#FragmentDefinition + ast::Definition::FragmentDefinition(fragment_definition) => { + let name = fragment_definition + .fragment_name() + .expect("the node FragmentName is not optional in the spec; qed") + .name() + .unwrap() + .text() + .to_string(); + let selection_set = fragment_definition + .selection_set() + .expect("the node SelectionSet is not optional in the spec; qed"); + + fragments.insert(name, selection_set.selections().map(Into::into).collect()); + } _ => {} } } @@ -151,6 +168,7 @@ impl std::str::FromStr for Schema { subtype_map, string: s.to_owned(), subgraphs, + fragments, }) } } @@ -174,6 +192,10 @@ impl Schema { pub fn subgraphs(&self) -> impl Iterator { self.subgraphs.iter() } + + pub(crate) fn fragments(&self) -> impl Iterator)> { + self.fragments.iter() + } } #[cfg(test)] diff --git a/crates/apollo-router/src/apollo_router.rs b/crates/apollo-router/src/apollo_router.rs index 09d4068aec..4b80803e12 100644 --- a/crates/apollo-router/src/apollo_router.rs +++ b/crates/apollo-router/src/apollo_router.rs @@ -28,8 +28,8 @@ impl ApolloRouter { naive_introspection: NaiveIntrospection::from_schema(&schema), query_planner, service_registry, + query_cache: Arc::new(QueryCache::new(query_cache_limit, Arc::clone(&schema))), schema, - query_cache: Arc::new(QueryCache::new(query_cache_limit)), } } From 30907760f82bc32a2afc283668205a21ba37d76c Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 17:41:41 +0100 Subject: [PATCH 15/20] Apply all operation selections --- crates/apollo-router-core/src/query.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 8793af12f5..18e4c5f3f5 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -25,18 +25,20 @@ impl Query { /// query. #[tracing::instrument] pub fn format_response(&self, response: &mut Response) { - for operation in self.operations.iter() { - if let Some(data) = response.data.as_object_mut() { - let mut output = Object::default(); - self.apply_selection_set(&operation.selection_set, data, &mut output); + let data = std::mem::take(&mut response.data); + match data { + Value::Object(init) => { + let output = self.operations.iter().fold(init, |mut input, operation| { + let mut output = Object::default(); + self.apply_selection_set(&operation.selection_set, &mut input, &mut output); + output + }); response.data = output.into(); - return; - } else { + } + _ => { failfast_debug!("Invalid type for data in response."); } } - - failfast_debug!("No suitable definition found. This is a bug."); } pub fn parse( From a1b54301f69c845116280f5caa68cb9c561531ef Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 18:17:34 +0100 Subject: [PATCH 16/20] Apply selection set only if operation name matches --- crates/apollo-router-core/src/query.rs | 21 +++++++++++++++------ crates/apollo-router/src/apollo_router.rs | 5 +++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index 18e4c5f3f5..c543d61403 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -24,14 +24,18 @@ impl Query { /// This will discard unrequested fields and re-order the output to match the order of the /// query. #[tracing::instrument] - pub fn format_response(&self, response: &mut Response) { + pub fn format_response(&self, response: &mut Response, operation_name: Option<&str>) { let data = std::mem::take(&mut response.data); match data { Value::Object(init) => { let output = self.operations.iter().fold(init, |mut input, operation| { - let mut output = Object::default(); - self.apply_selection_set(&operation.selection_set, &mut input, &mut output); - output + if operation.name.as_deref() == operation_name { + let mut output = Object::default(); + self.apply_selection_set(&operation.selection_set, &mut input, &mut output); + output + } else { + input + } }); response.data = output.into(); } @@ -257,12 +261,14 @@ impl From for Selection { #[derive(Debug)] struct Operation { + name: Option, selection_set: Vec, } impl From for Operation { + // Spec: https://spec.graphql.org/draft/#sec-Language.Operations fn from(operation: ast::OperationDefinition) -> Self { - // Spec: https://spec.graphql.org/draft/#sec-Language.Operations + let name = operation.name().map(|x| x.text().to_string()); let selection_set = operation .selection_set() .expect("the node SelectionSet is not optional in the spec; qed") @@ -270,7 +276,10 @@ impl From for Operation { .map(Into::into) .collect(); - Operation { selection_set } + Operation { + selection_set, + name, + } } } diff --git a/crates/apollo-router/src/apollo_router.rs b/crates/apollo-router/src/apollo_router.rs index 4b80803e12..aad0943c2e 100644 --- a/crates/apollo-router/src/apollo_router.rs +++ b/crates/apollo-router/src/apollo_router.rs @@ -104,8 +104,9 @@ impl PreparedQuery for ApolloPreparedQuery { let (mut response, query) = tokio::join!(response_task, query_task); if let Some(query) = query { - tracing::debug_span!("format_response") - .in_scope(|| query.format_response(&mut response)); + tracing::debug_span!("format_response").in_scope(|| { + query.format_response(&mut response, request.operation_name.as_deref()) + }); } response From 54b3bf5aa0bfd82c44bc3abfbb58bc40d1fa8f28 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 18:27:15 +0100 Subject: [PATCH 17/20] Fix and add tests --- crates/apollo-router-core/src/query.rs | 35 ++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/crates/apollo-router-core/src/query.rs b/crates/apollo-router-core/src/query.rs index c543d61403..0b7435022b 100644 --- a/crates/apollo-router-core/src/query.rs +++ b/crates/apollo-router-core/src/query.rs @@ -29,7 +29,7 @@ impl Query { match data { Value::Object(init) => { let output = self.operations.iter().fold(init, |mut input, operation| { - if operation.name.as_deref() == operation_name { + if operation_name.is_none() || operation.name.as_deref() == operation_name { let mut output = Object::default(); self.apply_selection_set(&operation.selection_set, &mut input, &mut output); output @@ -332,7 +332,7 @@ mod tests { "other": "13", }}) .build(); - query.format_response(&mut response); + query.format_response(&mut response, None); assert_eq_and_ordered!( response.data, json! {{ @@ -367,7 +367,7 @@ mod tests { let mut response = Response::builder() .data(json! {{"stuff": {"bar": "2"}}}) .build(); - query.format_response(&mut response); + query.format_response(&mut response, None); assert_eq_and_ordered!( response.data, json! {{ @@ -391,7 +391,7 @@ mod tests { let mut response = Response::builder() .data(json! {{"foo": "1", "bar": "2", "baz": "3"}}) .build(); - query.format_response(&mut response); + query.format_response(&mut response, None); assert_eq_and_ordered!( response.data, json! {{ @@ -424,7 +424,7 @@ mod tests { "other": "6", }}) .build(); - query.format_response(&mut response); + query.format_response(&mut response, None); assert_eq_and_ordered!( response.data, json! {{ @@ -441,4 +441,29 @@ mod tests { }}, ); } + + #[test(tokio::test)] + async fn reformat_matching_operation() { + let schema: Schema = "".parse().unwrap(); + let query = Query::parse( + r#"query MyOperation { + foo + }"#, + &schema, + ) + .await + .unwrap() + .unwrap(); + let mut response = Response::builder() + .data(json! {{ + "foo": "1", + "other": "2", + }}) + .build(); + let untouched = response.clone(); + query.format_response(&mut response, Some("OtherOperation")); + assert_eq_and_ordered!(response.data, untouched.data); + query.format_response(&mut response, Some("MyOperation")); + assert_eq_and_ordered!(response.data, json! {{ "foo": "1" }}); + } } From c4064eed554ee1fe6e91272dc79462e4d5e47fba Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Fri, 26 Nov 2021 18:39:32 +0100 Subject: [PATCH 18/20] Add doc --- crates/apollo-router-core/src/query_cache.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/apollo-router-core/src/query_cache.rs b/crates/apollo-router-core/src/query_cache.rs index 3895e341ab..79db244b7f 100644 --- a/crates/apollo-router-core/src/query_cache.rs +++ b/crates/apollo-router-core/src/query_cache.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; -/// TODO +/// A cache for parsed GraphQL queries. #[derive(Debug)] pub struct QueryCache { cached: Mutex>>>, @@ -16,7 +16,7 @@ pub struct QueryCache { } impl QueryCache { - /// TODO + /// Instantiate a new cache for parsed GraphQL queries. pub fn new(cache_limit: usize, schema: Arc) -> Self { Self { cached: Mutex::new(LruCache::new(cache_limit)), @@ -26,7 +26,7 @@ impl QueryCache { } } - /// TODO + /// Attempt to parse a string to a [`Query`] using cache if possible. pub async fn get_query(&self, query: impl AsRef) -> Option> { let mut locked_cache = self.cached.lock().await; let key = query.as_ref().to_string(); From 44fdb3cfefbc2d279181e28b1e0b07a1466c57f0 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Mon, 29 Nov 2021 13:30:17 +0100 Subject: [PATCH 19/20] Remove blocking future from return value of query parsing --- apollo-router-core/src/query.rs | 98 +++++++++++---------------- apollo-router-core/src/query_cache.rs | 7 +- 2 files changed, 47 insertions(+), 58 deletions(-) diff --git a/apollo-router-core/src/query.rs b/apollo-router-core/src/query.rs index c2fb419656..6feae80a11 100644 --- a/apollo-router-core/src/query.rs +++ b/apollo-router-core/src/query.rs @@ -45,51 +45,46 @@ impl Query { } } - pub fn parse( - query: impl Into, - schema: &Schema, - ) -> tokio::task::JoinHandle> { + pub fn parse(query: impl Into, schema: &Schema) -> Option { let string = query.into(); let mut fragments = schema .fragments() .map(|(key, value)| (key.clone(), value.clone())) .collect::>>(); - tokio::task::spawn_blocking(move || { - let parser = apollo_parser::Parser::new(string.as_str()); - let tree = parser.parse(); + let parser = apollo_parser::Parser::new(string.as_str()); + let tree = parser.parse(); - if !tree.errors().is_empty() { - failfast_debug!( - "Parsing error(s): {}", - tree.errors() - .iter() - .map(|err| format!("{:?}", err)) - .collect::>() - .join(", "), - ); - return None; - } - - let document = tree.document(); - fragments.extend(Self::fragments(&document)); + if !tree.errors().is_empty() { + failfast_debug!( + "Parsing error(s): {}", + tree.errors() + .iter() + .map(|err| format!("{:?}", err)) + .collect::>() + .join(", "), + ); + return None; + } - let operations = document - .definitions() - .filter_map(|definition| { - if let ast::Definition::OperationDefinition(operation) = definition { - Some(operation.into()) - } else { - None - } - }) - .collect(); + let document = tree.document(); + fragments.extend(Self::fragments(&document)); - Some(Query { - string, - fragments, - operations, + let operations = document + .definitions() + .filter_map(|definition| { + if let ast::Definition::OperationDefinition(operation) = definition { + Some(operation.into()) + } else { + None + } }) + .collect(); + + Some(Query { + string, + fragments, + operations, }) } @@ -302,8 +297,8 @@ mod tests { }; } - #[test(tokio::test)] - async fn reformat_response_data_field() { + #[test] + fn reformat_response_data_field() { let schema: Schema = "".parse().unwrap(); let query = Query::parse( r#"{ @@ -317,8 +312,6 @@ mod tests { }"#, &schema, ) - .await - .unwrap() .unwrap(); let mut response = Response::builder() .data(json! {{ @@ -357,13 +350,10 @@ mod tests { ); } - #[test(tokio::test)] - async fn reformat_response_data_inline_fragment() { + #[test] + fn reformat_response_data_inline_fragment() { let schema: Schema = "".parse().unwrap(); - let query = Query::parse(r#"{... on Stuff { stuff{bar}}}"#, &schema) - .await - .unwrap() - .unwrap(); + let query = Query::parse(r#"{... on Stuff { stuff{bar}}}"#, &schema).unwrap(); let mut response = Response::builder() .data(json! {{"stuff": {"bar": "2"}}}) .build(); @@ -378,15 +368,13 @@ mod tests { ); } - #[test(tokio::test)] - async fn reformat_response_data_fragment_spread() { + #[test] + fn reformat_response_data_fragment_spread() { let schema: Schema = "fragment baz on Baz {baz}".parse().unwrap(); let query = Query::parse( r#"{...foo ...bar ...baz} fragment foo on Foo {foo} fragment bar on Bar {bar}"#, &schema, ) - .await - .unwrap() .unwrap(); let mut response = Response::builder() .data(json! {{"foo": "1", "bar": "2", "baz": "3"}}) @@ -402,15 +390,13 @@ mod tests { ); } - #[test(tokio::test)] - async fn reformat_response_data_best_effort() { + #[test] + fn reformat_response_data_best_effort() { let schema: Schema = "".parse().unwrap(); let query = Query::parse( r#"{foo stuff{bar baz} ...fragment array{bar baz} other{bar}}"#, &schema, ) - .await - .unwrap() .unwrap(); let mut response = Response::builder() .data(json! {{ @@ -442,8 +428,8 @@ mod tests { ); } - #[test(tokio::test)] - async fn reformat_matching_operation() { + #[test] + fn reformat_matching_operation() { let schema: Schema = "".parse().unwrap(); let query = Query::parse( r#"query MyOperation { @@ -451,8 +437,6 @@ mod tests { }"#, &schema, ) - .await - .unwrap() .unwrap(); let mut response = Response::builder() .data(json! {{ diff --git a/apollo-router-core/src/query_cache.rs b/apollo-router-core/src/query_cache.rs index 79db244b7f..f0ffd14a0f 100644 --- a/apollo-router-core/src/query_cache.rs +++ b/apollo-router-core/src/query_cache.rs @@ -71,7 +71,12 @@ impl QueryCache { drop(locked_wait_map); // This is the potentially high duration operation // No locks are held here - let parsed_query = match Query::parse(query.as_ref(), &self.schema).await { + let query_parsing_future = { + let query = query.as_ref().to_string(); + let schema = Arc::clone(&self.schema); + tokio::task::spawn_blocking(move || Query::parse(query, &schema)) + }; + let parsed_query = match query_parsing_future.await { Ok(res) => res.map(Arc::new), // Silently ignore cancelled tasks (never happen for blocking tasks). Err(err) if err.is_cancelled() => None, From 33e03a6a20a8ede2dcfc0b2dc567e219ba866d14 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Mon, 29 Nov 2021 13:30:40 +0100 Subject: [PATCH 20/20] Remove unnecessary pub(crate)'s --- apollo-router-core/src/query_planner/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apollo-router-core/src/query_planner/mod.rs b/apollo-router-core/src/query_planner/mod.rs index 2b010cde5d..bdbfcf5bcc 100644 --- a/apollo-router-core/src/query_planner/mod.rs +++ b/apollo-router-core/src/query_planner/mod.rs @@ -1,6 +1,6 @@ -pub(crate) mod caching_query_planner; +mod caching_query_planner; pub(crate) mod model; -pub(crate) mod router_bridge_query_planner; +mod router_bridge_query_planner; pub use caching_query_planner::*; pub use model::*;