From 0b88d712ef17a8a2f164cc6bbb8bbf0c2628f54e Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 29 Jul 2024 18:48:29 +0800 Subject: [PATCH 1/4] feat: hint options for gRPC isnert --- src/operator/src/insert.rs | 11 +++++- src/servers/src/grpc/database.rs | 44 ++++++++++++++++++++-- src/servers/src/grpc/flight.rs | 2 +- src/servers/src/grpc/greptime_handler.rs | 23 +++++++---- src/servers/src/grpc/prom_query_gateway.rs | 2 +- 5 files changed, 69 insertions(+), 13 deletions(-) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 38e79de6c993..397c615a7c2a 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -644,9 +644,18 @@ impl Inserter { statement_executor: &StatementExecutor, create_type: AutoCreateTableType, ) -> Result { + let mut hint_options = vec![]; let options: &[(&str, &str)] = match create_type { AutoCreateTableType::Logical(_) => unreachable!(), - AutoCreateTableType::Physical => &[], + AutoCreateTableType::Physical => { + if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) { + hint_options.push((APPEND_MODE_KEY, append_mode)); + } + if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) { + hint_options.push((MERGE_MODE_KEY, merge_mode)); + } + hint_options.as_slice() + } // Set append_mode to true for log table. // because log tables should keep rows with the same ts and tags. AutoCreateTableType::Log => &[(APPEND_MODE_KEY, "true")], diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index f8c9e298d4b5..c563326ee05b 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -18,13 +18,16 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader}; use async_trait::async_trait; use common_error::status_code::StatusCode; use common_query::OutputData; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use futures::StreamExt; +use tonic::metadata::{KeyAndValueRef, MetadataMap}; use tonic::{Request, Response, Status, Streaming}; use crate::grpc::greptime_handler::GreptimeRequestHandler; use crate::grpc::{cancellation, TonicResult}; +pub const GREPTIME_DB_HEADER_HINT_PREFIX: &str = "x-greptime-hint:"; + pub(crate) struct DatabaseService { handler: GreptimeRequestHandler, } @@ -33,6 +36,31 @@ impl DatabaseService { pub(crate) fn new(handler: GreptimeRequestHandler) -> Self { Self { handler } } + + fn extract_hints(&self, metadata: &MetadataMap) -> Vec<(String, String)> { + metadata + .iter() + .filter_map(|kv| { + let KeyAndValueRef::Ascii(key, value) = kv else { + return None; + }; + let key = key.as_str(); + if !key.starts_with(GREPTIME_DB_HEADER_HINT_PREFIX) { + return None; + } + let Ok(value) = value.to_str() else { + // Simply return None for non-string values. + return None; + }; + // Safety: we already checked the prefix. + let new_key = key + .strip_prefix(GREPTIME_DB_HEADER_HINT_PREFIX) + .unwrap() + .to_string(); + Some((new_key, value.to_string())) + }) + .collect() + } } #[async_trait] @@ -42,10 +70,15 @@ impl GreptimeDatabase for DatabaseService { request: Request, ) -> TonicResult> { let remote_addr = request.remote_addr(); + let hints = self.extract_hints(request.metadata()); + debug!( + "GreptimeDatabase::Handle: request from {:?} with hints: {:?}", + remote_addr, hints + ); let handler = self.handler.clone(); let request_future = async move { let request = request.into_inner(); - let output = handler.handle_request(request).await?; + let output = handler.handle_request(request, hints).await?; let message = match output.data { OutputData::AffectedRows(rows) => GreptimeResponse { header: Some(ResponseHeader { @@ -83,6 +116,11 @@ impl GreptimeDatabase for DatabaseService { request: Request>, ) -> Result, Status> { let remote_addr = request.remote_addr(); + let hints = self.extract_hints(request.metadata()); + debug!( + "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}", + remote_addr, hints + ); let handler = self.handler.clone(); let request_future = async move { let mut affected_rows = 0; @@ -90,7 +128,7 @@ impl GreptimeDatabase for DatabaseService { let mut stream = request.into_inner(); while let Some(request) = stream.next().await { let request = request?; - let output = handler.handle_request(request).await?; + let output = handler.handle_request(request, hints.clone()).await?; match output.data { OutputData::AffectedRows(rows) => affected_rows += rows, OutputData::Stream(_) | OutputData::RecordBatches(_) => { diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index cd1d2a4bd019..76a6cc00cec1 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -167,7 +167,7 @@ impl FlightCraft for GreptimeRequestHandler { request_type = get_request_type(&request) ); async { - let output = self.handle_request(request).await?; + let output = self.handle_request(request, Default::default()).await?; let stream: Pin> + Send + Sync>> = to_flight_data_stream(output, TracingContext::from_current_span()); Ok(Response::new(stream)) diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 726e8dd3304c..550fc2683cd6 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -59,13 +59,17 @@ impl GreptimeRequestHandler { } #[tracing::instrument(skip_all, fields(protocol = "grpc", request_type = get_request_type(&request)))] - pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> Result { + pub(crate) async fn handle_request( + &self, + request: GreptimeRequest, + hints: Vec<(String, String)>, + ) -> Result { let query = request.request.context(InvalidQuerySnafu { reason: "Expecting non-empty GreptimeRequest.", })?; let header = request.header.as_ref(); - let query_ctx = create_query_context(header); + let query_ctx = create_query_context(header, hints); let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?; query_ctx.set_current_user(user_info); @@ -164,7 +168,10 @@ pub(crate) async fn auth( }) } -pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef { +pub(crate) fn create_query_context( + header: Option<&RequestHeader>, + extensions: Vec<(String, String)>, +) -> QueryContextRef { let (catalog, schema) = header .map(|header| { // We provide dbname field in newer versions of protos/sdks @@ -193,12 +200,14 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte ) }); let timezone = parse_timezone(header.map(|h| h.timezone.as_str())); - QueryContextBuilder::default() + let mut ctx_builder = QueryContextBuilder::default() .current_catalog(catalog) .current_schema(schema) - .timezone(timezone) - .build() - .into() + .timezone(timezone); + for (key, value) in extensions { + ctx_builder = ctx_builder.set_extension(key, value); + } + ctx_builder.build().into() } /// Histogram timer for handling gRPC request. diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index e2aeec897f7b..918631600a14 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -78,7 +78,7 @@ impl PrometheusGateway for PrometheusGatewayService { }; let header = inner.header.as_ref(); - let query_ctx = create_query_context(header); + let query_ctx = create_query_context(header, Default::default()); let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?; query_ctx.set_current_user(user_info); From 30fea1e26e58c016e3186d994584be3e6a36eb28 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 30 Jul 2024 09:44:02 +0800 Subject: [PATCH 2/4] chore: unit test for extract_hints --- src/servers/src/grpc/database.rs | 80 +++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index c563326ee05b..ae7660f3226b 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -36,31 +36,6 @@ impl DatabaseService { pub(crate) fn new(handler: GreptimeRequestHandler) -> Self { Self { handler } } - - fn extract_hints(&self, metadata: &MetadataMap) -> Vec<(String, String)> { - metadata - .iter() - .filter_map(|kv| { - let KeyAndValueRef::Ascii(key, value) = kv else { - return None; - }; - let key = key.as_str(); - if !key.starts_with(GREPTIME_DB_HEADER_HINT_PREFIX) { - return None; - } - let Ok(value) = value.to_str() else { - // Simply return None for non-string values. - return None; - }; - // Safety: we already checked the prefix. - let new_key = key - .strip_prefix(GREPTIME_DB_HEADER_HINT_PREFIX) - .unwrap() - .to_string(); - Some((new_key, value.to_string())) - }) - .collect() - } } #[async_trait] @@ -70,7 +45,7 @@ impl GreptimeDatabase for DatabaseService { request: Request, ) -> TonicResult> { let remote_addr = request.remote_addr(); - let hints = self.extract_hints(request.metadata()); + let hints = extract_hints(request.metadata()); debug!( "GreptimeDatabase::Handle: request from {:?} with hints: {:?}", remote_addr, hints @@ -116,7 +91,7 @@ impl GreptimeDatabase for DatabaseService { request: Request>, ) -> Result, Status> { let remote_addr = request.remote_addr(); - let hints = self.extract_hints(request.metadata()); + let hints = extract_hints(request.metadata()); debug!( "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}", remote_addr, hints @@ -167,3 +142,54 @@ impl GreptimeDatabase for DatabaseService { cancellation::with_cancellation_handler(request_future, cancellation_future).await } } + +fn extract_hints(metadata: &MetadataMap) -> Vec<(String, String)> { + metadata + .iter() + .filter_map(|kv| { + let KeyAndValueRef::Ascii(key, value) = kv else { + return None; + }; + let key = key.as_str(); + if !key.starts_with(GREPTIME_DB_HEADER_HINT_PREFIX) { + return None; + } + let Ok(value) = value.to_str() else { + // Simply return None for non-string values. + return None; + }; + // Safety: we already checked the prefix. + let new_key = key + .strip_prefix(GREPTIME_DB_HEADER_HINT_PREFIX) + .unwrap() + .to_string(); + Some((new_key, value.trim().to_string())) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use tonic::metadata::MetadataValue; + + use super::*; + + #[test] + fn test_extract_hints() { + let mut metadata = MetadataMap::new(); + metadata.insert("x-greptime-hint:append_mode", "true".parse().unwrap()); + let hints = extract_hints(&metadata); + assert_eq!(hints, vec![("append_mode".to_string(), "true".to_string())]); + } + + #[test] + fn extract_hints_ignores_non_ascii_metadata() { + let mut metadata = MetadataMap::new(); + metadata.insert_bin( + "x-greptime-hint:merge_mode", + MetadataValue::from_bytes(b"last_non_null"), + ); + let hints = extract_hints(&metadata); + assert!(hints.is_empty()); + } +} From 1e6dc035dbd936d5237fd8d934366bf8c5fdcfe2 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 30 Jul 2024 12:21:16 +0800 Subject: [PATCH 3/4] feat: add integration test for grpc hint --- src/client/src/database.rs | 35 ++++++++++++++++++- src/client/src/error.rs | 9 +++++ tests-integration/tests/grpc.rs | 61 +++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 1 deletion(-) diff --git a/src/client/src/database.rs b/src/client/src/database.rs index e310a73e584d..528c57632e3b 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -33,9 +33,12 @@ use common_telemetry::tracing_context::W3cTrace; use futures_util::StreamExt; use prost::Message; use snafu::{ensure, ResultExt}; +use tonic::metadata::AsciiMetadataKey; use tonic::transport::Channel; -use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; +use crate::error::{ + ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, InvalidAsciiSnafu, ServerSnafu, +}; use crate::{from_grpc_response, Client, Result}; #[derive(Clone, Debug, Default)] @@ -130,6 +133,36 @@ impl Database { self.handle(Request::Inserts(requests)).await } + pub async fn insert_with_hints( + &self, + requests: InsertRequests, + hints: &[(&str, &str)], + ) -> Result { + let mut client = make_database_client(&self.client)?.inner; + let request = self.to_rpc_request(Request::Inserts(requests)); + + let mut request = tonic::Request::new(request); + let metadata = request.metadata_mut(); + for (key, value) in hints { + let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint:{}", key).as_bytes()) + .map_err(|_| { + InvalidAsciiSnafu { + value: key.to_string(), + } + .build() + })?; + let value = value.parse().map_err(|_| { + InvalidAsciiSnafu { + value: value.to_string(), + } + .build() + })?; + metadata.insert(key, value); + } + let response = client.handle(request).await?.into_inner(); + from_grpc_response(response) + } + async fn handle(&self, request: Request) -> Result { let mut client = make_database_client(&self.client)?.inner; let request = self.to_rpc_request(request); diff --git a/src/client/src/error.rs b/src/client/src/error.rs index f200b1c93dd3..b5aef255d4f7 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -122,6 +122,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to parse ascii string: {}", value))] + InvalidAscii { + value: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -143,6 +150,8 @@ impl ErrorExt for Error { | Error::ConvertFlightData { source, .. } | Error::CreateTlsChannel { source, .. } => source.status_code(), Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, + + Error::InvalidAscii { .. } => StatusCode::InvalidArguments, } } diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 33332170db16..c3a979d7bfb3 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -73,6 +73,7 @@ macro_rules! grpc_tests { test_invalid_dbname, test_auto_create_table, + test_auto_create_table_with_hints, test_insert_and_select, test_dbname, test_grpc_message_size_ok, @@ -279,6 +280,17 @@ pub async fn test_auto_create_table(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_auto_create_table_with_hints(store_type: StorageType) { + let (addr, mut guard, fe_grpc_server) = + setup_grpc_server(store_type, "auto_create_table_with_hints").await; + + let grpc_client = Client::with_urls(vec![addr]); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); + insert_with_hints_and_assert(&db).await; + let _ = fe_grpc_server.shutdown().await; + guard.remove_all().await; +} + fn expect_data() -> (Column, Column, Column, Column) { // testing data: let expected_host_col = Column { @@ -379,6 +391,55 @@ pub async fn test_insert_and_select(store_type: StorageType) { guard.remove_all().await; } +async fn insert_with_hints_and_assert(db: &Database) { + // testing data: + let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data(); + + let request = InsertRequest { + table_name: "demo".to_string(), + columns: vec![ + expected_host_col.clone(), + expected_cpu_col.clone(), + expected_mem_col.clone(), + expected_ts_col.clone(), + ], + row_count: 4, + }; + let result = db + .insert_with_hints( + InsertRequests { + inserts: vec![request], + }, + &[("append_mode", "true"), ("merge_mode", "last_non_null")], + ) + .await; + assert_eq!(result.unwrap(), 4); + + // show table + let output = db.sql("SHOW CREATE TABLE demo").await.unwrap(); + + let record_batches = match output.data { + OutputData::RecordBatches(record_batches) => record_batches, + OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(), + OutputData::AffectedRows(_) => unreachable!(), + }; + + let pretty = record_batches.pretty_print().unwrap(); + let expected = "\ ++-------+------+--------+-------------------------+ +| host | cpu | memory | ts | ++-------+------+--------+-------------------------+ +| host1 | 0.31 | 0.1 | 1970-01-01T00:00:00.100 | +| host2 | | 0.2 | 1970-01-01T00:00:00.101 | +| host3 | 0.41 | | 1970-01-01T00:00:00.102 | +| host4 | 0.2 | 0.3 | 1970-01-01T00:00:00.103 | +| host5 | 66.6 | 1024.0 | 2022-12-28T04:17:07 | +| host6 | 88.8 | 333.3 | 2022-12-28T04:17:08 | ++-------+------+--------+-------------------------+\ +"; + assert_eq!(pretty, expected); +} + async fn insert_and_assert(db: &Database) { // testing data: let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data(); From 3fe051686a78a4e5b8722cf11692186250a6cec0 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 30 Jul 2024 20:46:50 +0800 Subject: [PATCH 4/4] test: add integration test for hints --- src/client/src/database.rs | 2 +- src/servers/src/grpc/database.rs | 10 +++++++--- tests-integration/tests/grpc.rs | 31 +++++++++++++++++++------------ 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 528c57632e3b..80dc51df2ef6 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -144,7 +144,7 @@ impl Database { let mut request = tonic::Request::new(request); let metadata = request.metadata_mut(); for (key, value) in hints { - let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint:{}", key).as_bytes()) + let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint-{}", key).as_bytes()) .map_err(|_| { InvalidAsciiSnafu { value: key.to_string(), diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index ae7660f3226b..4c3cf2c72b0d 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -26,7 +26,7 @@ use tonic::{Request, Response, Status, Streaming}; use crate::grpc::greptime_handler::GreptimeRequestHandler; use crate::grpc::{cancellation, TonicResult}; -pub const GREPTIME_DB_HEADER_HINT_PREFIX: &str = "x-greptime-hint:"; +pub const GREPTIME_DB_HEADER_HINT_PREFIX: &str = "x-greptime-hint-"; pub(crate) struct DatabaseService { handler: GreptimeRequestHandler, @@ -177,7 +177,11 @@ mod tests { #[test] fn test_extract_hints() { let mut metadata = MetadataMap::new(); - metadata.insert("x-greptime-hint:append_mode", "true".parse().unwrap()); + let prev = metadata.insert( + "x-greptime-hint-append_mode", + MetadataValue::from_static("true"), + ); + assert!(prev.is_none()); let hints = extract_hints(&metadata); assert_eq!(hints, vec![("append_mode".to_string(), "true".to_string())]); } @@ -186,7 +190,7 @@ mod tests { fn extract_hints_ignores_non_ascii_metadata() { let mut metadata = MetadataMap::new(); metadata.insert_bin( - "x-greptime-hint:merge_mode", + "x-greptime-hint-merge_mode-bin", MetadataValue::from_bytes(b"last_non_null"), ); let hints = extract_hints(&metadata); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index c3a979d7bfb3..4649bb103835 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -410,13 +410,13 @@ async fn insert_with_hints_and_assert(db: &Database) { InsertRequests { inserts: vec![request], }, - &[("append_mode", "true"), ("merge_mode", "last_non_null")], + &[("append_mode", "true")], ) .await; assert_eq!(result.unwrap(), 4); // show table - let output = db.sql("SHOW CREATE TABLE demo").await.unwrap(); + let output = db.sql("SHOW CREATE TABLE demo;").await.unwrap(); let record_batches = match output.data { OutputData::RecordBatches(record_batches) => record_batches, @@ -426,16 +426,23 @@ async fn insert_with_hints_and_assert(db: &Database) { let pretty = record_batches.pretty_print().unwrap(); let expected = "\ -+-------+------+--------+-------------------------+ -| host | cpu | memory | ts | -+-------+------+--------+-------------------------+ -| host1 | 0.31 | 0.1 | 1970-01-01T00:00:00.100 | -| host2 | | 0.2 | 1970-01-01T00:00:00.101 | -| host3 | 0.41 | | 1970-01-01T00:00:00.102 | -| host4 | 0.2 | 0.3 | 1970-01-01T00:00:00.103 | -| host5 | 66.6 | 1024.0 | 2022-12-28T04:17:07 | -| host6 | 88.8 | 333.3 | 2022-12-28T04:17:08 | -+-------+------+--------+-------------------------+\ ++-------+-------------------------------------+ +| Table | Create Table | ++-------+-------------------------------------+ +| demo | CREATE TABLE IF NOT EXISTS \"demo\" ( | +| | \"host\" STRING NULL, | +| | \"cpu\" DOUBLE NULL, | +| | \"memory\" DOUBLE NULL, | +| | \"ts\" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX (\"ts\"), | +| | PRIMARY KEY (\"host\") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | append_mode = 'true' | +| | ) | ++-------+-------------------------------------+\ "; assert_eq!(pretty, expected); }