Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: Introduce a pool of query planners #4897

Merged
merged 32 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6c6e453
wip
o0Ignition0o Apr 2, 2024
02c3173
looking good
o0Ignition0o Apr 2, 2024
e8e51df
cleanup
o0Ignition0o Apr 3, 2024
0f0e63b
wip
o0Ignition0o Apr 3, 2024
5bfa58d
expose auto in the configuration
o0Ignition0o Apr 3, 2024
23aff3b
make test pass
o0Ignition0o Apr 3, 2024
c1f6170
clean up some error handling
Apr 3, 2024
3126e54
check if channel is full in poll_ready
Apr 3, 2024
1add817
lint
o0Ignition0o Apr 4, 2024
1b7c273
Fix lock contention.
o0Ignition0o Apr 4, 2024
b43666c
lint
o0Ignition0o Apr 4, 2024
41ebc57
Merge branch 'dev' into igni/query_planner_pool
o0Ignition0o Apr 4, 2024
f2ca738
missed one
o0Ignition0o Apr 4, 2024
b89dd13
readd use that is required on linux
o0Ignition0o Apr 4, 2024
fcd78ea
Merge branch 'dev' into igni/query_planner_pool
o0Ignition0o Apr 4, 2024
17c27a2
Igni/query planner pool (#4914)
xuorig Apr 4, 2024
d0f6956
update configuration name
o0Ignition0o Apr 4, 2024
e13bfea
Igni/query planner pool (#4916)
xuorig Apr 4, 2024
4fb2829
changeset
o0Ignition0o Apr 4, 2024
273fd5b
Apply suggestions from code review
o0Ignition0o Apr 5, 2024
b8854fb
update the planner example with usage instructions.
o0Ignition0o Apr 5, 2024
a62e8f4
Merge branch 'dev' into igni/query_planner_pool
o0Ignition0o Apr 5, 2024
1f7e478
address review comments
o0Ignition0o Apr 8, 2024
faf75c8
lint
o0Ignition0o Apr 8, 2024
68d6374
turn expect into an error
o0Ignition0o Apr 8, 2024
a5209a8
simplify configuration deserialization
o0Ignition0o Apr 8, 2024
8cca46c
update snapshot
o0Ignition0o Apr 8, 2024
c8b7dc5
Merge branch 'dev' into igni/query_planner_pool
o0Ignition0o Apr 8, 2024
41c8c35
experimental_available_parallelism -> experimental_parallelism
o0Ignition0o Apr 8, 2024
d492701
add configuration metric
o0Ignition0o Apr 9, 2024
dc7891d
add fixed metric for parallelism if set to auto
o0Ignition0o Apr 9, 2024
903843b
fix remaining tests
o0Ignition0o Apr 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .changesets/exp_carton_ginger_magnet_beacon.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
### Experimental: Introduce a pool of query planners ([PR #4897](https://github.com/apollographql/router/pull/4897))

This changeset introduces an experimental pool of query planners to parallelize query planning.
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved

This feature is experimental, you can discuss it by following this link: https://github.com/apollographql/router/discussions/4917


Configuration:
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved

```yaml
supergraph:
query_planner:
experimental_available_parallelism: auto # number of available cpus
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
```

Note you can also set `experimental_available_parallelism` to a number representing how many planners you want to use in a pool.
The default is `1`.

o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
By [@xuorig](https://github.com/xuorig) and [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/4897
7 changes: 4 additions & 3 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ dependencies = [
"petgraph",
"salsa",
"serde_json",
"strum 0.26.1",
"strum 0.26.2",
"strum_macros 0.26.1",
"thiserror",
"url",
Expand All @@ -258,6 +258,7 @@ dependencies = [
"apollo-federation",
"arc-swap",
"askama",
"async-channel 1.9.0",
"async-compression",
"async-trait",
"aws-config",
Expand Down Expand Up @@ -6536,9 +6537,9 @@ dependencies = [

[[package]]
name = "strum"
version = "0.26.1"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f"
checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29"

[[package]]
name = "strum_macros"
Expand Down
9 changes: 9 additions & 0 deletions apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ anyhow = "1.0.80"
apollo-compiler = "=1.0.0-beta.14"
apollo-federation = "=0.0.9"
arc-swap = "1.6.0"
async-channel = "1.9.0"
bnjjj marked this conversation as resolved.
Show resolved Hide resolved
async-compression = { version = "0.4.6", features = [
"tokio",
"brotli",
Expand Down Expand Up @@ -350,3 +351,11 @@ harness = false
[[bench]]
name = "deeply_nested"
harness = false

[[bench]]
name = "planner"
harness = false

[[example]]
name = "planner"
path = "benches/planner.rs"
Empty file.
63 changes: 63 additions & 0 deletions apollo-router/benches/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::ops::ControlFlow;

use anyhow::Result;
use apollo_router::layers::ServiceBuilderExt;
use apollo_router::plugin::Plugin;
use apollo_router::plugin::PluginInit;
use apollo_router::register_plugin;
use apollo_router::services::execution;
use apollo_router::services::supergraph;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;

#[derive(Debug)]
struct DoNotExecute {
#[allow(dead_code)]
configuration: bool,
}

#[async_trait::async_trait]
impl Plugin for DoNotExecute {
type Config = bool;

async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
Ok(Self {
configuration: init.config,
})
}

fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
ServiceBuilder::new()
.map_request(|mut req: supergraph::Request| {
let body = req.supergraph_request.body_mut();
body.query = body.query.as_ref().map(|query| {
let query_name = format!("query Query{} ", rand::random::<usize>());
query.replacen("query ", query_name.as_str(), 1)
});
req
})
.service(service)
.boxed()
}

fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
ServiceBuilder::new()
.checkpoint(|req: execution::Request| {
Ok(ControlFlow::Break(
execution::Response::fake_builder()
.context(req.context)
.build()
.unwrap(),
))
})
.service(service)
.boxed()
}
}

register_plugin!("apollo-test", "do_not_execute", DoNotExecute);

fn main() -> Result<()> {
apollo_router::main()
}
13 changes: 13 additions & 0 deletions apollo-router/benches/router.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
supergraph:
listen: 0.0.0.0:4100
introspection: true
query_planner:
experimental_available_parallelism: auto
plugins:
experimental.expose_query_plan: true
apollo-test.do_not_execute: true
experimental_graphql_validation_mode: both
sandbox:
enabled: true
homepage:
enabled: false
13 changes: 9 additions & 4 deletions apollo-router/src/axum_factory/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
Expand Down Expand Up @@ -64,7 +65,7 @@ use crate::http_server_factory::HttpServerFactory;
use crate::http_server_factory::HttpServerHandle;
use crate::json_ext::Path;
use crate::plugin::test::MockSubgraph;
use crate::query_planner::BridgeQueryPlanner;
use crate::query_planner::BridgeQueryPlannerPool;
use crate::router_factory::create_plugins;
use crate::router_factory::Endpoint;
use crate::router_factory::RouterFactory;
Expand Down Expand Up @@ -2303,9 +2304,13 @@ async fn test_supergraph_timeout() {
let conf: Arc<Configuration> = Arc::new(serde_json::from_value(config).unwrap());

let schema = include_str!("..//testdata/minimal_supergraph.graphql");
let planner = BridgeQueryPlanner::new(schema.to_string(), conf.clone())
.await
.unwrap();
let planner = BridgeQueryPlannerPool::new(
schema.to_string(),
conf.clone(),
NonZeroUsize::new(1).unwrap(),
)
.await
.unwrap();
let schema = planner.schema();

// we do the entire supergraph rebuilding instead of using `from_supergraph_mock_callback_and_configuration`
Expand Down
96 changes: 96 additions & 0 deletions apollo-router/src/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ pub struct Configuration {
/// Batching configuration.
#[serde(default)]
pub(crate) experimental_batching: Batching,

/// Configuration for the query planner
#[serde(default)]
pub(crate) query_planner: QueryPlanner,
}

impl PartialEq for Configuration {
Expand Down Expand Up @@ -256,6 +260,7 @@ impl<'de> serde::Deserialize<'de> for Configuration {
experimental_chaos: Chaos,
experimental_graphql_validation_mode: GraphQLValidationMode,
experimental_batching: Batching,
query_planner: QueryPlanner,
}
let ad_hoc: AdHocConfiguration = serde::Deserialize::deserialize(deserializer)?;

Expand All @@ -274,6 +279,7 @@ impl<'de> serde::Deserialize<'de> for Configuration {
.chaos(ad_hoc.experimental_chaos)
.uplink(ad_hoc.uplink)
.graphql_validation_mode(ad_hoc.experimental_graphql_validation_mode)
.query_planner(ad_hoc.query_planner)
.experimental_batching(ad_hoc.experimental_batching)
.build()
.map_err(|e| serde::de::Error::custom(e.to_string()))
Expand Down Expand Up @@ -313,6 +319,7 @@ impl Configuration {
graphql_validation_mode: Option<GraphQLValidationMode>,
experimental_api_schema_generation_mode: Option<ApiSchemaMode>,
experimental_batching: Option<Batching>,
query_planner: Option<QueryPlanner>,
) -> Result<Self, ConfigurationError> {
#[cfg(not(test))]
let notify_queue_cap = match apollo_plugins.get(APOLLO_SUBSCRIPTION_PLUGIN_NAME) {
Expand Down Expand Up @@ -349,6 +356,7 @@ impl Configuration {
tls: tls.unwrap_or_default(),
uplink,
experimental_batching: experimental_batching.unwrap_or_default(),
query_planner: query_planner.unwrap_or_default(),
#[cfg(test)]
notify: notify.unwrap_or_default(),
#[cfg(not(test))]
Expand Down Expand Up @@ -389,6 +397,7 @@ impl Configuration {
graphql_validation_mode: Option<GraphQLValidationMode>,
experimental_batching: Option<Batching>,
experimental_api_schema_generation_mode: Option<ApiSchemaMode>,
query_planner: Option<QueryPlanner>,
) -> Result<Self, ConfigurationError> {
let configuration = Self {
validated_yaml: Default::default(),
Expand All @@ -413,6 +422,7 @@ impl Configuration {
apq: apq.unwrap_or_default(),
persisted_queries: persisted_query.unwrap_or_default(),
uplink,
query_planner: query_planner.unwrap_or_default(),
experimental_batching: experimental_batching.unwrap_or_default(),
};

Expand Down Expand Up @@ -625,6 +635,88 @@ pub(crate) struct Supergraph {
/// Log a message if the client closes the connection before the response is sent.
/// Default: false.
pub(crate) experimental_log_on_broken_pipe: bool,

/// Configuration options pertaining to the query planner component.
pub(crate) query_planner: QueryPlanner,
}

/// Configuration options pertaining to the query planner component.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub(crate) struct QueryPlanner {
/// Set the size of a pool of workers to enable query planning parallelism.
/// Default: 1.
pub(crate) experimental_available_parallelism: AvailableParallelism,
}

#[derive(Debug, Clone, JsonSchema)]
#[serde(untagged)]
pub(crate) enum AvailableParallelism {
o0Ignition0o marked this conversation as resolved.
Show resolved Hide resolved
Fixed(NonZeroUsize),
Automatic(String),
}

impl Serialize for AvailableParallelism {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match self {
Self::Fixed(n) => {
let as_u64 = u64::try_from(n.get()).map_err(|_| {
serde::ser::Error::custom(format!("{} is out of range", n).as_str())
})?;
serializer.serialize_u64(as_u64)
}
Self::Automatic(_) => serializer.serialize_str("auto"),
}
}
}

impl<'de> Deserialize<'de> for AvailableParallelism {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
#[serde(deny_unknown_fields)]
enum AdHoc {
Fixed(NonZeroUsize),
Automatic(String),
}

let ad_hoc: AdHoc = serde::Deserialize::deserialize(deserializer)?;

match ad_hoc {
AdHoc::Fixed(n) => Ok(Self::Fixed(n)),
AdHoc::Automatic(s) => {
if s.as_str() == "auto" {
Ok(Self::Automatic(s))
} else {
Err(serde::de::Error::custom(format!(
"expected a non zero number, or 'auto', found {}",
s
)))
}
}
}
}
}

impl Default for AvailableParallelism {
fn default() -> Self {
Self::Fixed(NonZeroUsize::new(1).expect("cannot fail"))
}
}

impl QueryPlanner {
pub(crate) fn experimental_query_planner_parallelism(&self) -> io::Result<NonZeroUsize> {
match self.experimental_available_parallelism {
AvailableParallelism::Automatic(_) => std::thread::available_parallelism(),
AvailableParallelism::Fixed(n) => Ok(n),
}
}
}

fn default_defer_support() -> bool {
Expand All @@ -643,6 +735,7 @@ impl Supergraph {
reuse_query_fragments: Option<bool>,
early_cancel: Option<bool>,
experimental_log_on_broken_pipe: Option<bool>,
query_planner: Option<QueryPlanner>,
) -> Self {
Self {
listen: listen.unwrap_or_else(default_graphql_listen),
Expand All @@ -653,6 +746,7 @@ impl Supergraph {
reuse_query_fragments,
early_cancel: early_cancel.unwrap_or_default(),
experimental_log_on_broken_pipe: experimental_log_on_broken_pipe.unwrap_or_default(),
query_planner: query_planner.unwrap_or_default(),
}
}
}
Expand All @@ -670,6 +764,7 @@ impl Supergraph {
reuse_query_fragments: Option<bool>,
early_cancel: Option<bool>,
experimental_log_on_broken_pipe: Option<bool>,
query_planner: Option<QueryPlanner>,
) -> Self {
Self {
listen: listen.unwrap_or_else(test_listen),
Expand All @@ -680,6 +775,7 @@ impl Supergraph {
reuse_query_fragments,
early_cancel: early_cancel.unwrap_or_default(),
experimental_log_on_broken_pipe: experimental_log_on_broken_pipe.unwrap_or_default(),
query_planner: query_planner.unwrap_or_default(),
}
}
}
Expand Down
Loading
Loading