Skip to content

Commit

Permalink
refactor(graph-gateway): add query selector extractor
Browse files Browse the repository at this point in the history
  • Loading branch information
LNSD committed Jan 25, 2024
1 parent c804b71 commit 5726311
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 85 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions gateway-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ axum.workspace = true
ethers = "2.0.11"
eventuals = "0.6.7"
gateway-common = { path = "../gateway-common" }
graphql-http = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-http-v0.2.1", version = "0.2.1" }
headers = "0.3.9"
hex.workspace = true
indexer-selection = { path = "../indexer-selection" }
itertools = "0.12.0"
Expand All @@ -34,3 +36,6 @@ tokio.workspace = true
toolshed.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true

[dev-dependencies]
assert_matches = "1.5.0"
12 changes: 11 additions & 1 deletion gateway-framework/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::collections::BTreeMap;

use indexer_selection::UnresolvedBlock;
use axum::response::{IntoResponse, Response};
use itertools::Itertools;

use indexer_selection::UnresolvedBlock;

use crate::graphql;

#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Errors that should only occur in exceptional conditions.
Expand Down Expand Up @@ -32,6 +36,12 @@ pub enum Error {
BadIndexers(IndexerErrors),
}

impl IntoResponse for Error {
fn into_response(self) -> Response {
graphql::error_response(self).into_response()
}
}

pub struct IndexerErrors(Vec<IndexerError>);

#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions gateway-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod chains;
pub mod config;
pub mod errors;
pub mod geoip;
pub mod graphql;
pub mod ipfs;
pub mod json;
pub mod metrics;
Expand Down
176 changes: 97 additions & 79 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::anyhow;
use axum::extract::OriginalUri;
use axum::{
body::Bytes,
extract::{Path, State},
extract::State,
http::{HeaderMap, Response, StatusCode},
Extension,
};
Expand All @@ -34,6 +34,7 @@ use gateway_framework::{
block_constraints::BlockConstraint,
chains::BlockCache,
errors::{Error, IndexerError, IndexerErrors, UnavailableReason::*},
graphql,
metrics::{with_metric, METRICS},
scalar::ScalarReceipt,
};
Expand All @@ -46,21 +47,22 @@ use indexer_selection::{
use crate::block_constraints::{block_constraints, make_query_deterministic};
use crate::indexer_client::{check_block_error, IndexerClient, ResponsePayload};
use crate::reports::{self, serialize_attestation, KafkaClient};
use crate::topology::{Deployment, GraphNetwork, Subgraph};
use crate::topology::Deployment;
use crate::unattestable_errors::{miscategorized_attestable, miscategorized_unattestable};

use self::attestation_header::GraphAttestation;
use self::auth::AuthToken;
use self::context::Context;
use self::l2_forwarding::forward_request_to_l2;
use self::query_selector::QuerySelector;

mod attestation_header;
pub mod auth;
pub mod context;
mod graphql;
mod l2_forwarding;
pub mod legacy_auth_adapter;
pub mod query_id;
mod query_selector;
pub mod query_tracing;
pub mod require_auth;

Expand All @@ -70,67 +72,43 @@ pub struct QueryBody {
pub variables: Option<Box<RawValue>>,
}

pub async fn handle_query(
pub async fn handle_subgraph_id_query(
State(ctx): State<Context>,
Extension(auth): Extension<AuthToken>,
OriginalUri(original_uri): OriginalUri,
Path(params): Path<BTreeMap<String, String>>,
QuerySelector(subgraph_id): QuerySelector<SubgraphId>,
headers: HeaderMap,
payload: Bytes,
) -> Response<String> {
let span = tracing::span::Span::current();

) -> Result<Response<String>, Error> {
let start_time = Instant::now();
let timestamp = unix_timestamp();

let resolved_deployments = resolve_subgraph_deployments(&ctx.network, &params).await;

// This is very useful for investigating gateway logs in production
let selector = match &resolved_deployments {
Ok((_, Some(subgraph))) => subgraph.id.to_string(),
Ok((deployments, None)) => deployments
.iter()
.map(|d| d.id.to_string())
.collect::<Vec<_>>()
.join(","),
Err(_) => "".to_string(),
};
span.record("selector", tracing::field::display(selector));

// We only resolve a subgraph when a subgraph ID is given as a URL param.
let subgraph = resolved_deployments
.as_ref()
.ok()
.and_then(|(_, s)| s.as_ref());
// Get the subgraph by ID
let subgraph = ctx
.network
.subgraph_by_id(&subgraph_id)
.ok_or_else(|| Error::SubgraphNotFound(anyhow!("{subgraph_id}")))?;

if let Some(l2_url) = ctx.l2_gateway.as_ref() {
// Forward query to L2 gateway if it's marked as transferred & there are no allocations.
// abf62a6d-c071-4507-b528-ddc8e250127a
let transferred_to_l2 = matches!(
resolved_deployments.as_ref(),
Ok((deployments, _)) if deployments.iter().all(|d| d.transferred_to_l2),
);
let transferred_to_l2 = subgraph.deployments.iter().all(|d| d.transferred_to_l2);
if transferred_to_l2 {
return forward_request_to_l2(
return Ok(forward_request_to_l2(
&ctx.indexer_client.client,
l2_url,
&original_uri,
headers,
payload,
subgraph.and_then(|s| s.l2_id),
subgraph.l2_id,
)
.await;
.await);
}
}

let result = match resolved_deployments {
Ok((deployments, _)) => {
handle_client_query_inner(&ctx, deployments, payload, auth)
.instrument(span.clone())
.await
}
Err(subgraph_resolution_err) => Err(subgraph_resolution_err),
};
let result = handle_client_query_inner(&ctx, subgraph.deployments, payload, auth)
.in_current_span()
.await;

// Metrics and tracing
{
Expand Down Expand Up @@ -160,50 +138,90 @@ pub async fn handle_query(
);
}

match result {
Ok((_, ResponsePayload { body, attestation })) => Response::builder()
result.map(|(_, ResponsePayload { body, attestation })| {
Response::builder()
.status(StatusCode::OK)
.header_typed(ContentType::json())
.header_typed(GraphAttestation(attestation))
.body(body.to_string())
.unwrap(),
Err(err) => graphql::error_response(err),
}
.unwrap()
})
}

async fn resolve_subgraph_deployments(
network: &GraphNetwork,
params: &BTreeMap<String, String>,
) -> Result<(Vec<Arc<Deployment>>, Option<Subgraph>), Error> {
if let Some(id) = params.get("subgraph_id") {
// Parse the subgraph ID
let id: SubgraphId = id
.parse()
.map_err(|_| Error::SubgraphNotFound(anyhow!("invalid subgraph ID: {id}")))?;

// Get the subgraph by ID
let subgraph = network
.subgraph_by_id(&id)
.ok_or_else(|| Error::SubgraphNotFound(anyhow!("{id}")))?;

// Get the subgraph's deployments (versions = deployments)
let versions = subgraph.deployments.clone();
Ok((versions, Some(subgraph)))
} else if let Some(id) = params.get("deployment_id") {
// Parse the deployment ID
let id: DeploymentId = id
.parse()
.map_err(|_| Error::SubgraphNotFound(anyhow!("invalid deployment ID: {id}")))?;

// Get the deployment by ID, no subgraph
let deployment = network
.deployment_by_id(&id)
.ok_or_else(|| Error::SubgraphNotFound(anyhow!("deployment not found: {id}")))?;

Ok((vec![deployment], None))
} else {
Err(Error::SubgraphNotFound(anyhow!("missing identifier")))
pub async fn handle_deployment_id_query(
State(ctx): State<Context>,
Extension(auth): Extension<AuthToken>,
OriginalUri(original_uri): OriginalUri,
QuerySelector(deployment_id): QuerySelector<DeploymentId>,
headers: HeaderMap,
payload: Bytes,
) -> Result<Response<String>, Error> {
let start_time = Instant::now();
let timestamp = unix_timestamp();

// Get the subgraph deployment by ID
let deployment = ctx
.network
.deployment_by_id(&deployment_id)
.ok_or_else(|| Error::SubgraphNotFound(anyhow!("deployment not found")))?;

if let Some(l2_url) = ctx.l2_gateway.as_ref() {
// Forward query to L2 gateway if it's marked as transferred & there are no allocations.
// abf62a6d-c071-4507-b528-ddc8e250127a
let transferred_to_l2 = deployment.transferred_to_l2;
if transferred_to_l2 {
return Ok(forward_request_to_l2(
&ctx.indexer_client.client,
l2_url,
&original_uri,
headers,
payload,
None,
)
.await);
}
}

let result = handle_client_query_inner(&ctx, vec![deployment], payload, auth)
.in_current_span()
.await;

// Metrics and tracing
{
let deployment: Option<String> = result
.as_ref()
.map(|(selection, _)| selection.indexing.deployment.to_string())
.ok();
let metric_labels = [deployment.as_deref().unwrap_or("")];

METRICS.client_query.check(&metric_labels, &result);
with_metric(&METRICS.client_query.duration, &metric_labels, |h| {
h.observe((Instant::now() - start_time).as_secs_f64())
});

let status_message = match &result {
Ok(_) => "200 OK".to_string(),
Err(err) => err.to_string(),
};
let (legacy_status_message, legacy_status_code) = reports::legacy_status(&result);
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
start_time_ms = timestamp,
deployment,
%status_message,
%legacy_status_message,
legacy_status_code,
);
}

result.map(|(_, ResponsePayload { body, attestation })| {
Response::builder()
.status(StatusCode::OK)
.header_typed(ContentType::json())
.header_typed(GraphAttestation(attestation))
.body(body.to_string())
.unwrap()
})
}

async fn handle_client_query_inner(
Expand Down
Loading

0 comments on commit 5726311

Please sign in to comment.