diff --git a/src/client/src/database.rs b/src/client/src/database.rs index e310a73e584d..80dc51df2ef6 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -33,9 +33,12 @@ use common_telemetry::tracing_context::W3cTrace; use futures_util::StreamExt; use prost::Message; use snafu::{ensure, ResultExt}; +use tonic::metadata::AsciiMetadataKey; use tonic::transport::Channel; -use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; +use crate::error::{ + ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, InvalidAsciiSnafu, ServerSnafu, +}; use crate::{from_grpc_response, Client, Result}; #[derive(Clone, Debug, Default)] @@ -130,6 +133,36 @@ impl Database { self.handle(Request::Inserts(requests)).await } + pub async fn insert_with_hints( + &self, + requests: InsertRequests, + hints: &[(&str, &str)], + ) -> Result { + let mut client = make_database_client(&self.client)?.inner; + let request = self.to_rpc_request(Request::Inserts(requests)); + + let mut request = tonic::Request::new(request); + let metadata = request.metadata_mut(); + for (key, value) in hints { + let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint-{}", key).as_bytes()) + .map_err(|_| { + InvalidAsciiSnafu { + value: key.to_string(), + } + .build() + })?; + let value = value.parse().map_err(|_| { + InvalidAsciiSnafu { + value: value.to_string(), + } + .build() + })?; + metadata.insert(key, value); + } + let response = client.handle(request).await?.into_inner(); + from_grpc_response(response) + } + async fn handle(&self, request: Request) -> Result { let mut client = make_database_client(&self.client)?.inner; let request = self.to_rpc_request(request); diff --git a/src/client/src/error.rs b/src/client/src/error.rs index f200b1c93dd3..b5aef255d4f7 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -122,6 +122,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to parse ascii string: {}", value))] + InvalidAscii { + value: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -143,6 +150,8 @@ impl ErrorExt for Error { | Error::ConvertFlightData { source, .. } | Error::CreateTlsChannel { source, .. } => source.status_code(), Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, + + Error::InvalidAscii { .. } => StatusCode::InvalidArguments, } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 38e79de6c993..397c615a7c2a 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -644,9 +644,18 @@ impl Inserter { statement_executor: &StatementExecutor, create_type: AutoCreateTableType, ) -> Result { + let mut hint_options = vec![]; let options: &[(&str, &str)] = match create_type { AutoCreateTableType::Logical(_) => unreachable!(), - AutoCreateTableType::Physical => &[], + AutoCreateTableType::Physical => { + if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) { + hint_options.push((APPEND_MODE_KEY, append_mode)); + } + if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) { + hint_options.push((MERGE_MODE_KEY, merge_mode)); + } + hint_options.as_slice() + } // Set append_mode to true for log table. // because log tables should keep rows with the same ts and tags. AutoCreateTableType::Log => &[(APPEND_MODE_KEY, "true")], diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index f8c9e298d4b5..4c3cf2c72b0d 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -18,13 +18,16 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader}; use async_trait::async_trait; use common_error::status_code::StatusCode; use common_query::OutputData; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use futures::StreamExt; +use tonic::metadata::{KeyAndValueRef, MetadataMap}; use tonic::{Request, Response, Status, Streaming}; use crate::grpc::greptime_handler::GreptimeRequestHandler; use crate::grpc::{cancellation, TonicResult}; +pub const GREPTIME_DB_HEADER_HINT_PREFIX: &str = "x-greptime-hint-"; + pub(crate) struct DatabaseService { handler: GreptimeRequestHandler, } @@ -42,10 +45,15 @@ impl GreptimeDatabase for DatabaseService { request: Request, ) -> TonicResult> { let remote_addr = request.remote_addr(); + let hints = extract_hints(request.metadata()); + debug!( + "GreptimeDatabase::Handle: request from {:?} with hints: {:?}", + remote_addr, hints + ); let handler = self.handler.clone(); let request_future = async move { let request = request.into_inner(); - let output = handler.handle_request(request).await?; + let output = handler.handle_request(request, hints).await?; let message = match output.data { OutputData::AffectedRows(rows) => GreptimeResponse { header: Some(ResponseHeader { @@ -83,6 +91,11 @@ impl GreptimeDatabase for DatabaseService { request: Request>, ) -> Result, Status> { let remote_addr = request.remote_addr(); + let hints = extract_hints(request.metadata()); + debug!( + "GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}", + remote_addr, hints + ); let handler = self.handler.clone(); let request_future = async move { let mut affected_rows = 0; @@ -90,7 +103,7 @@ impl GreptimeDatabase for DatabaseService { let mut stream = request.into_inner(); while let Some(request) = stream.next().await { let request = request?; - let output = handler.handle_request(request).await?; + let output = handler.handle_request(request, hints.clone()).await?; match output.data { OutputData::AffectedRows(rows) => affected_rows += rows, OutputData::Stream(_) | OutputData::RecordBatches(_) => { @@ -129,3 +142,58 @@ impl GreptimeDatabase for DatabaseService { cancellation::with_cancellation_handler(request_future, cancellation_future).await } } + +fn extract_hints(metadata: &MetadataMap) -> Vec<(String, String)> { + metadata + .iter() + .filter_map(|kv| { + let KeyAndValueRef::Ascii(key, value) = kv else { + return None; + }; + let key = key.as_str(); + if !key.starts_with(GREPTIME_DB_HEADER_HINT_PREFIX) { + return None; + } + let Ok(value) = value.to_str() else { + // Simply return None for non-string values. + return None; + }; + // Safety: we already checked the prefix. + let new_key = key + .strip_prefix(GREPTIME_DB_HEADER_HINT_PREFIX) + .unwrap() + .to_string(); + Some((new_key, value.trim().to_string())) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use tonic::metadata::MetadataValue; + + use super::*; + + #[test] + fn test_extract_hints() { + let mut metadata = MetadataMap::new(); + let prev = metadata.insert( + "x-greptime-hint-append_mode", + MetadataValue::from_static("true"), + ); + assert!(prev.is_none()); + let hints = extract_hints(&metadata); + assert_eq!(hints, vec![("append_mode".to_string(), "true".to_string())]); + } + + #[test] + fn extract_hints_ignores_non_ascii_metadata() { + let mut metadata = MetadataMap::new(); + metadata.insert_bin( + "x-greptime-hint-merge_mode-bin", + MetadataValue::from_bytes(b"last_non_null"), + ); + let hints = extract_hints(&metadata); + assert!(hints.is_empty()); + } +} diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index cd1d2a4bd019..76a6cc00cec1 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -167,7 +167,7 @@ impl FlightCraft for GreptimeRequestHandler { request_type = get_request_type(&request) ); async { - let output = self.handle_request(request).await?; + let output = self.handle_request(request, Default::default()).await?; let stream: Pin> + Send + Sync>> = to_flight_data_stream(output, TracingContext::from_current_span()); Ok(Response::new(stream)) diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 726e8dd3304c..550fc2683cd6 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -59,13 +59,17 @@ impl GreptimeRequestHandler { } #[tracing::instrument(skip_all, fields(protocol = "grpc", request_type = get_request_type(&request)))] - pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> Result { + pub(crate) async fn handle_request( + &self, + request: GreptimeRequest, + hints: Vec<(String, String)>, + ) -> Result { let query = request.request.context(InvalidQuerySnafu { reason: "Expecting non-empty GreptimeRequest.", })?; let header = request.header.as_ref(); - let query_ctx = create_query_context(header); + let query_ctx = create_query_context(header, hints); let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?; query_ctx.set_current_user(user_info); @@ -164,7 +168,10 @@ pub(crate) async fn auth( }) } -pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef { +pub(crate) fn create_query_context( + header: Option<&RequestHeader>, + extensions: Vec<(String, String)>, +) -> QueryContextRef { let (catalog, schema) = header .map(|header| { // We provide dbname field in newer versions of protos/sdks @@ -193,12 +200,14 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte ) }); let timezone = parse_timezone(header.map(|h| h.timezone.as_str())); - QueryContextBuilder::default() + let mut ctx_builder = QueryContextBuilder::default() .current_catalog(catalog) .current_schema(schema) - .timezone(timezone) - .build() - .into() + .timezone(timezone); + for (key, value) in extensions { + ctx_builder = ctx_builder.set_extension(key, value); + } + ctx_builder.build().into() } /// Histogram timer for handling gRPC request. diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index e2aeec897f7b..918631600a14 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -78,7 +78,7 @@ impl PrometheusGateway for PrometheusGatewayService { }; let header = inner.header.as_ref(); - let query_ctx = create_query_context(header); + let query_ctx = create_query_context(header, Default::default()); let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?; query_ctx.set_current_user(user_info); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 33332170db16..4649bb103835 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -73,6 +73,7 @@ macro_rules! grpc_tests { test_invalid_dbname, test_auto_create_table, + test_auto_create_table_with_hints, test_insert_and_select, test_dbname, test_grpc_message_size_ok, @@ -279,6 +280,17 @@ pub async fn test_auto_create_table(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_auto_create_table_with_hints(store_type: StorageType) { + let (addr, mut guard, fe_grpc_server) = + setup_grpc_server(store_type, "auto_create_table_with_hints").await; + + let grpc_client = Client::with_urls(vec![addr]); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); + insert_with_hints_and_assert(&db).await; + let _ = fe_grpc_server.shutdown().await; + guard.remove_all().await; +} + fn expect_data() -> (Column, Column, Column, Column) { // testing data: let expected_host_col = Column { @@ -379,6 +391,62 @@ pub async fn test_insert_and_select(store_type: StorageType) { guard.remove_all().await; } +async fn insert_with_hints_and_assert(db: &Database) { + // testing data: + let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data(); + + let request = InsertRequest { + table_name: "demo".to_string(), + columns: vec![ + expected_host_col.clone(), + expected_cpu_col.clone(), + expected_mem_col.clone(), + expected_ts_col.clone(), + ], + row_count: 4, + }; + let result = db + .insert_with_hints( + InsertRequests { + inserts: vec![request], + }, + &[("append_mode", "true")], + ) + .await; + assert_eq!(result.unwrap(), 4); + + // show table + let output = db.sql("SHOW CREATE TABLE demo;").await.unwrap(); + + let record_batches = match output.data { + OutputData::RecordBatches(record_batches) => record_batches, + OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(), + OutputData::AffectedRows(_) => unreachable!(), + }; + + let pretty = record_batches.pretty_print().unwrap(); + let expected = "\ ++-------+-------------------------------------+ +| Table | Create Table | ++-------+-------------------------------------+ +| demo | CREATE TABLE IF NOT EXISTS \"demo\" ( | +| | \"host\" STRING NULL, | +| | \"cpu\" DOUBLE NULL, | +| | \"memory\" DOUBLE NULL, | +| | \"ts\" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX (\"ts\"), | +| | PRIMARY KEY (\"host\") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | append_mode = 'true' | +| | ) | ++-------+-------------------------------------+\ +"; + assert_eq!(pretty, expected); +} + async fn insert_and_assert(db: &Database) { // testing data: let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();