diff --git a/apollo-router-core/src/query_planner/caching_query_planner.rs b/apollo-router-core/src/query_planner/caching_query_planner.rs
index 2aa0dd2bde..c617482e6c 100644
--- a/apollo-router-core/src/query_planner/caching_query_planner.rs
+++ b/apollo-router-core/src/query_planner/caching_query_planner.rs
@@ -111,8 +111,10 @@ impl QueryPlanner for CachingQueryPlanner {
             }
         }
     }
+}
 
-    async fn get_hot_keys(&self) -> Vec {
+impl CachingQueryPlanner {
+    pub async fn get_hot_keys(&self) -> Vec {
         let locked_cache = self.cached.lock().await;
         locked_cache
             .iter()
@@ -152,10 +154,6 @@ mod tests {
         ) -> PlanResult {
             self.sync_get(query, operation, options)
         }
-
-        async fn get_hot_keys(&self) -> Vec {
-            vec![]
-        }
     }
 
     #[test(tokio::test)]
@@ -168,7 +166,7 @@ mod tests {
             PlanningErrors { errors: Vec::new() },
         ))));
 
-        let planner = delegate.with_caching(10);
+        let planner = CachingQueryPlanner::new(delegate, 10);
 
         for _ in 0..5 {
             assert!(planner
diff --git a/apollo-router-core/src/query_planner/router_bridge_query_planner.rs b/apollo-router-core/src/query_planner/router_bridge_query_planner.rs
index b5d1a3f6d5..4942626665 100644
--- a/apollo-router-core/src/query_planner/router_bridge_query_planner.rs
+++ b/apollo-router-core/src/query_planner/router_bridge_query_planner.rs
@@ -41,10 +41,6 @@ impl QueryPlanner for RouterBridgeQueryPlanner {
         })
         .await?
     }
-
-    async fn get_hot_keys(&self) -> Vec {
-        vec![]
-    }
 }
 
 impl From for plan::QueryPlanOptions {
diff --git a/apollo-router-core/src/traits.rs b/apollo-router-core/src/traits.rs
index 17c0102eff..4d88deeae5 100644
--- a/apollo-router-core/src/traits.rs
+++ b/apollo-router-core/src/traits.rs
@@ -43,26 +43,8 @@ pub trait QueryPlanner: Send + Sync + Debug {
         operation: Option,
         options: QueryPlanOptions,
     ) -> Result, QueryPlannerError>;
-
-    async fn get_hot_keys(&self) -> Vec;
 }
 
-/// With caching trait.
-///
-/// Adds with_caching to any query planner.
-pub trait WithCaching: QueryPlanner
-where
-    Self: Sized + QueryPlanner,
-{
-    /// Wrap this query planner in a caching decorator.
-    /// The original query planner is consumed.
-    fn with_caching(self, plan_cache_limit: usize) -> CachingQueryPlanner {
-        CachingQueryPlanner::new(self, plan_cache_limit)
-    }
-}
-
-impl WithCaching for T where T: QueryPlanner + Sized {}
-
 /// An object that accepts a [`Request`] and allow creating [`PreparedQuery`]'s.
 ///
 /// The call to the function will either succeeds and return a [`PreparedQuery`] or it will fail and return
diff --git a/apollo-router/src/apollo_router.rs b/apollo-router/src/apollo_router.rs
index 58f453618e..e605ca753a 100644
--- a/apollo-router/src/apollo_router.rs
+++ b/apollo-router/src/apollo_router.rs
@@ -11,7 +11,7 @@ use tracing_futures::WithSubscriber;
 pub struct ApolloRouter {
     #[derivative(Debug = "ignore")]
     naive_introspection: NaiveIntrospection,
-    query_planner: Arc,
+    query_planner: Arc>,
     service_registry: Arc,
     schema: Arc,
     query_cache: Arc,
@@ -19,24 +19,57 @@ pub struct ApolloRouter {
 
 impl ApolloRouter {
     /// Create an [`ApolloRouter`] instance used to execute a GraphQL query.
-    pub fn new(
-        query_planner: Arc,
+    pub async fn new(
         service_registry: Arc,
         schema: Arc,
-        query_cache_limit: usize,
+        previous_router: Option>,
     ) -> Self {
+        let plan_cache_limit = std::env::var("ROUTER_PLAN_CACHE_LIMIT")
+            .ok()
+            .and_then(|x| x.parse().ok())
+            .unwrap_or(100);
+        let query_cache_limit = std::env::var("ROUTER_QUERY_CACHE_LIMIT")
+            .ok()
+            .and_then(|x| x.parse().ok())
+            .unwrap_or(100);
+        let query_planner = Arc::new(CachingQueryPlanner::new(
+            RouterBridgeQueryPlanner::new(Arc::clone(&schema)),
+            plan_cache_limit,
+        ));
+
+        // NaiveIntrospection instantiation can potentially block for some time
+        let naive_introspection = {
+            let schema = Arc::clone(&schema);
+            tokio::task::spawn_blocking(move || NaiveIntrospection::from_schema(&schema))
+                .await
+                .expect("NaiveIntrospection instantiation panicked")
+        };
+
+        // Start warming up the cache
+        //
+        // We don't need to do this in background because the old server will keep running until
+        // this one is ready.
+        //
+        // If we first warm up the cache in foreground, then switch to the new config, the next
+        // queries will benefit from the warmed up cache. While if we switch and warm up in
+        // background, the next queries might be blocked until the cache is primed, so there'll be
+        // a perf hit.
+        if let Some(previous_router) = previous_router {
+            for (query, operation, options) in previous_router.query_planner.get_hot_keys().await {
+                // We can ignore errors because some of the queries that were previously in the
+                // cache might not work with the new schema
+                let _ = query_planner.get(query, operation, options).await;
+            }
+        }
+
         Self {
-            naive_introspection: NaiveIntrospection::from_schema(&schema),
+            naive_introspection,
             query_planner,
             service_registry,
             query_cache: Arc::new(QueryCache::new(query_cache_limit, Arc::clone(&schema))),
             schema,
         }
     }
-
-    pub fn get_query_planner(&self) -> Arc {
-        self.query_planner.clone()
-    }
 }
 
 #[async_trait::async_trait]
diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs
index 175a71bfee..3af77de7e4 100644
--- a/apollo-router/src/lib.rs
+++ b/apollo-router/src/lib.rs
@@ -429,14 +429,6 @@ pub struct FederatedServer {
     /// The Configuration that the server will use. This can be static or a stream for hot reloading.
     configuration: ConfigurationKind,
 
-    /// Limit query cache entries.
-    #[builder(default = 100)]
-    plan_cache_limit: usize,
-
-    /// Limit query cache entries.
-    #[builder(default = 100)]
-    query_cache_limit: usize,
-
     /// The Schema that the server will use. This can be static or a stream for hot reloading.
     schema: SchemaKind,
 
@@ -566,7 +558,7 @@ impl FederatedServer {
         let state_machine = StateMachine::new(
             server_factory,
             Some(state_listener),
-            ApolloRouterFactory::new(self.plan_cache_limit, self.query_cache_limit),
+            ApolloRouterFactory::default(),
         );
         let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
         let result = spawn(async {
diff --git a/apollo-router/src/main.rs b/apollo-router/src/main.rs
index 497774a5ec..44dcdd92f6 100644
--- a/apollo-router/src/main.rs
+++ b/apollo-router/src/main.rs
@@ -31,14 +31,6 @@ struct Opt {
     /// Schema location relative to the project directory.
     #[structopt(short, long = "supergraph", parse(from_os_str), env)]
     supergraph_path: Option,
-
-    /// Query Plan cache size (number of entries).
-    #[structopt(long, default_value = "100")]
-    plan_cache_limit: usize,
-
-    /// Query parser cache size (number of entries).
-    #[structopt(long, default_value = "100")]
-    query_cache_limit: usize,
 }
 
 /// Wrapper so that structop can display the default config path in the help message.
@@ -163,8 +155,6 @@ async fn main() -> Result<()> {
     let server = FederatedServer::builder()
         .configuration(configuration)
         .schema(schema)
-        .plan_cache_limit(opt.plan_cache_limit)
-        .query_cache_limit(opt.query_cache_limit)
         .shutdown(ShutdownKind::CtrlC)
         .build();
     let mut server_handle = server.serve();
diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs
index cd947dbbc2..c20037a86a 100644
--- a/apollo-router/src/router_factory.rs
+++ b/apollo-router/src/router_factory.rs
@@ -1,113 +1,39 @@
 use crate::apollo_router::{ApolloPreparedQuery, ApolloRouter};
 use crate::configuration::Configuration;
 use crate::http_service_registry::HttpServiceRegistry;
-use apollo_router_core::prelude::{graphql::*, *};
-use futures::prelude::*;
+use apollo_router_core::prelude::*;
 use std::sync::Arc;
 
 /// Factory for creating graphs.
 ///
 /// This trait enables us to test that `StateMachine` correctly recreates the ApolloRouter when
 /// necessary e.g. when schema changes.
-//#[cfg_attr(test, automock)]
+#[async_trait::async_trait]
 pub(crate) trait RouterFactory
 where
     Router: graphql::Router,
     PreparedQuery: graphql::PreparedQuery,
 {
-    fn create(
+    async fn create(
         &self,
         configuration: &Configuration,
         schema: Arc,
-        plan_cache_limit: usize,
-        query_cache_limit: usize,
-    ) -> future::BoxFuture<'static, Router>;
-    fn recreate(
-        &self,
-        router: Arc,
-        configuration: &Configuration,
-        schema: Arc,
-        plan_cache_limit: usize,
-        query_cache_limit: usize,
-    ) -> future::BoxFuture<'static, Router>;
-    fn get_plan_cache_limit(&self) -> usize;
-    fn get_query_cache_limit(&self) -> usize;
+        previous_router: Option>,
+    ) -> Router;
 }
 
 #[derive(Default)]
-pub(crate) struct ApolloRouterFactory {
-    plan_cache_limit: usize,
-    query_cache_limit: usize,
-}
-impl ApolloRouterFactory {
-    pub fn new(plan_cache_limit: usize, query_cache_limit: usize) -> Self {
-        Self {
-            plan_cache_limit,
-            query_cache_limit,
-        }
-    }
-}
+pub(crate) struct ApolloRouterFactory {}
 
+#[async_trait::async_trait]
 impl RouterFactory for ApolloRouterFactory {
-    fn create(
+    async fn create(
         &self,
         configuration: &Configuration,
         schema: Arc,
-        plan_cache_limit: usize,
-        query_cache_limit: usize,
-    ) -> future::BoxFuture<'static, ApolloRouter> {
+        previous_router: Option>,
+    ) -> ApolloRouter {
         let service_registry = HttpServiceRegistry::new(configuration);
-        tokio::task::spawn_blocking(move || {
-            ApolloRouter::new(
-                Arc::new(
-                    graphql::RouterBridgeQueryPlanner::new(Arc::clone(&schema))
-                        .with_caching(plan_cache_limit),
-                ),
-                Arc::new(service_registry),
-                schema,
-                query_cache_limit,
-            )
-        })
-        .map(|res| res.expect("ApolloRouter::new() is infallible; qed"))
-        .boxed()
-    }
-
-    fn recreate(
-        &self,
-        router: Arc,
-        configuration: &Configuration,
-        schema: Arc,
-        plan_cache_limit: usize,
-        query_cache_limit: usize,
-    ) -> future::BoxFuture<'static, ApolloRouter> {
-        let factory = self.create(configuration, schema, plan_cache_limit, query_cache_limit);
-
-        Box::pin(async move {
-            // Use the "hot" entries in the supplied router to pre-populate
-            // our new router
-            let new_router = factory.await;
-            let hot_keys = router.get_query_planner().get_hot_keys().await;
-            // It would be nice to get these keys concurrently by spawning
-            // futures in our loop. However, these calls to get call the
-            // v8 based query planner and running too many of these
-            // concurrently is a bad idea. One for the future...
-            for key in hot_keys {
-                // We can ignore errors, since we are just warming up the
-                // cache
-                let _ = new_router
-                    .get_query_planner()
-                    .get(key.0, key.1, key.2)
-                    .await;
-            }
-            new_router
-        })
-    }
-
-    fn get_plan_cache_limit(&self) -> usize {
-        self.plan_cache_limit
-    }
-
-    fn get_query_cache_limit(&self) -> usize {
-        self.query_cache_limit
+        ApolloRouter::new(Arc::new(service_registry), schema, previous_router).await
     }
 }
diff --git a/apollo-router/src/state_machine.rs b/apollo-router/src/state_machine.rs
index 674bee214f..919fc7a83a 100644
--- a/apollo-router/src/state_machine.rs
+++ b/apollo-router/src/state_machine.rs
@@ -193,12 +193,10 @@ where
                     let schema = Arc::new(new_schema);
                     let router = Arc::new(
                         self.router_factory
-                            .recreate(
-                                router,
+                            .create(
                                 &derived_configuration,
                                 Arc::clone(&schema),
-                                self.router_factory.get_plan_cache_limit(),
-                                self.router_factory.get_query_cache_limit(),
+                                Some(router),
                             )
                             .await,
                     );
@@ -254,12 +252,10 @@ where
                     let derived_configuration = Arc::new(derived_configuration);
                     let router = Arc::new(
                         self.router_factory
-                            .recreate(
-                                router,
+                            .create(
                                 &derived_configuration,
                                 Arc::clone(&schema),
-                                self.router_factory.get_plan_cache_limit(),
-                                self.router_factory.get_query_cache_limit(),
+                                Some(router),
                             )
                             .await,
                     );
@@ -357,12 +353,7 @@ where
                 let schema = Arc::new(schema);
                 let router = Arc::new(
                     self.router_factory
-                        .create(
-                            &derived_configuration,
-                            Arc::clone(&schema),
-                            self.router_factory.get_plan_cache_limit(),
-                            self.router_factory.get_query_cache_limit(),
-                        )
+                        .create(&derived_configuration, Arc::clone(&schema), None)
                         .await,
                 );
 
@@ -494,7 +485,7 @@ mod tests {
 
     #[test(tokio::test)]
     async fn startup_reload_schema() {
-        let router_factory = recreate_mock_router_factory(2);
+        let router_factory = create_mock_router_factory(2);
         let (server_factory, shutdown_receivers) = create_mock_server_factory(2);
 
         let schema = include_str!("testdata/supergraph.graphql");
@@ -534,7 +525,7 @@ mod tests {
 
     #[test(tokio::test)]
     async fn startup_reload_configuration() {
-        let router_factory = recreate_mock_router_factory(2);
+        let router_factory = create_mock_router_factory(2);
         let (server_factory, shutdown_receivers) = create_mock_server_factory(2);
 
         assert!(matches!(
@@ -649,38 +640,21 @@ mod tests {
         // first call, we take the URL from the configuration
         router_factory
             .expect_create()
-            .withf(
-                |configuration: &Configuration,
-                 _schema: &Arc,
-                 _plan_cache_limit: &usize,
-                 _query_cache_limit: &usize| {
-                    configuration.subgraphs.get("accounts").unwrap().routing_url
-                        == "http://accounts/graphql"
-                },
-            )
+            .withf(|configuration, _schema, _previous_router| {
+                configuration.subgraphs.get("accounts").unwrap().routing_url
+                    == "http://accounts/graphql"
+            })
             .times(1)
-            .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed());
+            .returning(|_, _, _| MockMyRouter::new());
 
         // second call, configuration is empty, we should take the URL from the graph
         router_factory
-            .expect_recreate()
-            .withf(
-                |_graph: &Arc,
-                 configuration: &Configuration,
-                 _schema: &Arc,
-                 _plan_cache_limit: &usize,
-                 _query_cache_limit: &usize| {
-                    configuration.subgraphs.get("accounts").unwrap().routing_url
-                        == "http://localhost:4001/graphql"
-                },
-            )
+            .expect_create()
+            .withf(|configuration, _schema, _previous_router| {
+                configuration.subgraphs.get("accounts").unwrap().routing_url
+                    == "http://localhost:4001/graphql"
+            })
             .times(1)
-            .returning(|_, _, _, _, _| future::ready(MockMyRouter::new()).boxed());
-        router_factory
-            .expect_get_plan_cache_limit()
-            .return_const(10usize);
-        router_factory
-            .expect_get_query_cache_limit()
-            .return_const(10usize);
+            .returning(|_, _, _| MockMyRouter::new());
 
         let (server_factory, shutdown_receivers) = create_mock_server_factory(2);
 
         assert!(matches!(
@@ -741,39 +715,22 @@ mod tests {
         // first call, we take the URL from the first supergraph
         router_factory
             .expect_create()
-            .withf(
-                |configuration: &Configuration,
-                 _schema: &Arc,
-                 _plan_cache_limit: &usize,
-                 _query_cache_limit: &usize| {
-                    configuration.subgraphs.get("accounts").unwrap().routing_url
-                        == "http://accounts/graphql"
-                },
-            )
+            .withf(|configuration, _schema, _previous_router| {
+                configuration.subgraphs.get("accounts").unwrap().routing_url
+                    == "http://accounts/graphql"
+            })
             .times(1)
-            .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed());
+            .returning(|_, _, _| MockMyRouter::new());
 
         // second call, configuration is still empty, we should take the URL from the new supergraph
         router_factory
-            .expect_recreate()
-            .withf(
-                |_graph: &Arc,
-                 configuration: &Configuration,
-                 _schema: &Arc,
-                 _plan_cache_limit: &usize,
-                 _query_cache_limit: &usize| {
-                    println!("got configuration: {:#?}", configuration);
-                    configuration.subgraphs.get("accounts").unwrap().routing_url
-                        == "http://localhost:4001/graphql"
-                },
-            )
+            .expect_create()
+            .withf(|configuration, _schema, _previous_router| {
+                println!("got configuration: {:#?}", configuration);
+                configuration.subgraphs.get("accounts").unwrap().routing_url
+                    == "http://localhost:4001/graphql"
+            })
             .times(1)
-            .returning(|_, _, _, _, _| future::ready(MockMyRouter::new()).boxed());
-        router_factory
-            .expect_get_plan_cache_limit()
-            .return_const(10usize);
-        router_factory
-            .expect_get_query_cache_limit()
-            .return_const(10usize);
+            .returning(|_, _, _| MockMyRouter::new());
 
         let (server_factory, shutdown_receivers) = create_mock_server_factory(2);
 
         assert!(matches!(
@@ -825,23 +782,14 @@ mod tests {
         #[derive(Debug)]
         MyRouterFactory {}
 
+        #[async_trait::async_trait]
         impl RouterFactory for MyRouterFactory {
-            fn create(
+            async fn create(
                 &self,
                 configuration: &Configuration,
                 schema: Arc,
-                plan_cache_limit: usize,
-                query_cache_limit: usize,
-            ) -> future::BoxFuture<'static, MockMyRouter>;
-            fn recreate(&self,
-                router: Arc,
-                configuration: &Configuration,
-                schema: Arc,
-                plan_cache_limit: usize,
-                query_cache_limit: usize,
-            ) -> future::BoxFuture<'static, MockMyRouter>;
-            fn get_plan_cache_limit(&self) -> usize;
-            fn get_query_cache_limit(&self) -> usize;
+                previous_router: Option>,
+            ) -> MockMyRouter;
         }
     }
 
@@ -940,32 +888,7 @@ mod tests {
         router_factory
             .expect_create()
             .times(expect_times_called)
-            .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed());
-        router_factory
-            .expect_get_plan_cache_limit()
-            .return_const(10usize);
-        router_factory
-            .expect_get_query_cache_limit()
-            .return_const(10usize);
-        router_factory
-    }
-
-    fn recreate_mock_router_factory(expect_times_called: usize) -> MockMyRouterFactory {
-        let mut router_factory = MockMyRouterFactory::new();
-        router_factory
-            .expect_create()
-            .times(1)
-            .returning(|_, _, _, _| future::ready(MockMyRouter::new()).boxed());
-        router_factory
-            .expect_recreate()
-            .times(expect_times_called - 1)
-            .returning(|_, _, _, _, _| future::ready(MockMyRouter::new()).boxed());
-        router_factory
-            .expect_get_plan_cache_limit()
-            .return_const(10usize);
-        router_factory
-            .expect_get_query_cache_limit()
-            .return_const(10usize);
+            .returning(|_, _, _| MockMyRouter::new());
         router_factory
     }
 }
diff --git a/apollo-router/tests/integration_tests.rs b/apollo-router/tests/integration_tests.rs
index 77ba44eaae..3542b9f1bf 100644
--- a/apollo-router/tests/integration_tests.rs
+++ b/apollo-router/tests/integration_tests.rs
@@ -11,8 +11,6 @@ use std::collections::HashMap;
 use std::sync::{Arc, Mutex};
 use test_log::test;
 
-const QUERY_CACHE_LIMIT: usize = 100;
-
 macro_rules! assert_federated_response {
     ($query:expr, $service_requests:expr $(,)?) => {
         let request = graphql::Request::builder()
@@ -161,7 +159,6 @@ async fn query_rust(
     request: graphql::Request,
 ) -> (graphql::ResponseStream, Arc) {
     let schema = Arc::new(include_str!("fixtures/supergraph.graphql").parse().unwrap());
-    let planner = graphql::RouterBridgeQueryPlanner::new(Arc::clone(&schema));
     let config =
         serde_yaml::from_str::(include_str!("fixtures/supergraph_config.yaml"))
             .unwrap();
@@ -169,12 +166,7 @@ async fn query_rust(
         &config,
     )));
 
-    let router = ApolloRouter::new(
-        Arc::new(planner),
-        registry.clone(),
-        schema,
-        QUERY_CACHE_LIMIT,
-    );
+    let router = ApolloRouter::new(registry.clone(), schema, None).await;
 
     let stream = match router.prepare_query(&request).await {
         Ok(route) => route.execute(Arc::new(request)).await,