Refactor cache parameters #225

Merged · 16 commits · Dec 3, 2021
10 changes: 4 additions & 6 deletions apollo-router-core/src/query_planner/caching_query_planner.rs
@@ -111,8 +111,10 @@ impl<T: QueryPlanner> QueryPlanner for CachingQueryPlanner<T> {
}
}
}
}

async fn get_hot_keys(&self) -> Vec<QueryKey> {
impl<T: QueryPlanner> CachingQueryPlanner<T> {
pub async fn get_hot_keys(&self) -> Vec<QueryKey> {
let locked_cache = self.cached.lock().await;
locked_cache
.iter()
@@ -152,10 +154,6 @@ mod tests {
) -> PlanResult {
self.sync_get(query, operation, options)
}

async fn get_hot_keys(&self) -> Vec<QueryKey> {
vec![]
}
}

#[test(tokio::test)]
@@ -168,7 +166,7 @@ mod tests {
PlanningErrors { errors: Vec::new() },
))));

let planner = delegate.with_caching(10);
let planner = CachingQueryPlanner::new(delegate, 10);

for _ in 0..5 {
assert!(planner
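Note: `get_hot_keys` moves off the `QueryPlanner` trait and becomes an inherent method on `CachingQueryPlanner`, and the `with_caching` helper is replaced by direct construction. A minimal sketch of the new call sites, assuming `delegate` is some value implementing `QueryPlanner` (the name and the cache size of 10 are illustrative, not part of this diff):

```rust
// Wrap any planner in the caching decorator; the delegate is consumed.
let planner = CachingQueryPlanner::new(delegate, 10);

// Hot keys are now exposed only by the caching wrapper, not by the trait.
let hot_keys: Vec<QueryKey> = planner.get_hot_keys().await;
```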
@@ -41,10 +41,6 @@ impl QueryPlanner for RouterBridgeQueryPlanner {
})
.await?
}

async fn get_hot_keys(&self) -> Vec<QueryKey> {
vec![]
}
}

impl From<QueryPlanOptions> for plan::QueryPlanOptions {
18 changes: 0 additions & 18 deletions apollo-router-core/src/traits.rs
@@ -43,26 +43,8 @@ pub trait QueryPlanner: Send + Sync + Debug {
operation: Option<String>,
options: QueryPlanOptions,
) -> Result<Arc<QueryPlan>, QueryPlannerError>;

async fn get_hot_keys(&self) -> Vec<QueryKey>;
}

/// With caching trait.
///
/// Adds with_caching to any query planner.
pub trait WithCaching: QueryPlanner
where
Self: Sized + QueryPlanner,
{
/// Wrap this query planner in a caching decorator.
/// The original query planner is consumed.
fn with_caching(self, plan_cache_limit: usize) -> CachingQueryPlanner<Self> {
CachingQueryPlanner::new(self, plan_cache_limit)
}
}

impl<T: ?Sized> WithCaching for T where T: QueryPlanner + Sized {}

/// An object that accepts a [`Request`] and allows creating [`PreparedQuery`]'s.
///
/// The call to the function will either succeed and return a [`PreparedQuery`] or it will fail and return
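Note: with `get_hot_keys` and the `WithCaching` blanket impl removed, implementing `QueryPlanner` only requires `get`. A sketch of a minimal implementer under that assumption (the struct name is hypothetical, and the first `query` parameter of `get` is assumed, since this hunk does not show it):

```rust
// Hypothetical no-op planner: only `get` is required now that `get_hot_keys`
// has left the trait.
#[derive(Debug)]
struct NoopPlanner;

#[async_trait::async_trait]
impl QueryPlanner for NoopPlanner {
    async fn get(
        &self,
        _query: String,
        _operation: Option<String>,
        _options: QueryPlanOptions,
    ) -> Result<Arc<QueryPlan>, QueryPlannerError> {
        // A real planner would produce a plan here.
        unimplemented!("sketch only")
    }
}
```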
51 changes: 42 additions & 9 deletions apollo-router/src/apollo_router.rs
@@ -11,32 +11,65 @@ use tracing_futures::WithSubscriber;
pub struct ApolloRouter {
#[derivative(Debug = "ignore")]
naive_introspection: NaiveIntrospection,
query_planner: Arc<dyn QueryPlanner>,
query_planner: Arc<CachingQueryPlanner<RouterBridgeQueryPlanner>>,
service_registry: Arc<dyn ServiceRegistry>,
schema: Arc<Schema>,
query_cache: Arc<QueryCache>,
}

impl ApolloRouter {
/// Create an [`ApolloRouter`] instance used to execute a GraphQL query.
pub fn new(
query_planner: Arc<dyn QueryPlanner>,
pub async fn new(
service_registry: Arc<dyn ServiceRegistry>,
schema: Arc<Schema>,
query_cache_limit: usize,
previous_router: Option<Arc<ApolloRouter>>,
) -> Self {
let plan_cache_limit = std::env::var("ROUTER_PLAN_CACHE_LIMIT")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(100);
let query_cache_limit = std::env::var("ROUTER_QUERY_CACHE_LIMIT")
.ok()
.and_then(|x| x.parse().ok())
.unwrap_or(100);
let query_planner = Arc::new(CachingQueryPlanner::new(
RouterBridgeQueryPlanner::new(Arc::clone(&schema)),
plan_cache_limit,
));

// NaiveIntrospection instantiation can potentially block for some time
let naive_introspection = {
let schema = Arc::clone(&schema);
tokio::task::spawn_blocking(move || NaiveIntrospection::from_schema(&schema))
.await
.expect("NaiveIntrospection instantiation panicked")
};

// Start warming up the cache
//
// We don't need to do this in the background because the old server keeps running until
// this one is ready.
//
// If we warm up the cache in the foreground and then switch to the new config, the next
// queries benefit from the warmed-up cache. If we instead switched first and warmed up in
// the background, the next queries might block until the cache is primed, which would be
// a performance hit.
if let Some(previous_router) = previous_router {
for (query, operation, options) in previous_router.query_planner.get_hot_keys().await {
// We can ignore errors because some of the queries that were previously in the
// cache might not work with the new schema
let _ = query_planner.get(query, operation, options).await;
}
}

Self {
naive_introspection: NaiveIntrospection::from_schema(&schema),
naive_introspection,
query_planner,
service_registry,
query_cache: Arc::new(QueryCache::new(query_cache_limit, Arc::clone(&schema))),
schema,
}
}

pub fn get_query_planner(&self) -> Arc<dyn QueryPlanner> {
self.query_planner.clone()
}
}

#[async_trait::async_trait]
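Note: the plan and query cache limits are no longer constructor or CLI parameters; `ApolloRouter::new` now reads them from `ROUTER_PLAN_CACHE_LIMIT` and `ROUTER_QUERY_CACHE_LIMIT`, defaulting to 100 when a variable is unset or unparsable. The lookup pattern, extracted as a sketch:

```rust
// Read a cache limit from the environment, falling back to 100 if the
// variable is missing or does not parse as a usize.
let plan_cache_limit: usize = std::env::var("ROUTER_PLAN_CACHE_LIMIT")
    .ok()
    .and_then(|value| value.parse().ok())
    .unwrap_or(100);
```

Operators who previously passed `--plan-cache-limit` / `--query-cache-limit` would now export these environment variables before starting the router.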
10 changes: 1 addition & 9 deletions apollo-router/src/lib.rs
@@ -429,14 +429,6 @@ pub struct FederatedServer {
/// The Configuration that the server will use. This can be static or a stream for hot reloading.
configuration: ConfigurationKind,

/// Limit query cache entries.
#[builder(default = 100)]
plan_cache_limit: usize,

/// Limit query cache entries.
#[builder(default = 100)]
query_cache_limit: usize,

/// The Schema that the server will use. This can be static or a stream for hot reloading.
schema: SchemaKind,

@@ -566,7 +558,7 @@ impl FederatedServer {
let state_machine = StateMachine::new(
server_factory,
Some(state_listener),
ApolloRouterFactory::new(self.plan_cache_limit, self.query_cache_limit),
ApolloRouterFactory::default(),
);
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let result = spawn(async {
10 changes: 0 additions & 10 deletions apollo-router/src/main.rs
@@ -31,14 +31,6 @@ struct Opt {
/// Schema location relative to the project directory.
#[structopt(short, long = "supergraph", parse(from_os_str), env)]
supergraph_path: Option<PathBuf>,

/// Query Plan cache size (number of entries).
#[structopt(long, default_value = "100")]
plan_cache_limit: usize,

/// Query parser cache size (number of entries).
#[structopt(long, default_value = "100")]
query_cache_limit: usize,
}

/// Wrapper so that structop can display the default config path in the help message.
@@ -163,8 +155,6 @@ async fn main() -> Result<()> {
let server = FederatedServer::builder()
.configuration(configuration)
.schema(schema)
.plan_cache_limit(opt.plan_cache_limit)
.query_cache_limit(opt.query_cache_limit)
.shutdown(ShutdownKind::CtrlC)
.build();
let mut server_handle = server.serve();
96 changes: 11 additions & 85 deletions apollo-router/src/router_factory.rs
@@ -1,113 +1,39 @@
use crate::apollo_router::{ApolloPreparedQuery, ApolloRouter};
use crate::configuration::Configuration;
use crate::http_service_registry::HttpServiceRegistry;
use apollo_router_core::prelude::{graphql::*, *};
use futures::prelude::*;
use apollo_router_core::prelude::*;
use std::sync::Arc;

/// Factory for creating graphs.
///
/// This trait enables us to test that `StateMachine` correctly recreates the ApolloRouter when
/// necessary, e.g. when the schema changes.
//#[cfg_attr(test, automock)]
#[async_trait::async_trait]
pub(crate) trait RouterFactory<Router, PreparedQuery>
where
Router: graphql::Router<PreparedQuery>,
PreparedQuery: graphql::PreparedQuery,
{
fn create(
async fn create(
&self,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, Router>;
fn recreate(
&self,
router: Arc<Router>,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, Router>;
fn get_plan_cache_limit(&self) -> usize;
fn get_query_cache_limit(&self) -> usize;
previous_router: Option<Arc<Router>>,
) -> Router;
}

#[derive(Default)]
pub(crate) struct ApolloRouterFactory {
plan_cache_limit: usize,
query_cache_limit: usize,
}
impl ApolloRouterFactory {
pub fn new(plan_cache_limit: usize, query_cache_limit: usize) -> Self {
Self {
plan_cache_limit,
query_cache_limit,
}
}
}
pub(crate) struct ApolloRouterFactory {}

#[async_trait::async_trait]
impl RouterFactory<ApolloRouter, ApolloPreparedQuery> for ApolloRouterFactory {
fn create(
async fn create(
&self,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, ApolloRouter> {
previous_router: Option<Arc<ApolloRouter>>,
) -> ApolloRouter {
let service_registry = HttpServiceRegistry::new(configuration);
tokio::task::spawn_blocking(move || {
ApolloRouter::new(
Arc::new(
graphql::RouterBridgeQueryPlanner::new(Arc::clone(&schema))
.with_caching(plan_cache_limit),
),
Arc::new(service_registry),
schema,
query_cache_limit,
)
})
.map(|res| res.expect("ApolloRouter::new() is infallible; qed"))
.boxed()
}

fn recreate(
&self,
router: Arc<ApolloRouter>,
configuration: &Configuration,
schema: Arc<graphql::Schema>,
plan_cache_limit: usize,
query_cache_limit: usize,
) -> future::BoxFuture<'static, ApolloRouter> {
let factory = self.create(configuration, schema, plan_cache_limit, query_cache_limit);

Box::pin(async move {
// Use the "hot" entries in the supplied router to pre-populate
// our new router
let new_router = factory.await;
let hot_keys = router.get_query_planner().get_hot_keys().await;
// It would be nice to get these keys concurrently by spawning
// futures in our loop. However, these calls to get call the
// v8 based query planner and running too many of these
// concurrently is a bad idea. One for the future...
for key in hot_keys {
// We can ignore errors, since we are just warming up the
// cache
let _ = new_router
.get_query_planner()
.get(key.0, key.1, key.2)
.await;
}
new_router
})
}

fn get_plan_cache_limit(&self) -> usize {
self.plan_cache_limit
}

fn get_query_cache_limit(&self) -> usize {
self.query_cache_limit
ApolloRouter::new(Arc::new(service_registry), schema, previous_router).await
}
}
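Note: `RouterFactory` collapses `create`/`recreate` into a single async `create` that takes the previous router, so cache warm-up now happens inside `ApolloRouter::new` instead of the factory. A sketch of a caller under that reading, assuming `factory`, `configuration`, `schema`, and an optional `previous_router` are already in scope (names are illustrative):

```rust
// Build the new router; if an old router is supplied, its hot query-plan
// keys are replayed against the new planner before this future resolves.
let new_router = Arc::new(
    factory
        .create(&configuration, Arc::clone(&schema), previous_router)
        .await,
);
```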