From ad55cdfe8b6c49409c1a80d834e36a15dbb93b49 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 25 Jan 2024 23:58:56 +0100 Subject: [PATCH] refactor(graph-gateway): add query selector extractor and check (#561) --- graph-gateway/src/client_query.rs | 91 ++++--- graph-gateway/src/client_query/auth.rs | 23 ++ graph-gateway/src/client_query/auth/common.rs | 10 + graph-gateway/src/client_query/auth/studio.rs | 13 + .../src/client_query/auth/subscriptions.rs | 13 + .../src/client_query/query_selector.rs | 236 ++++++++++++++++++ 6 files changed, 339 insertions(+), 47 deletions(-) create mode 100644 graph-gateway/src/client_query/query_selector.rs diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index 474f73e0..f1aaf98b 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -8,7 +8,7 @@ use anyhow::anyhow; use axum::extract::OriginalUri; use axum::{ body::Bytes, - extract::{Path, State}, + extract::State, http::{HeaderMap, Response, StatusCode}, Extension, }; @@ -20,7 +20,7 @@ use prost::bytes::Buf; use rand::{rngs::SmallRng, SeedableRng as _}; use serde::Deserialize; use serde_json::value::RawValue; -use thegraph::types::{attestation, BlockPointer, DeploymentId, SubgraphId}; +use thegraph::types::{attestation, BlockPointer, DeploymentId}; use tokio::sync::mpsc; use toolshed::buffer_queue::QueueWriter; use tracing::Instrument; @@ -44,6 +44,7 @@ use indexer_selection::{ }; use crate::block_constraints::{block_constraints, make_query_deterministic}; +use crate::client_query::query_selector::QuerySelector; use crate::indexer_client::{check_block_error, IndexerClient, ResponsePayload}; use crate::reports::{self, serialize_attestation, KafkaClient}; use crate::topology::{Deployment, GraphNetwork, Subgraph}; @@ -61,6 +62,7 @@ mod graphql; mod l2_forwarding; pub mod legacy_auth_adapter; pub mod query_id; +mod query_selector; pub mod query_tracing; pub mod require_auth; @@ -74,28 +76,32 @@ pub async fn handle_query( State(ctx): State, Extension(auth): Extension, OriginalUri(original_uri): OriginalUri, - Path(params): Path>, + selector: QuerySelector, headers: HeaderMap, payload: Bytes, ) -> Response { - let span = tracing::span::Span::current(); - let start_time = Instant::now(); let timestamp = unix_timestamp(); - let resolved_deployments = resolve_subgraph_deployments(&ctx.network, ¶ms).await; + // Check if the query selector is authorized by the auth token + match &selector { + QuerySelector::Subgraph(id) => { + if !auth.is_subgraph_authorized(id) { + return graphql::error_response(Error::Auth(anyhow!( + "Subgraph not authorized by user" + ))); + } + } + QuerySelector::Deployment(id) => { + if !auth.is_deployment_authorized(id) { + return graphql::error_response(Error::Auth(anyhow!( + "Deployment not authorized by user" + ))); + } + } + } - // This is very useful for investigating gateway logs in production - let selector = match &resolved_deployments { - Ok((_, Some(subgraph))) => subgraph.id.to_string(), - Ok((deployments, None)) => deployments - .iter() - .map(|d| d.id.to_string()) - .collect::>() - .join(","), - Err(_) => "".to_string(), - }; - span.record("selector", tracing::field::display(selector)); + let resolved_deployments = resolve_subgraph_deployments(&ctx.network, &selector).await; // We only resolve a subgraph when a subgraph ID is given as a URL param. let subgraph = resolved_deployments @@ -126,7 +132,7 @@ pub async fn handle_query( let result = match resolved_deployments { Ok((deployments, _)) => { handle_client_query_inner(&ctx, deployments, payload, auth) - .instrument(span.clone()) + .in_current_span() .await } Err(subgraph_resolution_err) => Err(subgraph_resolution_err), @@ -173,36 +179,27 @@ pub async fn handle_query( async fn resolve_subgraph_deployments( network: &GraphNetwork, - params: &BTreeMap, + selector: &QuerySelector, ) -> Result<(Vec>, Option), Error> { - if let Some(id) = params.get("subgraph_id") { - // Parse the subgraph ID - let id: SubgraphId = id - .parse() - .map_err(|_| Error::SubgraphNotFound(anyhow!("invalid subgraph ID: {id}")))?; - - // Get the subgraph by ID - let subgraph = network - .subgraph_by_id(&id) - .ok_or_else(|| Error::SubgraphNotFound(anyhow!("{id}")))?; - - // Get the subgraph's deployments (versions = deployments) - let versions = subgraph.deployments.clone(); - Ok((versions, Some(subgraph))) - } else if let Some(id) = params.get("deployment_id") { - // Parse the deployment ID - let id: DeploymentId = id - .parse() - .map_err(|_| Error::SubgraphNotFound(anyhow!("invalid deployment ID: {id}")))?; - - // Get the deployment by ID, no subgraph - let deployment = network - .deployment_by_id(&id) - .ok_or_else(|| Error::SubgraphNotFound(anyhow!("deployment not found: {id}")))?; - - Ok((vec![deployment], None)) - } else { - Err(Error::SubgraphNotFound(anyhow!("missing identifier"))) + match selector { + QuerySelector::Subgraph(subgraph_id) => { + // Get the subgraph by ID + let subgraph = network + .subgraph_by_id(subgraph_id) + .ok_or_else(|| Error::SubgraphNotFound(anyhow!("{subgraph_id}")))?; + + // Get the subgraph's deployments (versions = deployments) + let versions = subgraph.deployments.clone(); + Ok((versions, Some(subgraph))) + } + QuerySelector::Deployment(deployment_id) => { + // Get the deployment by ID, no subgraph + let deployment = network.deployment_by_id(deployment_id).ok_or_else(|| { + Error::SubgraphNotFound(anyhow!("deployment not found: {deployment_id}")) + })?; + + Ok((vec![deployment], None)) + } } } diff --git a/graph-gateway/src/client_query/auth.rs b/graph-gateway/src/client_query/auth.rs index 0f9fa1c6..b9ea5682 100644 --- a/graph-gateway/src/client_query/auth.rs +++ b/graph-gateway/src/client_query/auth.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use thegraph::subscriptions::auth::AuthTokenClaims; +use thegraph::types::{DeploymentId, SubgraphId}; use crate::subgraph_studio::APIKey; @@ -20,6 +21,28 @@ pub enum AuthToken { } impl AuthToken { + /// Check if the given subgraph is authorized for this auth token. + pub fn is_subgraph_authorized(&self, subgraph: &SubgraphId) -> bool { + match self { + AuthToken::StudioApiKey(api_key) => studio::is_subgraph_authorized(api_key, subgraph), + AuthToken::SubscriptionsAuthToken(claims) => { + subscriptions::is_subgraph_authorized(claims, subgraph) + } + } + } + + /// Check if the given deployment is authorized for this auth token. + pub fn is_deployment_authorized(&self, deployment: &DeploymentId) -> bool { + match self { + AuthToken::StudioApiKey(api_key) => { + studio::is_deployment_authorized(api_key, deployment) + } + AuthToken::SubscriptionsAuthToken(claims) => { + subscriptions::is_deployment_authorized(claims, deployment) + } + } + } + /// Check if the given origin domain is authorized for this auth token. pub fn is_domain_authorized(&self, domain: &str) -> bool { match self { diff --git a/graph-gateway/src/client_query/auth/common.rs b/graph-gateway/src/client_query/auth/common.rs index 37c3173f..466b6f0b 100644 --- a/graph-gateway/src/client_query/auth/common.rs +++ b/graph-gateway/src/client_query/auth/common.rs @@ -17,6 +17,11 @@ pub fn are_deployments_authorized( .any(|deployment| authorized.contains(&deployment.id)) } +/// Check if the given deployment is authorized by the given authorized deployments. +pub fn is_deployment_authorized(authorized: &[DeploymentId], deployment: &DeploymentId) -> bool { + authorized.is_empty() || authorized.contains(deployment) +} + /// Check if any of the given deployments are authorized by the given authorized subgraphs. /// /// If the authorized subgraphs set is empty, all deployments are considered authorized. @@ -33,6 +38,11 @@ pub fn are_subgraphs_authorized( }) } +/// Check if the given subgraph is authorized by the given authorized subgraphs. +pub fn is_subgraph_authorized(authorized: &[SubgraphId], subgraph: &SubgraphId) -> bool { + authorized.is_empty() || authorized.contains(subgraph) +} + /// Check if the query origin domain is authorized. /// /// If the authorized domain starts with a `*`, it is considered a wildcard diff --git a/graph-gateway/src/client_query/auth/studio.rs b/graph-gateway/src/client_query/auth/studio.rs index 559d5d53..f6d4a25d 100644 --- a/graph-gateway/src/client_query/auth/studio.rs +++ b/graph-gateway/src/client_query/auth/studio.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::bail; use eventuals::{Eventual, Ptr}; +use thegraph::types::{DeploymentId, SubgraphId}; use crate::subgraph_studio::{APIKey, QueryStatus}; use crate::topology::Deployment; @@ -87,6 +88,18 @@ pub fn parse_bearer_token(auth: &AuthContext, token: &str) -> anyhow::Result, deployment: &DeploymentId) -> bool { + let allowed_deployments = &api_key.deployments; + common::is_deployment_authorized(allowed_deployments, deployment) +} + +/// Check if the given subgraph is authorized by the given API key. +pub fn is_subgraph_authorized(api_key: &Arc, subgraph: &SubgraphId) -> bool { + let allowed_subgraphs = &api_key.subgraphs; + common::is_subgraph_authorized(allowed_subgraphs, subgraph) +} + /// Check if the given domain is authorized by the given API key. pub fn is_domain_authorized(api_key: &Arc, domain: &str) -> bool { let allowed_domains = &api_key diff --git a/graph-gateway/src/client_query/auth/subscriptions.rs b/graph-gateway/src/client_query/auth/subscriptions.rs index 12658bc7..f76e26e1 100644 --- a/graph-gateway/src/client_query/auth/subscriptions.rs +++ b/graph-gateway/src/client_query/auth/subscriptions.rs @@ -5,6 +5,7 @@ use std::sync::{atomic, Arc}; use alloy_primitives::Address; use eventuals::{Eventual, Ptr}; use thegraph::subscriptions::auth::{parse_auth_token, verify_auth_token_claims, AuthTokenClaims}; +use thegraph::types::{DeploymentId, SubgraphId}; use tokio::sync::RwLock; use crate::subscriptions::Subscription; @@ -66,6 +67,18 @@ pub fn parse_bearer_token(_auth: &AuthContext, token: &str) -> anyhow::Result bool { + let allowed_deployments = &auth_token.allowed_deployments; + common::is_deployment_authorized(allowed_deployments, deployment) +} + +/// Check if the given subgraph is authorized by the given API key. +pub fn is_subgraph_authorized(auth_token: &AuthTokenClaims, subgraph: &SubgraphId) -> bool { + let allowed_subgraphs = &auth_token.allowed_subgraphs; + common::is_subgraph_authorized(allowed_subgraphs, subgraph) +} + /// Check if the given domain is authorized by the auth token claims. pub fn is_domain_authorized(auth_token: &AuthTokenClaims, domain: &str) -> bool { // Get domain allowlist diff --git a/graph-gateway/src/client_query/query_selector.rs b/graph-gateway/src/client_query/query_selector.rs new file mode 100644 index 00000000..02e0e104 --- /dev/null +++ b/graph-gateway/src/client_query/query_selector.rs @@ -0,0 +1,236 @@ +use std::collections::HashMap; + +use anyhow::anyhow; +use axum::async_trait; +use axum::extract::{FromRequestParts, Path}; +use axum::http::request::Parts; +use axum::response::IntoResponse; +use thegraph::types::{DeploymentId, SubgraphId}; + +use gateway_framework::errors::Error; + +use super::graphql; + +/// Rejection type for the query selector extractor, [`QuerySelector`]. +/// +/// This is a thin wrapper around [`Error`] and implements [`IntoResponse`] to return a GraphQL +/// error response. +#[derive(Debug)] +pub struct QuerySelectorRejection(Error); + +impl From for QuerySelectorRejection { + fn from(value: Error) -> Self { + Self(value) + } +} + +impl IntoResponse for QuerySelectorRejection { + fn into_response(self) -> axum::response::Response { + graphql::error_response(self.0).into_response() + } +} + +/// Extractor for the GraphQL query selector, i.e. a `DeploymentId` or `SubgraphId`. +/// +/// If the path parameter parsing fails, a GraphQL error response is returned indicating that +/// the provided ID is invalid. +#[derive(Debug, Clone)] +pub enum QuerySelector { + /// The query selector is a [`DeploymentId`]. + Deployment(DeploymentId), + /// The query selector is a [`SubgraphId`]. + Subgraph(SubgraphId), +} + +impl std::fmt::Display for QuerySelector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + QuerySelector::Deployment(id) => write!(f, "{}", id), + QuerySelector::Subgraph(id) => write!(f, "{}", id), + } + } +} + +#[async_trait] +impl FromRequestParts for QuerySelector +where + S: Send + Sync, +{ + type Rejection = QuerySelectorRejection; + + async fn from_request_parts(parts: &mut Parts, state: &S) -> Result { + // Get the path parameters + let Path(params) = Path::>::from_request_parts(parts, state) + .await + .map_err(|_| Error::SubgraphNotFound(anyhow!("invalid id")))?; + + // Get the query selector from the path parameters and parse it + let selector = if let Some(param) = params.get("subgraph_id") { + // Parse the Subgraph ID + let subgraph_id = param + .parse() + .map_err(|_| Error::SubgraphNotFound(anyhow!("invalid subgraph ID: {param}")))?; + Self::Subgraph(subgraph_id) + } else if let Some(param) = params.get("deployment_id") { + // Parse the Deployment ID + let deployment_id = param + .parse() + .map_err(|_| Error::SubgraphNotFound(anyhow!("invalid deployment ID: {param}")))?; + Self::Deployment(deployment_id) + } else { + return Err(Error::SubgraphNotFound(anyhow!("missing identifier")).into()); + }; + + // Set the span selector attribute + tracing::span::Span::current().record("selector", tracing::field::display(&selector)); + + Ok(selector) + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use axum::body::{Body, BoxBody}; + use axum::http::{Method, Request}; + use axum::Router; + use thegraph::types::{DeploymentId, SubgraphId}; + use tower::ServiceExt; + + use super::QuerySelector; + + /// Create a test router. + fn test_router() -> Router { + async fn handle_query(selector: QuerySelector) -> String { + format!("{}", selector) + } + + Router::new() + .route( + "/deployments/id/:deployment_id", + axum::routing::post(handle_query), + ) + .route( + "/subgraphs/id/:subgraph_id", + axum::routing::post(handle_query), + ) + } + + /// Test utility function to create a valid `DeploymentId` with an arbitrary deployment id/ipfs hash. + fn test_deployment_id(deployment: &str) -> DeploymentId { + deployment.parse().expect("invalid deployment id/ipfs hash") + } + + /// Test utility function to create a valid `SubgraphId` with an arbitrary address. + fn test_subgraph_id(address: &str) -> SubgraphId { + address.parse().expect("invalid subgraph id") + } + + /// Deserialize a GraphQL response body. + async fn deserialize_graphql_response_body( + body: &mut BoxBody, + ) -> serde_json::Result> + where + for<'de> T: serde::Deserialize<'de>, + { + let body = hyper::body::to_bytes(body).await.expect("valid body"); + serde_json::from_slice(body.as_ref()) + } + + /// Parse text response body. + async fn parse_text_response_body(body: &mut BoxBody) -> anyhow::Result { + let body = hyper::body::to_bytes(body).await.expect("valid body"); + let text = String::from_utf8(body.to_vec())?; + Ok(text) + } + + #[tokio::test] + async fn valid_deployment_id() { + //* Given + let app = test_router(); + + let deployment_id = test_deployment_id("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH"); + + let req = Request::builder() + .method(Method::POST) + .uri(format!("/deployments/id/{deployment_id}")) + .body(Body::empty()) + .unwrap(); + + //* When + let mut res = app.oneshot(req).await.expect("valid request"); + + //* Then + assert_matches!(parse_text_response_body(res.body_mut()).await, Ok(res_body) => { + assert_eq!(res_body, deployment_id.to_string()); + }); + } + + #[tokio::test] + async fn invalid_deployment_id() { + //* Given + let app = test_router(); + + let deployment_id = "test-invalid-deployment-id"; + + let req = Request::builder() + .method(Method::POST) + .uri(format!("/deployments/id/{deployment_id}")) + .body(Body::empty()) + .unwrap(); + + //* When + let mut res = app.oneshot(req).await.expect("valid request"); + + //* Then + assert_matches!(deserialize_graphql_response_body::<()>(res.body_mut()).await, Ok(res_body) => { + assert_eq!(res_body.errors.len(), 1); + assert_eq!(res_body.errors[0].message, r#"subgraph not found: invalid deployment ID: test-invalid-deployment-id"#); + }); + } + + #[tokio::test] + async fn valid_subgraph_id() { + //* Given + let app = test_router(); + + let subgraph_id = test_subgraph_id("184ba627DB853244c9f17f3Cb4378cB8B39bf147"); + + let req = Request::builder() + .method(Method::POST) + .uri(format!("/subgraphs/id/{subgraph_id}")) + .body(Body::empty()) + .unwrap(); + + //* When + let mut res = app.oneshot(req).await.expect("valid request"); + + //* Then + assert_matches!(parse_text_response_body(res.body_mut()).await, Ok(res_body) => { + assert_eq!(res_body, subgraph_id.to_string()); + }); + } + + #[tokio::test] + async fn invalid_subgraph_id() { + //* Given + let app = test_router(); + + let subgraph_id = "test-invalid-subgraph-id"; + + let req = Request::builder() + .method(Method::POST) + .uri(format!("/subgraphs/id/{subgraph_id}")) + .body(Body::empty()) + .unwrap(); + + //* When + let mut res = app.oneshot(req).await.expect("valid request"); + + //* Then + assert_matches!(deserialize_graphql_response_body::<()>(res.body_mut()).await, Ok(res_body) => { + assert_eq!(res_body.errors.len(), 1); + assert_eq!(res_body.errors[0].message, "subgraph not found: invalid subgraph ID: test-invalid-subgraph-id"); + }); + } +}