Skip to content

Commit

Permalink
Refactor cache parameters (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
cecton authored Dec 3, 2021
1 parent ee0d7f9 commit 31eb0d8
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 262 deletions.
10 changes: 4 additions & 6 deletions apollo-router-core/src/query_planner/caching_query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ impl<T: QueryPlanner> QueryPlanner for CachingQueryPlanner<T> {
}
}
}
}

async fn get_hot_keys(&self) -> Vec<QueryKey> {
impl<T: QueryPlanner> CachingQueryPlanner<T> {
pub async fn get_hot_keys(&self) -> Vec<QueryKey> {
let locked_cache = self.cached.lock().await;
locked_cache
.iter()
Expand Down Expand Up @@ -152,10 +154,6 @@ mod tests {
) -> PlanResult {
self.sync_get(query, operation, options)
}

async fn get_hot_keys(&self) -> Vec<QueryKey> {
vec![]
}
}

#[test(tokio::test)]
Expand All @@ -168,7 +166,7 @@ mod tests {
PlanningErrors { errors: Vec::new() },
))));

let planner = delegate.with_caching(10);
let planner = CachingQueryPlanner::new(delegate, 10);

for _ in 0..5 {
assert!(planner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ impl QueryPlanner for RouterBridgeQueryPlanner {
})
.await?
}

async fn get_hot_keys(&self) -> Vec<QueryKey> {
vec![]
}
}

impl From<QueryPlanOptions> for plan::QueryPlanOptions {
Expand Down
18 changes: 0 additions & 18 deletions apollo-router-core/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,8 @@ pub trait QueryPlanner: Send + Sync + Debug {
operation: Option<String>,
options: QueryPlanOptions,
) -> Result<Arc<QueryPlan>, QueryPlannerError>;

async fn get_hot_keys(&self) -> Vec<QueryKey>;
}

/// With caching trait.
///
/// Adds with_caching to any query planner.
pub trait WithCaching: QueryPlanner
where
Self: Sized + QueryPlanner,
{
/// Wrap this query planner in a caching decorator.
/// The original query planner is consumed.
fn with_caching(self, plan_cache_limit: usize) -> CachingQueryPlanner<Self> {
CachingQueryPlanner::new(self, plan_cache_limit)
}
}

impl<T: ?Sized> WithCaching for T where T: QueryPlanner + Sized {}

/// An object that accepts a [`Request`] and allow creating [`PreparedQuery`]'s.
///
/// The call to the function will either succeeds and return a [`PreparedQuery`] or it will fail and return
Expand Down
51 changes: 42 additions & 9 deletions apollo-router/src/apollo_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,65 @@ use tracing_futures::WithSubscriber;
pub struct ApolloRouter {
#[derivative(Debug = "ignore")]
naive_introspection: NaiveIntrospection,
query_planner: Arc<dyn QueryPlanner>,
query_planner: Arc<CachingQueryPlanner<RouterBridgeQueryPlanner>>,
service_registry: Arc<dyn ServiceRegistry>,
schema: Arc<Schema>,
query_cache: Arc<QueryCache>,
}

impl ApolloRouter {
/// Create an [`ApolloRouter`] instance used to execute a GraphQL query.
pub fn new(
query_planner: Arc<dyn QueryPlanner>,
pub async fn new(
service_registry: Arc<dyn ServiceRegistry>,
schema: Arc<Schema>,
query_cache_limit: usize,
previous_router: Option<Arc<ApolloRouter>>,
) -> Self {
let plan_cache_limit = std::env::var("ROUTER_PLAN_CACHE_LIMIT")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(100);
let query_cache_limit = std::env::var("ROUTER_QUERY_CACHE_LIMIT")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(100);
let query_planner = Arc::new(CachingQueryPlanner::new(
RouterBridgeQueryPlanner::new(Arc::clone(&schema)),
plan_cache_limit,
));

// NaiveIntrospection instantiation can potentially block for some time
let naive_introspection = {
let schema = Arc::clone(&schema);
tokio::task::spawn_blocking(move || NaiveIntrospection::from_schema(&schema))
.await
.expect("NaiveIntrospection instantiation panicked")
};

// Start warming up the cache
//
// We don't need to do this in background because the old server will keep running until
// this one is ready.
//
// If we first warm up the cache in foreground, then switch to the new config, the next
// queries will benefit from the warmed up cache. While if we switch and warm up in
// background, the next queries might be blocked until the cache is primed, so there'll be
// a perf hit.
if let Some(previous_router) = previous_router {
for (query, operation, options) in previous_router.query_planner.get_hot_keys().await {
// We can ignore errors because some of the queries that were previously in the
// cache might not work with the new schema
let _ = query_planner.get(query, operation, options).await;
}
}

Self {
naive_introspection: NaiveIntrospection::from_schema(&schema),
naive_introspection,
query_planner,
service_registry,
query_cache: Arc::new(QueryCache::new(query_cache_limit, Arc::clone(&schema))),
schema,
}
}

pub fn get_query_planner(&self) -> Arc<dyn QueryPlanner> {
self.query_planner.clone()
}
}

#[async_trait::async_trait]
Expand Down
10 changes: 1 addition & 9 deletions apollo-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,14 +429,6 @@ pub struct FederatedServer {
/// The Configuration that the server will use. This can be static or a stream for hot reloading.
configuration: ConfigurationKind,

/// Limit query cache entries.
#[builder(default = 100)]
plan_cache_limit: usize,

/// Limit query cache entries.
#[builder(default = 100)]
query_cache_limit: usize,

/// The Schema that the server will use. This can be static or a stream for hot reloading.
schema: SchemaKind,

Expand Down Expand Up @@ -566,7 +558,7 @@ impl FederatedServer {
let state_machine = StateMachine::new(
server_factory,
Some(state_listener),
ApolloRouterFactory::new(self.plan_cache_limit, self.query_cache_limit),
ApolloRouterFactory::default(),
);
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let result = spawn(async {
Expand Down
10 changes: 0 additions & 10 deletions apollo-router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ struct Opt {
/// Schema location relative to the project directory.
#[structopt(short, long = "supergraph", parse(from_os_str), env)]
supergraph_path: Option<PathBuf>,

/// Query Plan cache size (number of entries).
#[structopt(long, default_value = "100")]
plan_cache_limit: usize,

/// Query parser cache size (number of entries).
#[structopt(long, default_value = "100")]
query_cache_limit: usize,
}

/// Wrapper so that structop can display the default config path in the help message.
Expand Down Expand Up @@ -163,8 +155,6 @@ async fn main() -> Result<()> {
let server = FederatedServer::builder()
.configuration(configuration)
.schema(schema)
.plan_cache_limit(opt.plan_cache_limit)
.query_cache_limit(opt.query_cache_limit)
.shutdown(ShutdownKind::CtrlC)
.build();
let mut server_handle = server.serve();
Expand Down
96 changes: 11 additions & 85 deletions apollo-router/src/router_factory.rs
Original file line number Diff line number Diff line change
@@ -1,113 +1,39 @@
use crate::apollo_router::{ApolloPreparedQuery, ApolloRouter};
use crate::configuration::Configuration;
use crate::http_service_registry::HttpServiceRegistry;
use apollo_router_core::prelude::{graphql::*, *};
use futures::prelude::*;
use apollo_router_core::prelude::*;
use std::sync::Arc;

/// Factory for creating graphs.
///
/// This trait enables us to test that `StateMachine` correctly recreates the ApolloRouter when
/// necessary e.g. when schema changes.
//#[cfg_attr(test, automock)]
#[async_trait::async_trait]
pub(crate) trait RouterFactory<Router, PreparedQuery>
where
Router: graphql::Router<PreparedQuery>,
PreparedQuery: graphql::PreparedQuery,
{
fn create(
async fn create(
&self,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, Router>;
fn recreate(
&self,
router: Arc<Router>,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, Router>;
fn get_plan_cache_limit(&self) -> usize;
fn get_query_cache_limit(&self) -> usize;
previous_router: Option<Arc<Router>>,
) -> Router;
}

#[derive(Default)]
pub(crate) struct ApolloRouterFactory {
plan_cache_limit: usize,
query_cache_limit: usize,
}
impl ApolloRouterFactory {
pub fn new(plan_cache_limit: usize, query_cache_limit: usize) -> Self {
Self {
plan_cache_limit,
query_cache_limit,
}
}
}
pub(crate) struct ApolloRouterFactory {}

#[async_trait::async_trait]
impl RouterFactory<ApolloRouter, ApolloPreparedQuery> for ApolloRouterFactory {
fn create(
async fn create(
&self,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, ApolloRouter> {
previous_router: Option<Arc<ApolloRouter>>,
) -> ApolloRouter {
let service_registry = HttpServiceRegistry::new(configuration);
tokio::task::spawn_blocking(move || {
ApolloRouter::new(
Arc::new(
graphql::RouterBridgeQueryPlanner::new(Arc::clone(&schema))
.with_caching(plan_cache_limit),
),
Arc::new(service_registry),
schema,
query_cache_limit,
)
})
.map(|res| res.expect("ApolloRouter::new() is infallible; qed"))
.boxed()
}

fn recreate(
&self,
router: Arc<ApolloRouter>,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, ApolloRouter> {
let factory = self.create(configuration, schema, plan_cache_limit, query_cache_limit);

Box::pin(async move {
// Use the "hot" entries in the supplied router to pre-populate
// our new router
let new_router = factory.await;
let hot_keys = router.get_query_planner().get_hot_keys().await;
// It would be nice to get these keys concurrently by spawning
// futures in our loop. However, these calls to get call the
// v8 based query planner and running too many of these
// concurrently is a bad idea. One for the future...
for key in hot_keys {
// We can ignore errors, since we are just warming up the
// cache
let _ = new_router
.get_query_planner()
.get(key.0, key.1, key.2)
.await;
}
new_router
})
}

fn get_plan_cache_limit(&self) -> usize {
self.plan_cache_limit
}

fn get_query_cache_limit(&self) -> usize {
self.query_cache_limit
ApolloRouter::new(Arc::new(service_registry), schema, previous_router).await
}
}
Loading

0 comments on commit 31eb0d8

Please sign in to comment.