From 032c448ef12c37832b481625e8597fb20bbef799 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=8D=E9=87=91=E6=97=A5?= Date: Fri, 16 Jun 2023 18:11:16 +0800 Subject: [PATCH] refactor: avoid grpc forwarding twice (#991) ## Rationale Close #984 ## Detailed Changes Add a parameter to the headers of grpc to mark that it has been forwarded. ## Test Plan Existing tests --- proxy/src/forward.rs | 29 ++++++++++++++-- proxy/src/grpc/sql_query.rs | 8 ++++- proxy/src/http/prom.rs | 1 + proxy/src/http/sql.rs | 1 + proxy/src/influxdb/mod.rs | 1 + proxy/src/lib.rs | 4 +++ proxy/src/read.rs | 7 +++- proxy/src/write.rs | 16 ++++++--- server/src/grpc/storage_service/mod.rs | 48 ++++++++++++++++++++------ 9 files changed, 96 insertions(+), 19 deletions(-) diff --git a/proxy/src/forward.rs b/proxy/src/forward.rs index 9603dceee9..e1765dbfda 100644 --- a/proxy/src/forward.rs +++ b/proxy/src/forward.rs @@ -21,6 +21,8 @@ use tonic::{ transport::{self, Channel}, }; +use crate::FORWARDED_FROM; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display( @@ -68,6 +70,9 @@ pub enum Error { source: tonic::transport::Error, backtrace: Backtrace, }, + + #[snafu(display("Request should not be forwarded twice, forward from:{}", endpoint))] + ForwardedErr { endpoint: String }, } define_result!(Error); @@ -184,6 +189,7 @@ pub struct ForwardRequest { pub schema: String, pub table: String, pub req: tonic::Request, + pub forwarded_from: Option, } impl Forwarder { @@ -256,7 +262,12 @@ impl Forwarder { F: ForwarderRpc, Req: std::fmt::Debug + Clone, { - let ForwardRequest { schema, table, req } = forward_req; + let ForwardRequest { + schema, + table, + req, + forwarded_from, + } = forward_req; let route_req = RouteRequest { context: Some(RequestContext { database: schema }), @@ -281,13 +292,15 @@ impl Forwarder { } }; - self.forward_with_endpoint(endpoint, req, do_rpc).await + self.forward_with_endpoint(endpoint, req, forwarded_from, do_rpc) + .await } pub async fn forward_with_endpoint( &self, endpoint: Endpoint, mut req: tonic::Request, + forwarded_from: Option, do_rpc: F, ) -> Result> where @@ -310,6 +323,17 @@ impl Forwarder { "Try to forward request to {:?}, request:{:?}", endpoint, req, ); + + if let Some(endpoint) = forwarded_from { + return ForwardedErr { endpoint }.fail(); + } + + // mark forwarded + req.metadata_mut().insert( + FORWARDED_FROM, + self.local_endpoint.to_string().parse().unwrap(), + ); + let client = self.get_or_create_client(&endpoint).await?; match do_rpc(client, req, &endpoint).await { Err(e) => { @@ -461,6 +485,7 @@ mod tests { schema: DEFAULT_SCHEMA.to_string(), table: table.to_string(), req: query_request.into_request(), + forwarded_from: None, } }; diff --git a/proxy/src/grpc/sql_query.rs b/proxy/src/grpc/sql_query.rs index a3b5613f02..6c756be12c 100644 --- a/proxy/src/grpc/sql_query.rs +++ b/proxy/src/grpc/sql_query.rs @@ -113,7 +113,11 @@ impl Proxy { let req_context = req.context.as_ref().unwrap(); let schema = req_context.database.clone(); - let req = match self.clone().maybe_forward_stream_sql_query(&req).await { + let req = match self + .clone() + .maybe_forward_stream_sql_query(ctx.clone(), &req) + .await + { Some(resp) => match resp { ForwardResult::Forwarded(resp) => return resp, ForwardResult::Local => req, @@ -167,6 +171,7 @@ impl Proxy { async fn maybe_forward_stream_sql_query( self: Arc, + ctx: Context, req: &SqlQueryRequest, ) -> Option, Error>> { if req.tables.len() != 1 { @@ -180,6 +185,7 @@ impl Proxy { schema: req_ctx.database.clone(), table: req.tables[0].clone(), req: req.clone().into_request(), + forwarded_from: ctx.forwarded_from, }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs index d414f7e0db..dab16707ab 100644 --- a/proxy/src/http/prom.rs +++ b/proxy/src/http/prom.rs @@ -62,6 +62,7 @@ impl Proxy { runtime: self.engine_runtimes.write_runtime.clone(), timeout: ctx.timeout, enable_partition_table_access: false, + forwarded_from: None, }; let result = self.handle_write_internal(ctx, table_request).await?; diff --git a/proxy/src/http/sql.rs b/proxy/src/http/sql.rs index 127732c24e..67ec394042 100644 --- a/proxy/src/http/sql.rs +++ b/proxy/src/http/sql.rs @@ -37,6 +37,7 @@ impl Proxy { timeout: ctx.timeout, runtime: self.engine_runtimes.read_runtime.clone(), enable_partition_table_access: true, + forwarded_from: None, }; match self.handle_sql(context, &ctx.schema, &req.query).await? { diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index e028c59ff3..c4a2fee151 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -58,6 +58,7 @@ impl Proxy { timeout: ctx.timeout, runtime: self.engine_runtimes.write_runtime.clone(), enable_partition_table_access: false, + forwarded_from: None, }; let result = self .handle_write_internal(proxy_context, table_request) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8ab85fd147..c54a5acfc6 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -22,6 +22,8 @@ pub mod schema_config_provider; mod util; mod write; +pub const FORWARDED_FROM: &str = "forwarded-from"; + use std::{ sync::Arc, time::{Duration, Instant}, @@ -131,6 +133,7 @@ impl Proxy { schema: req_ctx.database.clone(), table: metric, req: req.into_request(), + forwarded_from: None, }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, @@ -452,4 +455,5 @@ pub struct Context { pub timeout: Option, pub runtime: Arc, pub enable_partition_table_access: bool, + pub forwarded_from: Option, } diff --git a/proxy/src/read.rs b/proxy/src/read.rs index 9131cf54d0..a47b9454be 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -41,7 +41,10 @@ impl Proxy { schema: &str, sql: &str, ) -> Result { - if let Some(resp) = self.maybe_forward_sql_query(schema, sql).await? { + if let Some(resp) = self + .maybe_forward_sql_query(ctx.clone(), schema, sql) + .await? + { match resp { ForwardResult::Forwarded(resp) => return Ok(SqlResponse::Forwarded(resp?)), ForwardResult::Local => (), @@ -149,6 +152,7 @@ impl Proxy { async fn maybe_forward_sql_query( &self, + ctx: Context, schema: &str, sql: &str, ) -> Result>> { @@ -174,6 +178,7 @@ impl Proxy { schema: schema.to_string(), table: table_name.unwrap(), req: sql_request.into_request(), + forwarded_from: ctx.forwarded_from, }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, diff --git a/proxy/src/write.rs b/proxy/src/write.rs index a371e3cd01..44b2ab4491 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -108,7 +108,7 @@ impl Proxy { let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1); // Write to remote. - self.collect_write_to_remote_future(&mut futures, write_requests_to_forward) + self.collect_write_to_remote_future(&mut futures, ctx.clone(), write_requests_to_forward) .await; // Write to local. @@ -139,7 +139,7 @@ impl Proxy { let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1); // Write to remote. - self.collect_write_to_remote_future(&mut futures, write_requests_to_forward) + self.collect_write_to_remote_future(&mut futures, ctx.clone(), write_requests_to_forward) .await; // Create table. @@ -358,12 +358,14 @@ impl Proxy { async fn collect_write_to_remote_future( &self, futures: &mut WriteResponseFutures<'_>, + ctx: Context, write_request: HashMap, ) { for (endpoint, table_write_request) in write_request { let forwarder = self.forwarder.clone(); + let ctx = ctx.clone(); let write_handle = self.engine_runtimes.io_runtime.spawn(async move { - Self::write_to_remote(forwarder, endpoint, table_write_request).await + Self::write_to_remote(ctx, forwarder, endpoint, table_write_request).await }); futures.push(write_handle.boxed()); @@ -408,6 +410,7 @@ impl Proxy { } async fn write_to_remote( + ctx: Context, forwarder: ForwarderRef, endpoint: Endpoint, table_write_request: WriteRequest, @@ -432,7 +435,12 @@ impl Proxy { }; let forward_result = forwarder - .forward_with_endpoint(endpoint, tonic::Request::new(table_write_request), do_write) + .forward_with_endpoint( + endpoint, + tonic::Request::new(table_write_request), + ctx.forwarded_from, + do_write, + ) .await; let forward_res = forward_result .map_err(|e| { diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 7487b4fda8..ca758146ca 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -21,7 +21,7 @@ use ceresdbproto::{ use common_util::time::InstantExt; use futures::{stream, stream::BoxStream, StreamExt}; use http::StatusCode; -use proxy::{Context, Proxy}; +use proxy::{Context, Proxy, FORWARDED_FROM}; use query_engine::executor::Executor as QueryExecutor; use table_engine::engine::EngineRuntimes; @@ -138,6 +138,10 @@ impl StorageService for StorageServiceImpl { runtime: self.runtimes.read_runtime.clone(), timeout: self.timeout, enable_partition_table_access: false, + forwarded_from: req + .metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()), }; let stream = Self::stream_sql_query_internal(ctx, proxy, req).await; @@ -155,13 +159,17 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let req = req.into_inner(); - let proxy = self.proxy.clone(); let ctx = Context { runtime: self.runtimes.read_runtime.clone(), timeout: self.timeout, enable_partition_table_access: false, + forwarded_from: req + .metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()), }; + let req = req.into_inner(); + let proxy = self.proxy.clone(); let join_handle = self .runtimes @@ -186,13 +194,17 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let req = req.into_inner(); - let proxy = self.proxy.clone(); let ctx = Context { runtime: self.runtimes.write_runtime.clone(), timeout: self.timeout, enable_partition_table_access: false, + forwarded_from: req + .metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()), }; + let req = req.into_inner(); + let proxy = self.proxy.clone(); let join_handle = self.runtimes.write_runtime.spawn(async move { if req.context.is_none() { @@ -226,13 +238,18 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let req = req.into_inner(); - let proxy = self.proxy.clone(); let ctx = Context { runtime: self.runtimes.read_runtime.clone(), timeout: self.timeout, enable_partition_table_access: false, + forwarded_from: req + .metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()), }; + let req = req.into_inner(); + let proxy = self.proxy.clone(); + let join_handle = self .runtimes .read_runtime @@ -289,13 +306,18 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let req = req.into_inner(); - let proxy = self.proxy.clone(); let ctx = Context { runtime: self.runtimes.read_runtime.clone(), timeout: self.timeout, enable_partition_table_access: false, + forwarded_from: req + .metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()), }; + let req = req.into_inner(); + let proxy = self.proxy.clone(); + let join_handle = self.runtimes.read_runtime.spawn(async move { if req.context.is_none() { return PrometheusQueryResponse { @@ -329,13 +351,17 @@ impl StorageServiceImpl { ) -> Result, tonic::Status> { let mut total_success = 0; - let mut stream = req.into_inner(); - let proxy = self.proxy.clone(); let ctx = Context { runtime: self.runtimes.write_runtime.clone(), timeout: self.timeout, enable_partition_table_access: false, + forwarded_from: req + .metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()), }; + let mut stream = req.into_inner(); + let proxy = self.proxy.clone(); let join_handle = self.runtimes.write_runtime.spawn(async move { let mut resp = WriteResponse::default();