From 2df635099bcee47e03fb9eccb21aa07f64b03958 Mon Sep 17 00:00:00 2001
From: Lilit0x
Date: Thu, 21 Sep 2023 20:52:18 +0100
Subject: [PATCH 01/36] feat: open_tsdb support for row insert requests

---
 src/frontend/src/instance/opentsdb.rs |  8 +++---
 src/servers/src/opentsdb.rs           | 37 ++++++++++++++++++++++++++-
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs
index 1ac8fe029048..e4d8ff14cf04 100644
--- a/src/frontend/src/instance/opentsdb.rs
+++ b/src/frontend/src/instance/opentsdb.rs
@@ -12,13 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::InsertRequests;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use common_error::ext::BoxedError;
 use servers::error as server_error;
 use servers::error::AuthSnafu;
 use servers::opentsdb::codec::DataPoint;
+use servers::opentsdb::data_point_to_grpc_row_insert_requests;
 use servers::query_handler::OpentsdbProtocolHandler;
 use session::context::QueryContextRef;
 use snafu::prelude::*;
@@ -34,11 +34,9 @@ impl OpentsdbProtocolHandler for Instance {
             .check_permission(ctx.current_user(), PermissionReq::Opentsdb)
             .context(AuthSnafu)?;
 
-        let requests = InsertRequests {
-            inserts: vec![data_point.as_grpc_insert()],
-        };
+        let (requests, _) = data_point_to_grpc_row_insert_requests(data_point)?;
         let _ = self
-            .handle_inserts(requests, ctx)
+            .handle_row_inserts(requests, ctx)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| server_error::ExecuteQuerySnafu {
diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs
index 7c569cc2ffed..a53f3bc89468 100644
--- a/src/servers/src/opentsdb.rs
+++ b/src/servers/src/opentsdb.rs
@@ -20,6 +20,7 @@ use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
+use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use common_runtime::Runtime;
 use common_telemetry::logging::error;
@@ -27,12 +28,14 @@ use futures::StreamExt;
 use tokio::sync::broadcast;
 
 use crate::error::Result;
+use crate::opentsdb::codec::DataPoint;
 use crate::opentsdb::connection::Connection;
 use crate::opentsdb::handler::Handler;
+use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
 use crate::query_handler::OpentsdbProtocolHandlerRef;
+use crate::row_writer::{self, MultiTableData};
 use crate::server::{AbortableStream, BaseTcpServer, Server};
 use crate::shutdown::Shutdown;
-
 pub struct OpentsdbServer {
     base_server: BaseTcpServer,
     query_handler: OpentsdbProtocolHandlerRef,
@@ -123,3 +126,35 @@ impl Server for OpentsdbServer {
         OPENTSDB_SERVER
     }
 }
+
+pub fn data_point_to_grpc_row_insert_requests(
+    data_point: &DataPoint,
+) -> Result<(RowInsertRequests, usize)> {
+    let mut multi_table_data = MultiTableData::new();
+    let table_name = data_point.metric();
+    let tags = data_point.tags();
+    let value = data_point.value();
+    let timestamp = data_point.ts_millis();
+    let num_columns = tags.len() + 1;
+
+    let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 0);
+    let mut one_row = table_data.alloc_one_row();
+
+    // tags
+    let kvs = tags.iter().map(|(k, v)| (k.as_str(), v.as_str()));
+    row_writer::write_tags(table_data, kvs, &mut one_row)?;
+
+    // value
+    row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?;
+    // timestamp
+    row_writer::write_ts_millis(
+        table_data,
+        TIMESTAMP_COLUMN_NAME,
+        Some(timestamp),
+        &mut one_row,
+    )?;
+
+    table_data.add_row(one_row);
+
+    Ok(multi_table_data.into_row_insert_requests())
+}
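A quick usage sketch of the conversion added above (not part of the commit; it assumes an existing `data_point: &DataPoint` and a query context `ctx`, mirroring the frontend handler change):

    // Sketch only: DataPoint construction is elided, since its constructor
    // is not shown in this patch. Each data point becomes one row in a table
    // named after its metric: tag columns first, then the field column and
    // the millisecond timestamp column named by FIELD_COLUMN_NAME and
    // TIMESTAMP_COLUMN_NAME.
    let (requests, _row_count) = data_point_to_grpc_row_insert_requests(data_point)?;
    instance.handle_row_inserts(requests, ctx).await?;
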
From 67651e2be18ff51e77f8c4510b4f2d44af3bedbe Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Fri, 22 Sep 2023 09:57:48 +0800
Subject: [PATCH 02/36] refactor: remove SqlStatementExecutor (#2464)

Signed-off-by: Ruihang Xia
---
 src/datanode/src/lib.rs       | 16 ----------------
 src/datanode/src/tests.rs     | 22 +---------------------
 src/query/src/query_engine.rs |  8 --------
 3 files changed, 1 insertion(+), 45 deletions(-)

diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs
index 890e5c38ce92..62a9bd9f309a 100644
--- a/src/datanode/src/lib.rs
+++ b/src/datanode/src/lib.rs
@@ -15,8 +15,6 @@
 #![feature(assert_matches)]
 #![feature(trait_upcasting)]
 
-use query::query_engine::SqlStatementExecutor;
-
 pub mod alive_keeper;
 pub mod config;
 pub mod datanode;
@@ -31,17 +29,3 @@ mod store;
 #[cfg(test)]
 #[allow(dead_code)]
 mod tests;
-
-// TODO(ruihang): remove this
-pub struct Instance;
-
-#[async_trait::async_trait]
-impl SqlStatementExecutor for Instance {
-    async fn execute_sql(
-        &self,
-        _stmt: sql::statements::statement::Statement,
-        _query_ctx: session::context::QueryContextRef,
-    ) -> query::error::Result<common_query::Output> {
-        unreachable!()
-    }
-}
diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs
index 3407390b463a..43a06b34db96 100644
--- a/src/datanode/src/tests.rs
+++ b/src/datanode/src/tests.rs
@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
     HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
 };
 use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
-use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, RegionIdent};
+use common_meta::instruction::{Instruction, OpenRegion, RegionIdent};
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
 use common_runtime::Runtime;
@@ -34,29 +34,9 @@ use query::query_engine::DescribeResult;
 use query::QueryEngine;
 use session::context::QueryContextRef;
 use table::TableRef;
-use tokio::sync::mpsc::{self, Receiver};
 
 use crate::event_listener::NoopRegionServerEventListener;
 use crate::region_server::RegionServer;
-use crate::Instance;
-
-struct HandlerTestGuard {
-    instance: Instance,
-    mailbox: Arc<HeartbeatMailbox>,
-    rx: Receiver<(MessageMeta, InstructionReply)>,
-}
-
-async fn prepare_handler_test(_name: &str) -> HandlerTestGuard {
-    let instance = Instance;
-    let (tx, rx) = mpsc::channel(8);
-    let mailbox = Arc::new(HeartbeatMailbox::new(tx));
-
-    HandlerTestGuard {
-        instance,
-        mailbox,
-        rx,
-    }
-}
 
 pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
     MessageMeta {
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index b0dde22c36c3..b18834d2c8ec 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -28,7 +28,6 @@ use common_query::prelude::ScalarUdf;
 use common_query::Output;
 use datatypes::schema::Schema;
 use session::context::QueryContextRef;
-use sql::statements::statement::Statement;
 use table::TableRef;
 
 use crate::dataframe::DataFrame;
@@ -40,8 +39,6 @@ pub use crate::query_engine::context::QueryEngineContext;
 pub use crate::query_engine::state::QueryEngineState;
 use crate::region_query::RegionQueryHandlerRef;
 
-pub type SqlStatementExecutorRef = Arc<dyn SqlStatementExecutor>;
-
 /// Describe statement result
 #[derive(Debug)]
 pub struct DescribeResult {
@@ -51,11 +48,6 @@ pub struct DescribeResult {
     pub logical_plan: LogicalPlan,
 }
 
-#[async_trait]
-pub trait SqlStatementExecutor: Send + Sync {
-    async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output>;
-}
-
 #[async_trait]
 pub trait QueryEngine: Send + Sync {
     /// Returns the query engine as Any

From 2292c4170ada0d04b0d89f1f85a13f0e5f912c1f Mon Sep 17 00:00:00 2001
From: LinFeng
Date: Fri, 22 Sep 2023 10:07:46 +0800
Subject: [PATCH 03/36] feat: limit grpc message size (#2459)

* feat: add two grpc config options

Those options are for:
* Limit receiving(decoding) message size
* Limit sending(encoding) message size

* test: add integration tests for message size limit
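As an illustration (not part of the commit), a server can now be built with explicit caps instead of the 512 MiB defaults; the field names follow the `GrpcServerConfig` changes below:

    // Sketch: cap both decoding (recv) and encoding (send) at 1 MiB.
    let config = GrpcServerConfig {
        max_recv_message_size: 1024 * 1024,
        max_send_message_size: 1024 * 1024,
    };
    // Passing `None` instead falls back to GrpcServerConfig::default(),
    // i.e. 512 MiB in each direction.
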
---
 src/client/src/client.rs                | 18 +++++--
 src/common/grpc/src/channel_manager.rs  | 19 ++++---
 src/datanode/src/config.rs              |  9 ++++
 src/datanode/src/server.rs              |  7 ++-
 src/frontend/src/server.rs              |  7 ++-
 src/frontend/src/service_config/grpc.rs |  9 ++++
 src/servers/src/grpc.rs                 | 39 ++++++++++-----
 tests-integration/src/cluster.rs        |  1 +
 tests-integration/src/test_util.rs      | 15 +++++-
 tests-integration/tests/grpc.rs         | 66 ++++++++++++++++++++++++-
 tests-integration/tests/http.rs         |  2 +
 11 files changed, 162 insertions(+), 30 deletions(-)

diff --git a/src/client/src/client.rs b/src/client/src/client.rs
index ada1ae92c56a..c5457e58741f 100644
--- a/src/client/src/client.rs
+++ b/src/client/src/client.rs
@@ -138,8 +138,12 @@ impl Client {
         Ok((addr, channel))
     }
 
-    fn max_grpc_message_size(&self) -> usize {
-        self.inner.channel_manager.config().max_message_size
+    fn max_grpc_recv_message_size(&self) -> usize {
+        self.inner.channel_manager.config().max_recv_message_size
+    }
+
+    fn max_grpc_send_message_size(&self) -> usize {
+        self.inner.channel_manager.config().max_send_message_size
     }
 
     pub(crate) fn make_flight_client(&self) -> Result<FlightClient> {
@@ -147,7 +151,8 @@ impl Client {
         Ok(FlightClient {
             addr,
             client: FlightServiceClient::new(channel)
-                .max_decoding_message_size(self.max_grpc_message_size()),
+                .max_decoding_message_size(self.max_grpc_recv_message_size())
+                .max_encoding_message_size(self.max_grpc_send_message_size()),
         })
     }
 
@@ -155,13 +160,16 @@
         let (_, channel) = self.find_channel()?;
         Ok(DatabaseClient {
             inner: GreptimeDatabaseClient::new(channel)
-                .max_decoding_message_size(self.max_grpc_message_size()),
+                .max_decoding_message_size(self.max_grpc_recv_message_size())
+                .max_encoding_message_size(self.max_grpc_send_message_size()),
         })
     }
 
     pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
         let (_, channel) = self.find_channel()?;
-        Ok(PbRegionClient::new(channel).max_decoding_message_size(self.max_grpc_message_size()))
+        Ok(PbRegionClient::new(channel)
+            .max_decoding_message_size(self.max_grpc_recv_message_size())
+            .max_encoding_message_size(self.max_grpc_send_message_size()))
     }
 
     pub fn make_prometheus_gateway_client(&self) -> Result>
diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs
index 1102ac0fd303..f52177e2890d 100644
--- a/src/common/grpc/src/channel_manager.rs
+++ b/src/common/grpc/src/channel_manager.rs
@@ -31,7 +31,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon
 const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
 pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
 pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10;
-pub const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
+pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
+pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
 
 lazy_static! {
     static ref ID: AtomicU64 = AtomicU64::new(0);
@@ -248,9 +249,10 @@ pub struct ChannelConfig {
     pub tcp_keepalive: Option<Duration>,
     pub tcp_nodelay: bool,
     pub client_tls: Option<ClientTlsOption>,
-    // Max gRPC message size
-    // TODO(dennis): make it configurable
-    pub max_message_size: usize,
+    // Max gRPC receiving(decoding) message size
+    pub max_recv_message_size: usize,
+    // Max gRPC sending(encoding) message size
+    pub max_send_message_size: usize,
 }
 
 impl Default for ChannelConfig {
@@ -269,7 +271,8 @@ impl Default for ChannelConfig {
             tcp_keepalive: None,
             tcp_nodelay: true,
             client_tls: None,
-            max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
+            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
+            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
         }
     }
 }
@@ -534,7 +537,8 @@ mod tests {
                 tcp_keepalive: None,
                 tcp_nodelay: true,
                 client_tls: None,
-                max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
+                max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
+                max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
             },
             default_cfg
         );
@@ -577,7 +581,8 @@ mod tests {
                 client_cert_path: "some_cert_path".to_string(),
                 client_key_path: "some_key_path".to_string(),
             }),
-            max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
+            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
+            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
             },
             cfg
         );
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 61001f2e39d1..b3835e6e2e84 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -18,6 +18,9 @@ use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
 use common_config::WalConfig;
+use common_grpc::channel_manager::{
+    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
+};
 pub use common_procedure::options::ProcedureConfig;
 use common_telemetry::logging::LoggingOptions;
 use file_engine::config::EngineConfig as FileEngineConfig;
@@ -324,6 +327,10 @@ pub struct DatanodeOptions {
     pub rpc_addr: String,
     pub rpc_hostname: Option<String>,
     pub rpc_runtime_size: usize,
+    // Max gRPC receiving(decoding) message size
+    pub rpc_max_recv_message_size: usize,
+    // Max gRPC sending(encoding) message size
+    pub rpc_max_send_message_size: usize,
     pub heartbeat: HeartbeatOptions,
     pub http: HttpOptions,
     pub meta_client: Option<MetaClientOptions>,
@@ -344,6 +351,8 @@ impl Default for DatanodeOptions {
             rpc_addr: "127.0.0.1:3001".to_string(),
             rpc_hostname: None,
             rpc_runtime_size: 8,
+            rpc_max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
+            rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
             http: HttpOptions::default(),
             meta_client: None,
             wal: WalConfig::default(),
diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs
index 71c050dc0bbc..1847dc4c992a 100644
--- a/src/datanode/src/server.rs
+++ b/src/datanode/src/server.rs
@@ -16,7 +16,7 @@ use std::net::SocketAddr;
 use std::sync::Arc;
 
 use futures::future;
-use servers::grpc::GrpcServer;
+use servers::grpc::{GrpcServer, GrpcServerConfig};
 use servers::http::{HttpServer, HttpServerBuilder};
 use servers::metrics_handler::MetricsHandler;
 use servers::server::Server;
@@ -39,9 +39,14 @@ impl Services {
         let flight_handler = Some(Arc::new(region_server.clone()) as _);
         let region_server_handler = Some(Arc::new(region_server.clone()) as _);
         let runtime = region_server.runtime();
+        let grpc_config = GrpcServerConfig {
+            max_recv_message_size: opts.rpc_max_recv_message_size,
+            max_send_message_size: opts.rpc_max_send_message_size,
+        };
 
         Ok(Self {
             grpc_server: GrpcServer::new(
+                Some(grpc_config),
                 None,
                 None,
                flight_handler,
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index d96178f1d3ad..5a61c3b48834 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -22,7 +22,7 @@ use common_runtime::Builder as RuntimeBuilder;
 use common_telemetry::info;
 use servers::configurator::ConfiguratorRef;
 use servers::error::Error::InternalIo;
-use servers::grpc::GrpcServer;
+use servers::grpc::{GrpcServer, GrpcServerConfig};
 use servers::http::HttpServerBuilder;
 use servers::metrics_handler::MetricsHandler;
 use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -69,7 +69,12 @@ impl Services {
                     .context(error::RuntimeResourceSnafu)?,
             );
 
+            let grpc_config = GrpcServerConfig {
+                max_recv_message_size: opts.max_recv_message_size,
+                max_send_message_size: opts.max_send_message_size,
+            };
             let grpc_server = GrpcServer::new(
+                Some(grpc_config),
                 Some(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())),
                 Some(instance.clone()),
                 None,
diff --git a/src/frontend/src/service_config/grpc.rs b/src/frontend/src/service_config/grpc.rs
index 92d6ea771710..e0a64565015d 100644
--- a/src/frontend/src/service_config/grpc.rs
+++ b/src/frontend/src/service_config/grpc.rs
@@ -12,12 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use common_grpc::channel_manager::{
+    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
+};
 use serde::{Deserialize, Serialize};
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GrpcOptions {
     pub addr: String,
     pub runtime_size: usize,
+    // Max gRPC receiving(decoding) message size
+    pub max_recv_message_size: usize,
+    // Max gRPC sending(encoding) message size
+    pub max_send_message_size: usize,
 }
 
 impl Default for GrpcOptions {
@@ -25,6 +32,8 @@ impl Default for GrpcOptions {
         Self {
             addr: "127.0.0.1:4001".to_string(),
             runtime_size: 8,
+            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
+            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
         }
    }
 }
diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs
index 5d5582de7faf..69ea1943a6fa 100644
--- a/src/servers/src/grpc.rs
+++ b/src/servers/src/grpc.rs
@@ -33,7 +33,9 @@ use arrow_flight::flight_service_server::FlightService;
 use arrow_flight::flight_service_server::FlightServiceServer;
 use async_trait::async_trait;
 use auth::UserProviderRef;
-use common_grpc::channel_manager::DEFAULT_MAX_GRPC_MESSAGE_SIZE;
+use common_grpc::channel_manager::{
+    DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
+};
 use common_runtime::Runtime;
 use common_telemetry::logging::info;
 use common_telemetry::{error, warn};
@@ -82,21 +84,24 @@ pub struct GrpcServer {
 /// Grpc Server configuration
 #[derive(Debug, Clone)]
 pub struct GrpcServerConfig {
-    // Max gRPC message size
-    // TODO(dennis): make it configurable
-    pub max_message_size: usize,
+    // Max gRPC receiving(decoding) message size
+    pub max_recv_message_size: usize,
+    // Max gRPC sending(encoding) message size
+    pub max_send_message_size: usize,
 }
 
 impl Default for GrpcServerConfig {
     fn default() -> Self {
         Self {
-            max_message_size: DEFAULT_MAX_GRPC_MESSAGE_SIZE,
+            max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
+            max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
        }
    }
 }
 
 impl GrpcServer {
     pub fn new(
+        config: Option<GrpcServerConfig>,
         query_handler: Option<ServerGrpcQueryHandlerRef>,
         prometheus_handler: Option<PrometheusHandlerRef>,
         flight_handler: Option<FlightCraftRef>,
@@ -110,7 +115,7 @@ impl GrpcServer {
         let region_server_handler = region_server_handler
            .map(|handler| RegionServerRequestHandler::new(handler, runtime.clone()));
         Self {
-            config: GrpcServerConfig::default(),
+            config: config.unwrap_or_default(),
             shutdown_tx: Mutex::new(None),
             user_provider,
             serve_state: Mutex::new(None),
@@ -201,7 +206,8 @@ impl Server for GrpcServer {
     }
 
     async fn start(&self, addr: SocketAddr) -> Result<SocketAddr> {
-        let max_message_size = self.config.max_message_size;
+        let max_recv_message_size = self.config.max_recv_message_size;
+        let max_send_message_size = self.config.max_send_message_size;
         let (tx, rx) = oneshot::channel();
         let (listener, addr) = {
             let mut shutdown_tx = self.shutdown_tx.lock().await;
@@ -227,7 +233,8 @@ impl Server for GrpcServer {
         if let Some(database_handler) = &self.database_handler {
             builder = builder.add_service(
                 GreptimeDatabaseServer::new(DatabaseService::new(database_handler.clone()))
-                    .max_decoding_message_size(max_message_size),
+                    .max_decoding_message_size(max_recv_message_size)
+                    .max_encoding_message_size(max_send_message_size),
             )
         }
         if let Some(prometheus_handler) = &self.prometheus_handler {
@@ -237,18 +244,24 @@ impl Server for GrpcServer {
         if let Some(flight_handler) = &self.flight_handler {
             builder = builder.add_service(
                 FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))
-                    .max_decoding_message_size(max_message_size),
+                    .max_decoding_message_size(max_recv_message_size)
+                    .max_encoding_message_size(max_send_message_size),
             )
         } else {
             // TODO(ruihang): this is a temporary workaround before region server is ready.
-            builder = builder.add_service(FlightServiceServer::new(FlightCraftWrapper(
-                self.database_handler.clone().unwrap(),
-            )))
+            builder = builder.add_service(
+                FlightServiceServer::new(FlightCraftWrapper(
+                    self.database_handler.clone().unwrap(),
+                ))
+                .max_decoding_message_size(max_recv_message_size)
+                .max_encoding_message_size(max_send_message_size),
+            )
         }
         if let Some(region_server_handler) = &self.region_server_handler {
             builder = builder.add_service(
                 RegionServer::new(region_server_handler.clone())
-                    .max_decoding_message_size(max_message_size),
+                    .max_decoding_message_size(max_recv_message_size)
+                    .max_encoding_message_size(max_send_message_size),
             );
         }
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index 9bdb11f90b00..013eeb681ec6 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -272,6 +272,7 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
     let flight_handler = Some(Arc::new(datanode.region_server()) as _);
     let region_server_handler = Some(Arc::new(datanode.region_server()) as _);
     let grpc_server = GrpcServer::new(
+        None,
         None,
         None,
         flight_handler,
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index a6c0cf72fb65..e3370836eb2f 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -39,7 +39,7 @@ use object_store::test_util::TempFolder;
 use object_store::ObjectStore;
 use secrecy::ExposeSecret;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
-use servers::grpc::GrpcServer;
+use servers::grpc::{GrpcServer, GrpcServerConfig};
 use servers::http::{HttpOptions, HttpServerBuilder};
 use servers::metrics_handler::MetricsHandler;
 use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -423,13 +423,22 @@ pub async fn setup_grpc_server(
     store_type: StorageType,
     name: &str,
 ) -> (String, TestGuard, Arc<GrpcServer>) {
-    setup_grpc_server_with_user_provider(store_type, name, None).await
+    setup_grpc_server_with(store_type, name, None, None).await
 }
 
 pub async fn setup_grpc_server_with_user_provider(
     store_type: StorageType,
     name: &str,
     user_provider: Option<UserProviderRef>,
+) -> (String, TestGuard, Arc<GrpcServer>) {
+    setup_grpc_server_with(store_type, name, user_provider, None).await
+}
+
+pub async fn setup_grpc_server_with(
+    store_type: StorageType,
+    name: &str,
+    user_provider: Option<UserProviderRef>,
+    grpc_config: Option<GrpcServerConfig>,
 ) -> (String, TestGuard, Arc<GrpcServer>) {
     let instance = setup_standalone_instance(name, store_type).await;
 
@@ -447,7 +456,9 @@
         user_provider.clone(),
         runtime.clone(),
     ));
+
     let fe_grpc_server = Arc::new(GrpcServer::new(
+        grpc_config,
         Some(ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone())),
         Some(fe_instance_ref.clone()),
         Some(flight_handler),
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 7f2f20d87ec8..3d766a5a37f0 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -24,10 +24,11 @@ use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
 use common_query::Output;
 use common_recordbatch::RecordBatches;
+use servers::grpc::GrpcServerConfig;
 use servers::http::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse};
 use servers::server::Server;
 use tests_integration::test_util::{
-    setup_grpc_server, setup_grpc_server_with_user_provider, StorageType,
+    setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, StorageType,
 };
 
 #[macro_export]
@@ -64,6 +65,9 @@ macro_rules! grpc_tests {
             test_auto_create_table,
             test_insert_and_select,
             test_dbname,
+            test_grpc_message_size_ok,
+            test_grpc_message_size_limit_recv,
+            test_grpc_message_size_limit_send,
             test_grpc_auth,
             test_health_check,
             test_prom_gateway_query,
@@ -115,6 +119,66 @@ pub async fn test_dbname(store_type: StorageType) {
     guard.remove_all().await;
 }
 
+pub async fn test_grpc_message_size_ok(store_type: StorageType) {
+    let config = GrpcServerConfig {
+        max_recv_message_size: 1024,
+        max_send_message_size: 1024,
+    };
+    let (addr, mut guard, fe_grpc_server) =
+        setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
+
+    let grpc_client = Client::with_urls(vec![addr]);
+    let db = Database::new_with_dbname(
+        format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
+        grpc_client,
+    );
+    db.sql("show tables;").await.unwrap();
+    let _ = fe_grpc_server.shutdown().await;
+    guard.remove_all().await;
+}
+
+pub async fn test_grpc_message_size_limit_send(store_type: StorageType) {
+    let config = GrpcServerConfig {
+        max_recv_message_size: 1024,
+        max_send_message_size: 50,
+    };
+    let (addr, mut guard, fe_grpc_server) =
+        setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
+
+    let grpc_client = Client::with_urls(vec![addr]);
+    let db = Database::new_with_dbname(
+        format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
+        grpc_client,
+    );
+    let err_msg = db.sql("show tables;").await.unwrap_err().to_string();
+    assert!(err_msg.contains("message length too large"), "{}", err_msg);
+    let _ = fe_grpc_server.shutdown().await;
+    guard.remove_all().await;
+}
+
+pub async fn test_grpc_message_size_limit_recv(store_type: StorageType) {
+    let config = GrpcServerConfig {
+        max_recv_message_size: 10,
+        max_send_message_size: 1024,
+    };
+    let (addr, mut guard, fe_grpc_server) =
+        setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
+
+    let grpc_client = Client::with_urls(vec![addr]);
+    let db = Database::new_with_dbname(
+        format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
+        grpc_client,
+    );
+    let err_msg = db.sql("show tables;").await.unwrap_err().to_string();
+    assert!(
+        err_msg.contains("Operation was attempted past the valid range"),
+        "{}",
+        err_msg
+    );
+    let _ = fe_grpc_server.shutdown().await;
+    guard.remove_all().await;
+}
+
 pub async fn test_grpc_auth(store_type: StorageType) {
     let user_provider = user_provider_from_option(
         &"static_user_provider:cmd:greptime_user=greptime_pwd".to_string(),
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index c186dab8ba7f..0317482da7f4 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -610,6 +610,8 @@ node_id = 0
 require_lease_before_startup = true
 rpc_addr = "127.0.0.1:3001"
 rpc_runtime_size = 8
+rpc_max_recv_message_size = 536870912
+rpc_max_send_message_size = 536870912
 enable_telemetry = true
 
 [heartbeat]

From f9315bdeabd7d956dc6018e3fd7ec1c5a5447694 Mon Sep 17 00:00:00 2001
From: JeremyHi
Date: Fri, 22 Sep 2023 10:27:29 +0800
Subject: [PATCH 04/36] chore: remove unused region_stats method from table
 (#2458)

chore: remove unused region_stats method from table
---
 src/catalog/src/lib.rs | 67 +-----------------------------------------
 src/table/src/table.rs |  9 ------
 2 files changed, 1 insertion(+), 75 deletions(-)

diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index 3dd45213ed7b..25e1f10d1753 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -20,10 +20,8 @@ use std::any::Any;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use api::v1::meta::RegionStat;
-use common_telemetry::warn;
 use futures::future::BoxFuture;
-use table::metadata::{TableId, TableType};
+use table::metadata::TableId;
 use table::requests::CreateTableRequest;
 use table::TableRef;
 
@@ -124,66 +122,3 @@ pub struct RegisterSchemaRequest {
     pub catalog: String,
     pub schema: String,
 }
-
-/// The stat of regions in the datanode node.
-/// The number of regions can be got from len of vec.
-///
-/// Ignores any errors occurred during iterating regions. The intention of this method is to
-/// collect region stats that will be carried in Datanode's heartbeat to Metasrv, so it's a
-/// "try our best" job.
-pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<RegionStat>) {
-    let mut region_number: u64 = 0;
-    let mut region_stats = Vec::new();
-
-    let Ok(catalog_names) = catalog_manager.catalog_names().await else {
-        return (region_number, region_stats);
-    };
-    for catalog_name in catalog_names {
-        let Ok(schema_names) = catalog_manager.schema_names(&catalog_name).await else {
-            continue;
-        };
-        for schema_name in schema_names {
-            let Ok(table_names) = catalog_manager
-                .table_names(&catalog_name, &schema_name)
-                .await
-            else {
-                continue;
-            };
-            for table_name in table_names {
-                let Ok(Some(table)) = catalog_manager
-                    .table(&catalog_name, &schema_name, &table_name)
-                    .await
-                else {
-                    continue;
-                };
-
-                if table.table_type() != TableType::Base {
-                    continue;
-                }
-
-                let table_info = table.table_info();
-                let region_numbers = &table_info.meta.region_numbers;
-                region_number += region_numbers.len() as u64;
-
-                let engine = &table_info.meta.engine;
-
-                match table.region_stats() {
-                    Ok(stats) => {
-                        let stats = stats.into_iter().map(|stat| RegionStat {
-                            region_id: stat.region_id,
-                            approximate_bytes: stat.disk_usage_bytes as i64,
-                            engine: engine.clone(),
-                            ..Default::default()
-                        });
-
-                        region_stats.extend(stats);
-                    }
-                    Err(e) => {
-                        warn!("Failed to get region status, err: {:?}", e);
-                    }
-                };
-            }
-        }
-    }
-    (region_number, region_stats)
-}
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index 099481a9077b..8af3cfd1ffec 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -30,7 +30,6 @@ use crate::error::{Result, UnsupportedSnafu};
 use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};
 use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
 use crate::stats::TableStatistics;
-use crate::RegionStat;
 
 pub type AlterContext = anymap::Map<dyn Any + Send + Sync>;
 
@@ -101,14 +100,6 @@ pub trait Table: Send + Sync {
         Ok(())
     }
 
-    /// Get region stats in this table.
-    fn region_stats(&self) -> Result<Vec<RegionStat>> {
-        UnsupportedSnafu {
-            operation: "REGION_STATS",
-        }
-        .fail()?
-    }
-
     /// Return true if contains the region
     fn contains_region(&self, _region: RegionNumber) -> Result<bool> {
         UnsupportedSnafu {

From 37156fc4ff3886632f67cba3c6dd1f18826c204e Mon Sep 17 00:00:00 2001
From: Baasit
Date: Fri, 22 Sep 2023 03:34:57 +0100
Subject: [PATCH 05/36] feat: support for show full tables (#2410)

* feat: added show tables command
* fix(tests): fixed parser and statement unit tests
* chore: implemented display trait for table type
* fix: handled no table type and error for unsupported command in show database
* chore: removed full as a show kind, instead as a show option
* chore(tests): fixed failing test and added more tests for show full
* chore: refactored table types to use filters
* fix: changed table_type to tables
---
 src/datatypes/src/vectors/helper.rs | 40 +++++++++++++
 src/query/src/sql.rs                | 81 ++++++++++++++++++++++---
 src/sql/src/parsers/show_parser.rs  | 91 ++++++++++++++++++++++++++++-
 src/sql/src/statements/show.rs      |  1 +
 src/table/src/metadata.rs           | 10 ++++
 5 files changed, 211 insertions(+), 12 deletions(-)

diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs
index 37e2cbb47be7..8d3128411259 100644
--- a/src/datatypes/src/vectors/helper.rs
+++ b/src/datatypes/src/vectors/helper.rs
@@ -350,6 +350,15 @@ impl Helper {
         let result = compute::filter(&array, &filter).context(error::ArrowComputeSnafu)?;
         Helper::try_into_vector(result)
     }
+
+    pub fn like_utf8_filter(names: Vec<String>, s: &str) -> Result<(VectorRef, BooleanVector)> {
+        let array = StringArray::from(names);
+        let filter = comparison::like_utf8_scalar(&array, s).context(error::ArrowComputeSnafu)?;
+        let result = compute::filter(&array, &filter).context(error::ArrowComputeSnafu)?;
+        let vector = Helper::try_into_vector(result)?;
+
+        Ok((vector, BooleanVector::from(filter)))
+    }
 }
 
 #[cfg(test)]
@@ -463,6 +472,37 @@ mod tests {
         assert_vector(vec!["greptime", "hello", "public", "world"], &ret);
     }
 
+    #[test]
+    fn test_like_utf8_filter() {
+        fn assert_vector(expected: Vec<&str>, actual: &VectorRef) {
+            let actual = actual.as_any().downcast_ref::<StringVector>().unwrap();
+            assert_eq!(*actual, StringVector::from(expected));
+        }
+
+        fn assert_filter(array: Vec<String>, s: &str, expected_filter: &BooleanVector) {
+            let array = StringArray::from(array);
+            let actual_filter = comparison::like_utf8_scalar(&array, s).unwrap();
+            assert_eq!(BooleanVector::from(actual_filter), *expected_filter);
+        }
+
+        let names: Vec<String> = vec!["greptime", "timeseries", "cloud", "database"]
+            .into_iter()
+            .map(|x| x.to_string())
+            .collect();
+
+        let (table, filter) = Helper::like_utf8_filter(names.clone(), "%ti%").unwrap();
+        assert_vector(vec!["greptime", "timeseries"], &table);
+        assert_filter(names.clone(), "%ti%", &filter);
+
+        let (tables, filter) = Helper::like_utf8_filter(names.clone(), "%lou").unwrap();
+        assert_vector(vec![], &tables);
+        assert_filter(names.clone(), "%lou", &filter);
+
+        let (tables, filter) = Helper::like_utf8_filter(names.clone(), "%d%").unwrap();
+        assert_vector(vec!["cloud", "database"], &tables);
+        assert_filter(names.clone(), "%d%", &filter);
+    }
+
     fn check_try_into_vector(array: impl Array + 'static) {
         let array: ArrayRef = Arc::new(array);
         let vector = Helper::try_into_vector(array.clone()).unwrap();
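The new helper returns the filtered vector together with the boolean mask, so a caller can apply the same mask to a parallel column of equal length. A sketch (not part of the commit; names built as in the unit tests above):

    // Sketch: keep names matching the LIKE pattern, then reuse `mask` on
    // another column via its `filter` method, as `show_tables` does below.
    let names = vec!["greptime".to_string(), "cloud".to_string()];
    let (matched, mask) = Helper::like_utf8_filter(names, "%ti%")?;
    // `matched` now holds ["greptime"]; `mask` is [true, false].
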
diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs
index 8a99671a2246..4369d4205c3e 100644
--- a/src/query/src/sql.rs
+++ b/src/query/src/sql.rs
@@ -144,42 +144,86 @@ pub async fn show_tables(
     catalog_manager: CatalogManagerRef,
     query_ctx: QueryContextRef,
 ) -> Result<Output> {
-    let schema = if let Some(database) = stmt.database {
+    let schema_name = if let Some(database) = stmt.database {
         database
     } else {
         query_ctx.current_schema().to_owned()
     };
     // TODO(sunng87): move this function into query_ctx
     let mut tables = catalog_manager
-        .table_names(query_ctx.current_catalog(), &schema)
+        .table_names(query_ctx.current_catalog(), &schema_name)
         .await
         .context(error::CatalogSnafu)?;
 
     // TODO(dennis): Specify the order of the results in schema provider API
     tables.sort();
-    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
+
+    let table_types: Option<Arc<dyn Vector>> = {
+        if stmt.full {
+            Some(
+                get_table_types(
+                    &tables,
+                    catalog_manager.clone(),
+                    query_ctx.clone(),
+                    &schema_name,
+                )
+                .await?,
+            )
+        } else {
+            None
+        }
+    };
+
+    let mut column_schema = vec![ColumnSchema::new(
         TABLES_COLUMN,
         ConcreteDataType::string_datatype(),
         false,
-    )]));
+    )];
+    if table_types.is_some() {
+        column_schema.push(ColumnSchema::new(
+            "Table_type",
+            ConcreteDataType::string_datatype(),
+            false,
+        ));
+    }
+
+    let schema = Arc::new(Schema::new(column_schema));
+
     match stmt.kind {
         ShowKind::All => {
             let tables = Arc::new(StringVector::from(tables)) as _;
-            let records = RecordBatches::try_from_columns(schema, vec![tables])
+            let mut columns = vec![tables];
+            if let Some(table_types) = table_types {
+                columns.push(table_types)
+            }
+
+            let records = RecordBatches::try_from_columns(schema, columns)
                 .context(error::CreateRecordBatchSnafu)?;
             Ok(Output::RecordBatches(records))
         }
         ShowKind::Where(filter) => {
-            let columns = vec![Arc::new(StringVector::from(tables)) as _];
+            let mut columns = vec![Arc::new(StringVector::from(tables)) as _];
+            if let Some(table_types) = table_types {
+                columns.push(table_types)
+            }
             let record_batch =
                 RecordBatch::new(schema, columns).context(error::CreateRecordBatchSnafu)?;
             let result = execute_show_with_filter(record_batch, Some(filter)).await?;
             Ok(result)
         }
         ShowKind::Like(ident) => {
-            let tables =
-                Helper::like_utf8(tables, &ident.value).context(error::VectorComputationSnafu)?;
-            let records = RecordBatches::try_from_columns(schema, vec![tables])
+            let (tables, filter) = Helper::like_utf8_filter(tables, &ident.value)
+                .context(error::VectorComputationSnafu)?;
+            let mut columns = vec![tables];
+
+            if let Some(table_types) = table_types {
+                let table_types = table_types
+                    .filter(&filter)
+                    .context(error::VectorComputationSnafu)?;
+                columns.push(table_types)
+            }
+
+            let records = RecordBatches::try_from_columns(schema, columns)
                .context(error::CreateRecordBatchSnafu)?;
             Ok(Output::RecordBatches(records))
         }
@@ -452,6 +496,25 @@ fn parse_file_table_format(options: &HashMap<String, String>) -> Result
 }
 
+async fn get_table_types(
+    tables: &[String],
+    catalog_manager: CatalogManagerRef,
+    query_ctx: QueryContextRef,
+    schema_name: &str,
+) -> Result<Arc<dyn Vector>> {
+    let mut table_types = Vec::with_capacity(tables.len());
+    for table_name in tables {
+        if let Some(table) = catalog_manager
+            .table(query_ctx.current_catalog(), schema_name, table_name)
+            .await
+            .context(error::CatalogSnafu)?
+        {
+            table_types.push(table.table_type().to_string());
+        }
+    }
+    Ok(Arc::new(StringVector::from(table_types)) as _)
+}
+
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
diff --git a/src/sql/src/parsers/show_parser.rs b/src/sql/src/parsers/show_parser.rs
index 90baef8ff6fc..48a64e86ec5f 100644
--- a/src/sql/src/parsers/show_parser.rs
+++ b/src/sql/src/parsers/show_parser.rs
@@ -30,13 +30,19 @@ impl<'a> ParserContext<'a> {
             self.parse_show_databases()
         } else if self.matches_keyword(Keyword::TABLES) {
             let _ = self.parser.next_token();
-            self.parse_show_tables()
+            self.parse_show_tables(false)
         } else if self.consume_token("CREATE") {
             if self.consume_token("TABLE") {
                 self.parse_show_create_table()
             } else {
                 self.unsupported(self.peek_token_as_string())
             }
+        } else if self.consume_token("FULL") {
+            if self.consume_token("TABLES") {
+                self.parse_show_tables(true)
+            } else {
+                self.unsupported(self.peek_token_as_string())
+            }
         } else {
             self.unsupported(self.peek_token_as_string())
         }
@@ -61,12 +67,13 @@ impl<'a> ParserContext<'a> {
         Ok(Statement::ShowCreateTable(ShowCreateTable { table_name }))
     }
 
-    fn parse_show_tables(&mut self) -> Result<Statement> {
+    fn parse_show_tables(&mut self, full: bool) -> Result<Statement> {
         let database = match self.parser.peek_token().token {
             Token::EOF | Token::SemiColon => {
                 return Ok(Statement::ShowTables(ShowTables {
                     kind: ShowKind::All,
                     database: None,
+                    full,
                 }));
             }
 
@@ -126,7 +133,11 @@ impl<'a> ParserContext<'a> {
             _ => return self.unsupported(self.peek_token_as_string()),
         };
 
-        Ok(Statement::ShowTables(ShowTables { kind, database }))
+        Ok(Statement::ShowTables(ShowTables {
+            kind,
+            database,
+            full,
+        }))
     }
 
     /// Parses `SHOW DATABASES` statement.
@@ -234,6 +245,7 @@ mod tests {
             Statement::ShowTables(ShowTables {
                 kind: ShowKind::All,
                 database: None,
+                full: false
            })
        );
    }
@@ -253,6 +265,7 @@ mod tests {
                    quote_style: None,
                }),
                database: None,
+                full: false
            })
        );

@@ -269,6 +282,7 @@ mod tests {
                    quote_style: None,
                }),
                database: Some(_),
+                full: false
            })
        );
    }
@@ -285,6 +299,7 @@ mod tests {
            Statement::ShowTables(ShowTables {
                kind: ShowKind::Where(sqlparser::ast::Expr::Like { .. }),
                database: None,
+                full: false
            })
        );

@@ -298,6 +313,76 @@ mod tests {
            Statement::ShowTables(ShowTables {
                kind: ShowKind::Where(sqlparser::ast::Expr::Like { .. }),
                database: Some(_),
+                full: false
            })
        );
    }
+
+    #[test]
+    pub fn test_show_full_tables() {
+        let sql = "SHOW FULL TABLES";
+        let stmts = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
+        assert_eq!(1, stmts.len());
+        assert_matches!(&stmts[0], Statement::ShowTables { .. });
+        match &stmts[0] {
+            Statement::ShowTables(show) => {
+                assert!(show.full);
+            }
+            _ => {
+                unreachable!();
+            }
+        }
+    }
+
+    #[test]
+    pub fn test_show_full_tables_where() {
+        let sql = "SHOW FULL TABLES IN test_db WHERE Tables LIKE test_table";
+        let stmts = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
+        assert_eq!(1, stmts.len());
+
+        assert_matches!(
+            &stmts[0],
+            Statement::ShowTables(ShowTables {
+                kind: ShowKind::Where(sqlparser::ast::Expr::Like { .. }),
+                database: Some(_),
+                full: true
+            })
+        );
+    }
+
+    #[test]
+    pub fn test_show_full_tables_like() {
+        let sql = "SHOW FULL TABLES LIKE test_table";
+        let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {});
+        let stmts = result.unwrap();
+        assert_eq!(1, stmts.len());
+
+        assert_matches!(
+            &stmts[0],
+            Statement::ShowTables(ShowTables {
+                kind: ShowKind::Like(sqlparser::ast::Ident {
+                    value: _,
+                    quote_style: None,
+                }),
+                database: None,
+                full: true
+            })
+        );
+
+        let sql = "SHOW FULL TABLES in test_db LIKE test_table";
+        let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {});
+        let stmts = result.unwrap();
+        assert_eq!(1, stmts.len());
+
+        assert_matches!(
+            &stmts[0],
+            Statement::ShowTables(ShowTables {
+                kind: ShowKind::Like(sqlparser::ast::Ident {
+                    value: _,
+                    quote_style: None,
+                }),
+                database: Some(_),
+                full: true
+            })
+        );
    }
diff --git a/src/sql/src/statements/show.rs b/src/sql/src/statements/show.rs
index eed546fba907..b0d712142863 100644
--- a/src/sql/src/statements/show.rs
+++ b/src/sql/src/statements/show.rs
@@ -54,6 +54,7 @@ impl ShowDatabases {
 pub struct ShowTables {
     pub kind: ShowKind,
     pub database: Option<String>,
+    pub full: bool,
 }
 
 /// SQL structure for `SHOW CREATE TABLE`.
diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs
index f27d4d1e6145..628b7ce2c29c 100644
--- a/src/table/src/metadata.rs
+++ b/src/table/src/metadata.rs
@@ -80,6 +80,16 @@ pub enum TableType {
     Temporary,
 }
 
+impl std::fmt::Display for TableType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TableType::Base => f.write_str("BASE TABLE"),
+            TableType::Temporary => f.write_str("TEMPORARY"),
+            TableType::View => f.write_str("VIEW"),
+        }
+    }
+}
+
 /// Identifier of the table.
 #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
 pub struct TableIdent {
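End to end, the feature is observable at the SQL layer. A sketch (not part of the commit; it reuses the parser test setup above, and the extra column is the one built in `show_tables`):

    // Sketch: `SHOW FULL TABLES` parses with `full: true`, and the result set
    // gains a second "Table_type" column, e.g. "BASE TABLE" for ordinary tables.
    let stmts = ParserContext::create_with_dialect(
        "SHOW FULL TABLES LIKE test_table",
        &GreptimeDbDialect {},
    )
    .unwrap();
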
From 8a12762ff60e5bcf1c70f19e260cda299968f5a4 Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 22 Sep 2023 11:24:49 +0800
Subject: [PATCH 06/36] feat: update proto and remove create_if_not_exists
 (#2467)

---
 Cargo.lock                              |  2 +-
 Cargo.toml                              |  2 +-
 src/common/meta/src/ddl/create_table.rs |  1 -
 src/file-engine/src/engine.rs           | 15 +++------------
 src/file-engine/src/error.rs            |  7 -------
 src/file-engine/src/region.rs           |  3 ---
 src/meta-srv/src/procedure/tests.rs     |  1 -
 src/mito2/src/error.rs                  |  7 -------
 src/mito2/src/test_util.rs              |  1 -
 src/store-api/src/region_request.rs     |  4 ----
 10 files changed, 5 insertions(+), 38 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3aa3264286ea..349938a47461 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4186,7 +4186,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9d3f28d07d29607d0e3c1823f4a4d2bc229d05b9#9d3f28d07d29607d0e3c1823f4a4d2bc229d05b9"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=115c1080773be8a819e50b257fece9f839a0c836#115c1080773be8a819e50b257fece9f839a0c836"
 dependencies = [
  "prost",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 8513ed610edd..426b07d468b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
 derive_builder = "0.12"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9d3f28d07d29607d0e3c1823f4a4d2bc229d05b9" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "115c1080773be8a819e50b257fece9f839a0c836" }
 humantime-serde = "1.1"
 itertools = "0.10"
 lazy_static = "1.4"
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index 3712ce6ccbbb..87a6cf0dc046 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -161,7 +161,6 @@ impl CreateTableProcedure {
             engine: create_table_expr.engine.to_string(),
             column_defs,
             primary_key,
-            create_if_not_exists: true,
             path: String::new(),
             options: create_table_expr.table_options.clone(),
         })
diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs
index fe853701d204..6a9a97e264da 100644
--- a/src/file-engine/src/engine.rs
+++ b/src/file-engine/src/engine.rs
@@ -33,8 +33,7 @@ use tokio::sync::{Mutex, RwLock};
 
 use crate::config::EngineConfig;
 use crate::error::{
-    RegionExistsSnafu, RegionNotFoundSnafu, Result as EngineResult, UnexpectedEngineSnafu,
-    UnsupportedSnafu,
+    RegionNotFoundSnafu, Result as EngineResult, UnexpectedEngineSnafu, UnsupportedSnafu,
 };
 use crate::region::{FileRegion, FileRegionRef};
 
@@ -168,11 +167,7 @@ impl EngineInner {
         );
 
         if self.exists(region_id).await {
-            return if request.create_if_not_exists {
-                Ok(Output::AffectedRows(0))
-            } else {
-                RegionExistsSnafu { region_id }.fail()
-            };
+            return Ok(Output::AffectedRows(0));
         }
 
         info!("Try to create region, region_id: {}", region_id);
@@ -180,11 +175,7 @@ impl EngineInner {
         let _lock = self.region_mutex.lock().await;
         // Check again after acquiring the lock
         if self.exists(region_id).await {
-            return if request.create_if_not_exists {
-                Ok(Output::AffectedRows(0))
-            } else {
-                RegionExistsSnafu { region_id }.fail()
-            };
+            return Ok(Output::AffectedRows(0));
        }
         let res = FileRegion::create(region_id, request, &self.object_store).await;
diff --git a/src/file-engine/src/error.rs b/src/file-engine/src/error.rs
index 57e2d26b11b9..87cf49278315 100644
--- a/src/file-engine/src/error.rs
+++ b/src/file-engine/src/error.rs
@@ -40,12 +40,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Region {} already exists", region_id))]
-    RegionExists {
-        region_id: RegionId,
-        location: Location,
-    },
-
     #[snafu(display("Region not found, region_id: {}", region_id))]
     RegionNotFound {
         region_id: RegionId,
         location: Location,
    },
@@ -191,7 +185,6 @@ impl ErrorExt for Error {
             | CreateDefault { .. }
             | MissingColumnNoDefault { .. } => StatusCode::InvalidArguments,
 
-            RegionExists { .. } => StatusCode::RegionAlreadyExists,
             RegionNotFound { .. } => StatusCode::RegionNotFound,
 
             BuildBackend { source, .. } => source.status_code(),
diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs
index 6340bbc81ed1..218a5e49d5ed 100644
--- a/src/file-engine/src/region.rs
+++ b/src/file-engine/src/region.rs
@@ -112,7 +112,6 @@ mod tests {
             engine: "file".to_string(),
             column_metadatas: new_test_column_metadata(),
             primary_key: vec![1],
-            create_if_not_exists: true,
             options: new_test_options(),
             region_dir: "create_region_dir/".to_string(),
         };
@@ -151,7 +150,6 @@ mod tests {
             engine: "file".to_string(),
             column_metadatas: new_test_column_metadata(),
             primary_key: vec![1],
-            create_if_not_exists: true,
             options: new_test_options(),
             region_dir: region_dir.clone(),
         };
@@ -189,7 +187,6 @@ mod tests {
             engine: "file".to_string(),
             column_metadatas: new_test_column_metadata(),
             primary_key: vec![1],
-            create_if_not_exists: true,
             options: new_test_options(),
             region_dir: region_dir.clone(),
         };
diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs
index eac9dfd16b92..785d0f8edacb 100644
--- a/src/meta-srv/src/procedure/tests.rs
+++ b/src/meta-srv/src/procedure/tests.rs
@@ -151,7 +151,6 @@ fn test_create_region_request_template() {
             },
         ],
         primary_key: vec![2, 1],
-        create_if_not_exists: true,
         path: String::new(),
         options: HashMap::new(),
     };
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 3502fe217f93..06aa47a9cd74 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -98,12 +98,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Region {} already exists", region_id))]
-    RegionExists {
-        region_id: RegionId,
-        location: Location,
-    },
-
     #[snafu(display("Failed to create RecordBatch from vectors"))]
     NewRecordBatch {
         location: Location,
@@ -405,7 +399,6 @@ impl ErrorExt for Error {
             | CreateDefault { .. }
             | InvalidParquet { .. } => StatusCode::Unexpected,
             RegionNotFound { .. } => StatusCode::RegionNotFound,
-            RegionExists { .. } => StatusCode::RegionAlreadyExists,
             InvalidScanIndex { .. }
             | InvalidMeta { .. }
            | InvalidRequest { .. }
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 887466838e43..2ad545825de2 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -304,7 +304,6 @@ impl CreateRequestBuilder {
             column_metadatas,
             primary_key: self.primary_key.clone().unwrap_or(primary_key),
             options: self.options.clone(),
-            create_if_not_exists: false,
             region_dir: self.region_dir.clone(),
         }
     }
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 693e1a55c5c4..9085943f21e0 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -79,7 +79,6 @@ impl RegionRequest {
                 engine: create.engine,
                 column_metadatas,
                 primary_key: create.primary_key,
-                create_if_not_exists: create.create_if_not_exists,
                 options: create.options,
                 region_dir,
             }),
@@ -150,9 +149,6 @@ pub struct RegionCreateRequest {
     pub column_metadatas: Vec<ColumnMetadata>,
     /// Columns in the primary key.
     pub primary_key: Vec<ColumnId>,
-    /// Create region if not exists.
-    // TODO(yingwen): Remove this.
-    pub create_if_not_exists: bool,
     /// Options of the created region.
     pub options: HashMap<String, String>,
     /// Directory for region's data home. Usually is composed by catalog and table id

From e2a8c0c296f5144b9c143c25dc9e934f96098d0a Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 22 Sep 2023 14:13:12 +0800
Subject: [PATCH 07/36] fix(mito): compaction scheduler schedules more tasks
 than expected (#2466)

* test: test on_compaction_finished
* fix: avoid submitting the same region to compact
* feat: persist and recover compaction time window
* test: fix test
* test: sort like result
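One observable effect of persisting the window (a sketch, not part of the commit; the humantime rendering, e.g. "1h", is an assumption based on the `humantime_serde` attribute added below):

    // Sketch: a RegionEdit now carries the window as a Duration and
    // serializes it in humantime form through the manifest.
    let edit = RegionEdit {
        files_to_add: vec![],
        files_to_remove: vec![],
        compaction_time_window: Some(Duration::from_secs(3600)),
        flushed_entry_id: None,
        flushed_sequence: None,
    };
    let json = serde_json::to_string(&edit)?;
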
---
 src/mito2/src/compaction.rs                   | 150 +++++++++++++++---
 src/mito2/src/compaction/twcs.rs              |   9 +-
 src/mito2/src/manifest/action.rs              |  13 +-
 src/mito2/src/manifest/tests/checkpoint.rs    |   2 +-
 src/mito2/src/region/opener.rs                |   1 +
 src/mito2/src/region/version.rs               |  16 ++
 src/mito2/src/request.rs                      |   3 +
 src/mito2/src/test_util/scheduler_util.rs     |   6 +
 src/mito2/src/test_util/version_util.rs       |  39 +++++
 src/mito2/src/worker/handle_compaction.rs     |   2 +-
 .../standalone/common/select/like.result      |   1 +
 tests/cases/standalone/common/select/like.sql |   1 +
 12 files changed, 219 insertions(+), 24 deletions(-)

diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 23ce3517e0ed..38327f10ca81 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -42,7 +42,6 @@ use crate::sst::file_purger::FilePurgerRef;
 pub struct CompactionRequest {
     pub(crate) current_version: VersionRef,
     pub(crate) access_layer: AccessLayerRef,
-    pub(crate) compaction_time_window: Option<i64>,
     /// Sender to send notification to the region worker.
     pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
     /// Waiters of the compaction request.
@@ -101,24 +100,21 @@ impl CompactionScheduler {
         file_purger: &FilePurgerRef,
         waiter: OptionOutputTx,
     ) -> Result<()> {
-        let status = self.region_status.entry(region_id).or_insert_with(|| {
-            CompactionStatus::new(
-                region_id,
-                version_control.clone(),
-                access_layer.clone(),
-                file_purger.clone(),
-            )
-        });
-        if status.compacting {
+        if let Some(status) = self.region_status.get_mut(&region_id) {
             // Region is compacting. Add the waiter to pending list.
             status.merge_waiter(waiter);
             return Ok(());
         }
 
         // The region can compact directly.
+        let mut status = CompactionStatus::new(
+            region_id,
+            version_control.clone(),
+            access_layer.clone(),
+            file_purger.clone(),
+        );
         let request = status.new_compaction_request(self.request_sender.clone(), waiter);
-        // Mark the region as compacting.
-        status.compacting = true;
+        self.region_status.insert(region_id, status);
         self.schedule_compaction_request(request)
     }
 
@@ -127,7 +123,6 @@ impl CompactionScheduler {
         let Some(status) = self.region_status.get_mut(&region_id) else {
             return;
         };
-        status.compacting = false;
         // We should always try to compact the region until picker returns None.
         let request =
             status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none());
@@ -252,8 +247,6 @@ struct CompactionStatus {
     access_layer: AccessLayerRef,
     /// File purger of the region.
     file_purger: FilePurgerRef,
-    /// Whether a compaction task is running.
-    compacting: bool,
     /// Compaction pending to schedule.
     ///
     /// For simplicity, we merge all pending compaction requests into one.
@@ -273,7 +266,6 @@ impl CompactionStatus {
             version_control,
             access_layer,
             file_purger,
-            compacting: false,
             pending_compaction: None,
         }
     }
@@ -306,8 +298,6 @@ impl CompactionStatus {
         let mut req = CompactionRequest {
             current_version,
             access_layer: self.access_layer.clone(),
-            // TODO(hl): get persisted region compaction time window
-            compaction_time_window: None,
             request_sender: request_sender.clone(),
             waiters: Vec::new(),
             file_purger: self.file_purger.clone(),
@@ -324,12 +314,15 @@ impl CompactionStatus {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Mutex;
+
     use common_query::Output;
     use tokio::sync::oneshot;
 
     use super::*;
+    use crate::schedule::scheduler::{Job, Scheduler};
     use crate::test_util::scheduler_util::SchedulerEnv;
-    use crate::test_util::version_util::VersionControlBuilder;
+    use crate::test_util::version_util::{apply_edit, VersionControlBuilder};
 
     #[tokio::test]
     async fn test_schedule_empty() {
@@ -373,4 +366,123 @@ mod tests {
         assert!(matches!(output, Output::AffectedRows(0)));
         assert!(scheduler.region_status.is_empty());
     }
+
+    #[derive(Default)]
+    struct VecScheduler {
+        jobs: Mutex<Vec<Job>>,
+    }
+
+    impl VecScheduler {
+        fn num_jobs(&self) -> usize {
+            self.jobs.lock().unwrap().len()
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Scheduler for VecScheduler {
+        fn schedule(&self, job: Job) -> Result<()> {
+            self.jobs.lock().unwrap().push(job);
+            Ok(())
+        }
+
+        async fn stop(&self, _await_termination: bool) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn test_schedule_on_finished() {
+        let job_scheduler = Arc::new(VecScheduler::default());
+        let env = SchedulerEnv::new().scheduler(job_scheduler.clone());
+        let (tx, _rx) = mpsc::channel(4);
+        let mut scheduler = env.mock_compaction_scheduler(tx);
+        let mut builder = VersionControlBuilder::new();
+        let purger = builder.file_purger();
+        let region_id = builder.region_id();
+
+        // 5 files to compact.
+        let end = 1000 * 1000;
+        let version_control = Arc::new(
+            builder
+                .push_l0_file(0, end)
+                .push_l0_file(10, end)
+                .push_l0_file(50, end)
+                .push_l0_file(80, end)
+                .push_l0_file(90, end)
+                .build(),
+        );
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        // Should schedule 1 compaction.
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        let data = version_control.current();
+        let file_metas: Vec<_> = data.version.ssts.levels()[0]
+            .files
+            .values()
+            .map(|file| file.meta())
+            .collect();
+
+        // 5 files for next compaction and removes old files.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &file_metas,
+            purger.clone(),
+        );
+        // The task is pending.
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        assert!(scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .pending_compaction
+            .is_some());
+
+        // On compaction finished and schedule next compaction.
+        scheduler.on_compaction_finished(region_id);
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(2, job_scheduler.num_jobs());
+        // 5 files for next compaction.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &[],
+            purger.clone(),
+        );
+        // The task is pending.
+        scheduler
+            .schedule_compaction(
+                region_id,
+                &version_control,
+                &env.access_layer,
+                &purger,
+                OptionOutputTx::none(),
+            )
+            .unwrap();
+        assert_eq!(2, job_scheduler.num_jobs());
+        assert!(scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .pending_compaction
+            .is_some());
+    }
 }
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 5f03a3aa5fdb..f840d6aeb2f9 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -120,7 +120,6 @@ impl Picker for TwcsPicker {
         let CompactionRequest {
             current_version,
             access_layer,
-            compaction_time_window,
             request_sender,
             waiters,
             file_purger,
@@ -138,6 +137,9 @@ impl Picker for TwcsPicker {
             expired_ssts.iter().for_each(|f| f.set_compacting(true));
         }
 
+        let compaction_time_window = current_version
+            .compaction_time_window
+            .map(|window| window.as_secs() as i64);
         let time_window_size = compaction_time_window
             .or(self.time_window_seconds)
             .unwrap_or_else(|| {
@@ -169,7 +171,7 @@ impl Picker for TwcsPicker {
             outputs,
             expired_ssts,
             sst_write_buffer_size: ReadableSize::mb(4),
-            compaction_time_window: None,
+            compaction_time_window: Some(time_window_size),
             request_sender,
             waiters,
             file_purger,
@@ -357,6 +359,9 @@ impl CompactionTask for TwcsCompactionTask {
                     compacted_files: deleted,
                     senders: std::mem::take(&mut self.waiters),
                     file_purger: self.file_purger.clone(),
+                    compaction_time_window: self
+                        .compaction_time_window
+                        .map(|seconds| Duration::from_secs(seconds as u64)),
                 })
             }
             Err(e) => {
diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs
index a39a89cc739d..aa490fad80fa 100644
--- a/src/mito2/src/manifest/action.rs
+++ b/src/mito2/src/manifest/action.rs
@@ -15,6 +15,7 @@
 //! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
 
 use std::collections::HashMap;
+use std::time::Duration;
 
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt};
@@ -49,7 +50,8 @@ pub struct RegionChange {
 pub struct RegionEdit {
     pub files_to_add: Vec<FileMeta>,
     pub files_to_remove: Vec<FileMeta>,
-    pub compaction_time_window: Option<i64>,
+    #[serde(with = "humantime_serde")]
+    pub compaction_time_window: Option<Duration>,
     pub flushed_entry_id: Option<EntryId>,
     pub flushed_sequence: Option<SequenceNumber>,
 }
@@ -84,6 +86,9 @@ pub struct RegionManifest {
     pub manifest_version: ManifestVersion,
     /// Last WAL entry id of truncated data.
     pub truncated_entry_id: Option<EntryId>,
+    /// Inferred compaction time window.
+    #[serde(with = "humantime_serde")]
+    pub compaction_time_window: Option<Duration>,
 }
 
 #[derive(Debug, Default)]
@@ -94,6 +99,7 @@ pub struct RegionManifestBuilder {
     flushed_sequence: SequenceNumber,
     manifest_version: ManifestVersion,
     truncated_entry_id: Option<EntryId>,
+    compaction_time_window: Option<Duration>,
 }
 
 impl RegionManifestBuilder {
@@ -107,6 +113,7 @@ impl RegionManifestBuilder {
                 manifest_version: s.manifest_version,
                 flushed_sequence: s.flushed_sequence,
                 truncated_entry_id: s.truncated_entry_id,
+                compaction_time_window: s.compaction_time_window,
             }
         } else {
             Default::default()
@@ -132,6 +139,9 @@ impl RegionManifestBuilder {
         if let Some(flushed_sequence) = edit.flushed_sequence {
             self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
         }
+        if let Some(window) = edit.compaction_time_window {
+            self.compaction_time_window = Some(window);
+        }
     }
 
     pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
@@ -156,6 +166,7 @@ impl RegionManifestBuilder {
             flushed_sequence: self.flushed_sequence,
             manifest_version: self.manifest_version,
             truncated_entry_id: self.truncated_entry_id,
+            compaction_time_window: self.compaction_time_window,
         })
     }
 }
diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs
index e1b1c54cd82c..68c7063e1e63 100644
--- a/src/mito2/src/manifest/tests/checkpoint.rs
+++ b/src/mito2/src/manifest/tests/checkpoint.rs
@@ -150,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() {
         .await
         .unwrap();
     let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
-    let expected_json = "{\"size\":816,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
+    let expected_json = "{\"size\":846,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
     assert_eq!(expected_json, raw_json);
 
     // reopen the manager
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index fb6e5f89896b..211cecb75047 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -218,6 +218,7 @@ impl RegionOpener {
             .flushed_entry_id(manifest.flushed_entry_id)
             .flushed_sequence(manifest.flushed_sequence)
             .truncated_entry_id(manifest.truncated_entry_id)
+            .compaction_time_window(manifest.compaction_time_window)
             .options(options)
             .build();
         let flushed_entry_id = version.flushed_entry_id;
diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs
index e88fafedaf08..c7d84fd913df 100644
--- a/src/mito2/src/region/version.rs
+++ b/src/mito2/src/region/version.rs
@@ -24,6 +24,7 @@
 //! and became invisible between step 1 and 2, so need to acquire version at first.
 
 use std::sync::{Arc, RwLock};
+use std::time::Duration;
 
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::SequenceNumber;
@@ -205,6 +206,8 @@ pub(crate) struct Version {
     ///
     /// Used to check if it is a flush task during the truncating table.
     pub(crate) truncated_entry_id: Option<EntryId>,
+    /// Inferred compaction time window.
+    pub(crate) compaction_time_window: Option<Duration>,
     /// Options of the region.
diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs
index e88fafedaf08..c7d84fd913df 100644
--- a/src/mito2/src/region/version.rs
+++ b/src/mito2/src/region/version.rs
@@ -24,6 +24,7 @@
 //! and became invisible between step 1 and 2, so need to acquire version at first.
 
 use std::sync::{Arc, RwLock};
+use std::time::Duration;
 
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::SequenceNumber;
@@ -205,6 +206,8 @@ pub(crate) struct Version {
     ///
     /// Used to check if it is a flush task during the truncating table.
     pub(crate) truncated_entry_id: Option<EntryId>,
+    /// Inferred compaction time window.
+    pub(crate) compaction_time_window: Option<Duration>,
     /// Options of the region.
     pub(crate) options: RegionOptions,
 }
@@ -219,6 +222,7 @@ pub(crate) struct VersionBuilder {
     flushed_entry_id: EntryId,
     flushed_sequence: SequenceNumber,
     truncated_entry_id: Option<EntryId>,
+    compaction_time_window: Option<Duration>,
     options: RegionOptions,
 }
 
@@ -232,6 +236,7 @@ impl VersionBuilder {
             flushed_entry_id: 0,
             flushed_sequence: 0,
             truncated_entry_id: None,
+            compaction_time_window: None,
             options: RegionOptions::default(),
         }
     }
@@ -245,6 +250,7 @@ impl VersionBuilder {
             flushed_entry_id: version.flushed_entry_id,
             flushed_sequence: version.flushed_sequence,
             truncated_entry_id: version.truncated_entry_id,
+            compaction_time_window: version.compaction_time_window,
             options: version.options.clone(),
         }
     }
@@ -279,6 +285,12 @@ impl VersionBuilder {
         self
     }
 
+    /// Sets compaction time window.
+    pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
+        self.compaction_time_window = window;
+        self
+    }
+
     /// Sets options.
     pub(crate) fn options(mut self, options: RegionOptions) -> Self {
         self.options = options;
@@ -293,6 +305,9 @@ impl VersionBuilder {
         if let Some(sequence) = edit.flushed_sequence {
             self.flushed_sequence = self.flushed_sequence.max(sequence);
         }
+        if let Some(window) = edit.compaction_time_window {
+            self.compaction_time_window = Some(window);
+        }
         if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
             let mut ssts = (*self.ssts).clone();
             ssts.add_files(file_purger, edit.files_to_add.into_iter());
@@ -335,6 +350,7 @@ impl VersionBuilder {
             flushed_entry_id: self.flushed_entry_id,
             flushed_sequence: self.flushed_sequence,
             truncated_entry_id: self.truncated_entry_id,
+            compaction_time_window: self.compaction_time_window,
             options: self.options,
         }
     }
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index 1fbcbc962338..33cc121a5a3e 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -16,6 +16,7 @@
 
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Duration;
 
 use api::helper::{
     is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
@@ -640,6 +641,8 @@ pub(crate) struct CompactionFinished {
     pub(crate) senders: Vec<OutputTx>,
     /// File purger for cleaning files on failure.
     pub(crate) file_purger: FilePurgerRef,
+    /// Inferred compaction time window.
+    pub(crate) compaction_time_window: Option<Duration>,
 }
 
 impl CompactionFinished {
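Both `RegionManifestBuilder::apply_edit` and `VersionBuilder::apply_edit` treat the window as "latest edit wins": an edit that carries a window overwrites the stored one, and an edit without a window leaves the previous value untouched. A minimal model of that rule (illustrative type, not the mito2 builders):

```rust
use std::time::Duration;

#[derive(Default)]
struct BuilderSketch {
    compaction_time_window: Option<Duration>,
}

impl BuilderSketch {
    // Mirrors the `if let Some(window) = edit.compaction_time_window` branch.
    fn apply_edit(&mut self, edit_window: Option<Duration>) {
        if let Some(window) = edit_window {
            self.compaction_time_window = Some(window);
        }
    }
}

fn main() {
    let mut b = BuilderSketch::default();
    b.apply_edit(Some(Duration::from_secs(7200)));
    b.apply_edit(None); // an edit without a window keeps the old value
    assert_eq!(b.compaction_time_window, Some(Duration::from_secs(7200)));
}
```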
diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs
index 5e1e259e75d9..611371a7ec0b 100644
--- a/src/mito2/src/test_util/scheduler_util.rs
+++ b/src/mito2/src/test_util/scheduler_util.rs
@@ -52,6 +52,12 @@ impl SchedulerEnv {
         }
     }
 
+    /// Set scheduler.
+    pub(crate) fn scheduler(mut self, scheduler: SchedulerRef) -> Self {
+        self.scheduler = Some(scheduler);
+        self
+    }
+
     /// Creates a new compaction scheduler.
     pub(crate) fn mock_compaction_scheduler(
         &self,
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index 2fee8a987b06..e480b1f146df 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -24,6 +24,7 @@ use datatypes::schema::ColumnSchema;
 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
 use store_api::storage::RegionId;
 
+use crate::manifest::action::RegionEdit;
 use crate::memtable::{MemtableBuilder, MemtableBuilderRef};
 use crate::region::version::{Version, VersionBuilder, VersionControl};
 use crate::sst::file::{FileId, FileMeta};
@@ -113,3 +114,41 @@ impl VersionControlBuilder {
         VersionControl::new(version)
     }
 }
+
+/// Add mocked l0 files to the version control.
+/// `files_to_add` are slice of `(start_ms, end_ms)`.
+pub(crate) fn apply_edit(
+    version_control: &VersionControl,
+    files_to_add: &[(i64, i64)],
+    files_to_remove: &[FileMeta],
+    purger: FilePurgerRef,
+) {
+    let region_id = version_control.current().version.metadata.region_id;
+    let files_to_add = files_to_add
+        .iter()
+        .map(|(start_ms, end_ms)| {
+            FileMeta {
+                region_id,
+                file_id: FileId::random(),
+                time_range: (
+                    Timestamp::new_millisecond(*start_ms),
+                    Timestamp::new_millisecond(*end_ms),
+                ),
+                level: 0,
+                file_size: 0, // We don't care file size.
+            }
+        })
+        .collect();
+
+    version_control.apply_edit(
+        RegionEdit {
+            files_to_add,
+            files_to_remove: files_to_remove.to_vec(),
+            compaction_time_window: None,
+            flushed_entry_id: None,
+            flushed_sequence: None,
+        },
+        &[],
+        purger,
+    );
+}
diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs
index 0e0f3a07ef34..79ade0f4dc9f 100644
--- a/src/mito2/src/worker/handle_compaction.rs
+++ b/src/mito2/src/worker/handle_compaction.rs
@@ -61,7 +61,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         let edit = RegionEdit {
             files_to_add: std::mem::take(&mut request.compaction_outputs),
             files_to_remove: std::mem::take(&mut request.compacted_files),
-            compaction_time_window: None, // TODO(hl): update window maybe
+            compaction_time_window: request.compaction_time_window,
             flushed_entry_id: None,
             flushed_sequence: None,
         };
diff --git a/tests/cases/standalone/common/select/like.result b/tests/cases/standalone/common/select/like.result
index 91b648e1a83f..b47528b80e23 100644
--- a/tests/cases/standalone/common/select/like.result
+++ b/tests/cases/standalone/common/select/like.result
@@ -14,6 +14,7 @@ INSERT INTO TABLE host VALUES
 
 Affected Rows: 4
 
+-- SQLNESS SORT_RESULT 3 1
 SELECT * FROM host WHERE host LIKE '%+%';
 
 +-------------------------+------+-----+
diff --git a/tests/cases/standalone/common/select/like.sql b/tests/cases/standalone/common/select/like.sql
index b4ef76bb93b7..281cef769a36 100644
--- a/tests/cases/standalone/common/select/like.sql
+++ b/tests/cases/standalone/common/select/like.sql
@@ -10,6 +10,7 @@ INSERT INTO TABLE host VALUES
     (2, 'a', 3.0),
     (3, 'c', 4.0);
 
+-- SQLNESS SORT_RESULT 3 1
 SELECT * FROM host WHERE host LIKE '%+%';
 
 DROP TABLE host;
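The new `apply_edit` test helper fabricates level-0 `FileMeta`s from `(start_ms, end_ms)` pairs so scheduler tests can grow a version without writing real SSTs. A simplified model of what it builds per pair, with field types reduced to plain integers (the real helper uses `FileId`, `Timestamp`, and mito2's `FileMeta`):

```rust
#[derive(Debug)]
struct MockFileMeta {
    time_range: (i64, i64), // millisecond timestamps in the real code
    level: u8,
    file_size: u64,
}

fn mock_l0_files(ranges: &[(i64, i64)]) -> Vec<MockFileMeta> {
    ranges
        .iter()
        .map(|&(start_ms, end_ms)| MockFileMeta {
            time_range: (start_ms, end_ms),
            level: 0,     // the helper always creates level-0 files
            file_size: 0, // size is irrelevant for scheduling tests
        })
        .collect()
}

fn main() {
    let files = mock_l0_files(&[(0, 100), (20, 100)]);
    assert_eq!(files.len(), 2);
    assert_eq!(files[0].time_range, (0, 100));
    assert!(files.iter().all(|f| f.level == 0 && f.file_size == 0));
}
```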
From 5e1322e1f44df2d6a2ca7ba3c32932a1f15fc82a Mon Sep 17 00:00:00 2001
From: Wei <47681251+QuenKar@users.noreply.github.com>
Date: Fri, 22 Sep 2023 14:28:02 +0800
Subject: [PATCH 08/36] refactor: not allowed int64 type as time index (#2460)

* refactor: remove is_timestamp_compatible.

* chore: fmt

* refactor: remove int64 to timestamp match

* chore

* chore: apply suggestions from code review

Co-authored-by: dennis zhuang

* chore: fmt

---------

Co-authored-by: dennis zhuang
---
 src/datatypes/src/data_type.rs             | 46 ++--------------------
 src/datatypes/src/schema.rs                |  3 +-
 src/datatypes/src/schema/constraint.rs     | 15 ++-----
 src/datatypes/src/types/binary_type.rs     |  4 --
 src/datatypes/src/types/boolean_type.rs    |  4 --
 src/datatypes/src/types/date_type.rs       |  4 --
 src/datatypes/src/types/datetime_type.rs   |  4 --
 src/datatypes/src/types/dictionary_type.rs |  4 --
 src/datatypes/src/types/duration_type.rs   |  3 --
 src/datatypes/src/types/interval_type.rs   |  3 --
 src/datatypes/src/types/list_type.rs       |  4 --
 src/datatypes/src/types/null_type.rs       |  4 --
 src/datatypes/src/types/primitive_type.rs  | 11 ------
 src/datatypes/src/types/string_type.rs     |  4 --
 src/datatypes/src/types/time_type.rs       |  4 --
 src/datatypes/src/types/timestamp_type.rs  |  4 --
 src/datatypes/src/value.rs                 |  2 -
 src/mito2/src/read.rs                      |  8 ++--
 src/storage/src/sst/parquet.rs             |  3 +-
 src/storage/src/sst/pruning.rs             |  9 +----
 src/store-api/src/metadata.rs              | 11 ++---
 21 files changed, 19 insertions(+), 135 deletions(-)

diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs
index 541bc6b8b3b2..938c8ba498dc 100644
--- a/src/datatypes/src/data_type.rs
+++ b/src/datatypes/src/data_type.rs
@@ -179,6 +179,10 @@
         )
     }
 
+    pub fn is_timestamp(&self) -> bool {
+        matches!(self, ConcreteDataType::Timestamp(_))
+    }
+
     pub fn numerics() -> Vec<ConcreteDataType> {
         vec![
             ConcreteDataType::int8_datatype(),
@@ -217,9 +221,6 @@
     /// Try to cast data type as a [`TimestampType`].
     pub fn as_timestamp(&self) -> Option<TimestampType> {
         match self {
-            ConcreteDataType::Int64(_) => {
-                Some(TimestampType::Millisecond(TimestampMillisecondType))
-            }
             ConcreteDataType::Timestamp(t) => Some(*t),
             _ => None,
         }
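The replacement check is a plain variant match, so Int64 no longer sneaks in as a pseudo-timestamp. A reduced model of the predicate (the real `ConcreteDataType` has many more variants and carries a `TimestampType` payload):

```rust
#[derive(Debug)]
enum DataTypeSketch {
    Int64,
    Timestamp, // the real variant holds a TimestampType
}

impl DataTypeSketch {
    fn is_timestamp(&self) -> bool {
        matches!(self, DataTypeSketch::Timestamp)
    }
}

fn main() {
    // Int64 no longer counts as a time-index candidate.
    assert!(!DataTypeSketch::Int64.is_timestamp());
    assert!(DataTypeSketch::Timestamp.is_timestamp());
}
```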
@@ -473,10 +474,6 @@
     /// Creates a mutable vector with given `capacity` of this type.
     fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector>;
 
-    /// Returns true if the data type is compatible with timestamp type so we can
-    /// use it as a timestamp.
-    fn is_timestamp_compatible(&self) -> bool;
-
     /// Casts the value to specific DataType.
     /// Return None if cast failed.
     fn try_cast(&self, from: Value) -> Option<Value>;
@@ -596,41 +593,6 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_is_timestamp_compatible() {
-        assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Second).is_timestamp_compatible());
-        assert!(
-            ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond).is_timestamp_compatible()
-        );
-        assert!(
-            ConcreteDataType::timestamp_datatype(TimeUnit::Microsecond).is_timestamp_compatible()
-        );
-        assert!(
-            ConcreteDataType::timestamp_datatype(TimeUnit::Nanosecond).is_timestamp_compatible()
-        );
-        assert!(ConcreteDataType::timestamp_second_datatype().is_timestamp_compatible());
-        assert!(ConcreteDataType::timestamp_millisecond_datatype().is_timestamp_compatible());
-        assert!(ConcreteDataType::timestamp_microsecond_datatype().is_timestamp_compatible());
-        assert!(ConcreteDataType::timestamp_nanosecond_datatype().is_timestamp_compatible());
-        assert!(ConcreteDataType::int64_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::null_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::binary_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::boolean_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::date_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::datetime_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::string_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::int32_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::uint64_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::time_second_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::time_millisecond_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::time_microsecond_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::time_nanosecond_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::duration_second_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::duration_millisecond_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::duration_microsecond_datatype().is_timestamp_compatible());
-        assert!(!ConcreteDataType::duration_nanosecond_datatype().is_timestamp_compatible());
-    }
-
     #[test]
     fn test_is_null() {
         assert!(ConcreteDataType::null_datatype().is_null());
diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs
index fd6da64beb29..b506dd12ea00 100644
--- a/src/datatypes/src/schema.rs
+++ b/src/datatypes/src/schema.rs
@@ -23,7 +23,6 @@
 use arrow::datatypes::{Field, Schema as ArrowSchema};
 use datafusion_common::DFSchemaRef;
 use snafu::{ensure, ResultExt};
 
-use crate::data_type::DataType;
 use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
 pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY};
 pub use crate::schema::constraint::ColumnDefaultConstraint;
@@ -269,7 +268,7 @@ fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize)
     let column_schema = &column_schemas[timestamp_index];
     ensure!(
-        column_schema.data_type.is_timestamp_compatible(),
+        column_schema.data_type.is_timestamp(),
         error::InvalidTimestampIndexSnafu {
             index: timestamp_index,
         }
     );
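With `is_timestamp_compatible` gone, schema validation only accepts genuine timestamp columns as the time index. A sketch of the tightened check with simplified error handling (the real code reports `InvalidTimestampIndexSnafu` through snafu's `ensure!`):

```rust
#[derive(Debug)]
enum ColType {
    Int64,
    TimestampMillisecond,
}

fn validate_timestamp_index(cols: &[ColType], timestamp_index: usize) -> Result<(), String> {
    match cols.get(timestamp_index) {
        Some(ColType::TimestampMillisecond) => Ok(()),
        Some(other) => Err(format!("invalid timestamp index {timestamp_index}: {other:?}")),
        None => Err(format!("timestamp index {timestamp_index} out of range")),
    }
}

fn main() {
    assert!(validate_timestamp_index(&[ColType::TimestampMillisecond], 0).is_ok());
    // An Int64 time index is now rejected.
    assert!(validate_timestamp_index(&[ColType::Int64], 0).is_err());
}
```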
diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs
index b00a8d7dcaf7..e0dacbd5f719 100644
--- a/src/datatypes/src/schema/constraint.rs
+++ b/src/datatypes/src/schema/constraint.rs
@@ -81,7 +81,7 @@ impl ColumnDefaultConstraint {
             error::UnsupportedDefaultExprSnafu { expr }
         );
         ensure!(
-            data_type.is_timestamp_compatible(),
+            data_type.is_timestamp(),
             error::DefaultValueTypeSnafu {
                 reason: "return value of the function must has timestamp type",
             }
         );
@@ -199,7 +199,7 @@ fn create_current_timestamp_vector(
     let current_timestamp_vector = TimestampMillisecondVector::from_values(
         std::iter::repeat(util::current_time_millis()).take(num_rows),
     );
-    if data_type.is_timestamp_compatible() {
+    if data_type.is_timestamp() {
         current_timestamp_vector.cast(data_type)
     } else {
         error::DefaultValueTypeSnafu {
@@ -350,15 +350,8 @@ mod tests {
 
         // Int64 type.
         let data_type = ConcreteDataType::int64_datatype();
-        let v = constraint
-            .create_default_vector(&data_type, false, 4)
-            .unwrap();
-        assert_eq!(4, v.len());
-        assert!(
-            matches!(v.get(0), Value::Int64(_)),
-            "v {:?} is not timestamp",
-            v.get(0)
-        );
+        let v = constraint.create_default_vector(&data_type, false, 4);
+        assert!(v.is_err());
 
         let constraint = ColumnDefaultConstraint::Function("no".to_string());
         let data_type = ConcreteDataType::timestamp_millisecond_datatype();
diff --git a/src/datatypes/src/types/binary_type.rs b/src/datatypes/src/types/binary_type.rs
index c9e8d7f12b6e..6f4c1f6bc0b5 100644
--- a/src/datatypes/src/types/binary_type.rs
+++ b/src/datatypes/src/types/binary_type.rs
@@ -54,10 +54,6 @@ impl DataType for BinaryType {
         Box::new(BinaryVectorBuilder::with_capacity(capacity))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         match from {
             Value::Binary(v) => Some(Value::Binary(v)),
diff --git a/src/datatypes/src/types/boolean_type.rs b/src/datatypes/src/types/boolean_type.rs
index df33d3862ce2..1d4b9e80a2b9 100644
--- a/src/datatypes/src/types/boolean_type.rs
+++ b/src/datatypes/src/types/boolean_type.rs
@@ -54,10 +54,6 @@ impl DataType for BooleanType {
         Box::new(BooleanVectorBuilder::with_capacity(capacity))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         match from {
             Value::Boolean(v) => Some(Value::Boolean(v)),
diff --git a/src/datatypes/src/types/date_type.rs b/src/datatypes/src/types/date_type.rs
index 1bc243da3a60..8bbde3a7c7f0 100644
--- a/src/datatypes/src/types/date_type.rs
+++ b/src/datatypes/src/types/date_type.rs
@@ -52,10 +52,6 @@ impl DataType for DateType {
         Box::new(DateVectorBuilder::with_capacity(capacity))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         match from {
             Value::Int32(v) => Some(Value::Date(Date::from(v))),
diff --git a/src/datatypes/src/types/datetime_type.rs b/src/datatypes/src/types/datetime_type.rs
index 76b432e82584..cd0e5a3cd1bf 100644
--- a/src/datatypes/src/types/datetime_type.rs
+++ b/src/datatypes/src/types/datetime_type.rs
@@ -50,10 +50,6 @@ impl DataType for DateTimeType {
         Box::new(DateTimeVectorBuilder::with_capacity(capacity))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         match from {
             Value::Int64(v) => Some(Value::DateTime(DateTime::from(v))),
diff --git a/src/datatypes/src/types/dictionary_type.rs b/src/datatypes/src/types/dictionary_type.rs
index fdbdd85ac1c0..cc29c41403df 100644
--- a/src/datatypes/src/types/dictionary_type.rs
+++ b/src/datatypes/src/types/dictionary_type.rs
@@ -85,10 +85,6 @@ impl DataType for DictionaryType {
         unimplemented!()
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, _: Value) -> Option<Value> {
         None
     }
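The user-visible consequence, shown in the updated constraint test, is that a `current_timestamp()` default on an Int64 column now fails instead of silently materializing Int64 values. An illustrative sketch of that behavior, not the datatypes crate API:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Returns the default value for a column, or an error when the column is
// not a real timestamp type (for example an Int64 time-index candidate).
fn create_current_timestamp_default(is_timestamp_column: bool) -> Result<i64, String> {
    if !is_timestamp_column {
        return Err("default value function only supports timestamp columns".to_string());
    }
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| e.to_string())?
        .as_millis() as i64;
    Ok(millis)
}

fn main() {
    assert!(create_current_timestamp_default(true).is_ok());
    assert!(create_current_timestamp_default(false).is_err()); // e.g. an Int64 column
}
```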
diff --git a/src/datatypes/src/types/duration_type.rs b/src/datatypes/src/types/duration_type.rs
index 94c80a7962b3..ffc8fe92467b 100644
--- a/src/datatypes/src/types/duration_type.rs
+++ b/src/datatypes/src/types/duration_type.rs
@@ -98,9 +98,6 @@ macro_rules! impl_data_type_for_duration {
                 Box::new([<Duration $unit Vector Builder>]::with_capacity(capacity))
             }
 
-            fn is_timestamp_compatible(&self) -> bool {
-                false
-            }
 
             fn try_cast(&self, _: Value) -> Option<Value> {
                 // TODO(QuenKar): Implement casting for duration types.
diff --git a/src/datatypes/src/types/interval_type.rs b/src/datatypes/src/types/interval_type.rs
index b87df2733717..1acc506cfce0 100644
--- a/src/datatypes/src/types/interval_type.rs
+++ b/src/datatypes/src/types/interval_type.rs
@@ -86,9 +86,6 @@ macro_rules! impl_data_type_for_interval {
                 Box::new([<Interval $unit Vector Builder>]::with_capacity(capacity))
             }
 
-            fn is_timestamp_compatible(&self) -> bool {
-                false
-            }
 
             fn try_cast(&self, _: Value) -> Option<Value> {
                 // TODO(QuenKar): Implement casting for interval types.
diff --git a/src/datatypes/src/types/list_type.rs b/src/datatypes/src/types/list_type.rs
index 4b4769ed3862..37d620620297 100644
--- a/src/datatypes/src/types/list_type.rs
+++ b/src/datatypes/src/types/list_type.rs
@@ -76,10 +76,6 @@ impl DataType for ListType {
         ))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         match from {
             Value::List(v) => Some(Value::List(v)),
diff --git a/src/datatypes/src/types/null_type.rs b/src/datatypes/src/types/null_type.rs
index e69cdae24985..04c44c38c573 100644
--- a/src/datatypes/src/types/null_type.rs
+++ b/src/datatypes/src/types/null_type.rs
@@ -52,10 +52,6 @@ impl DataType for NullType {
         Box::<NullVectorBuilder>::default()
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     // Unconditional cast other type to Value::Null
     fn try_cast(&self, _from: Value) -> Option<Value> {
         Some(Value::Null)
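These per-unit impls are macro-generated, which is why a single deleted line in the macro removes the method from every duration and interval type at once. A stripped-down model of the pattern (the real macros also use `paste!` to assemble type names such as `DurationSecondVectorBuilder`):

```rust
// One macro invocation per unit type; edit the macro once, change them all.
macro_rules! impl_unit_type {
    ($name:ident) => {
        struct $name;
        impl $name {
            // After this patch the generated impls simply no longer
            // contain an `is_timestamp_compatible` method.
            fn type_name(&self) -> &'static str {
                stringify!($name)
            }
        }
    };
}

impl_unit_type!(DurationSecondSketch);
impl_unit_type!(IntervalYearMonthSketch);

fn main() {
    assert_eq!(DurationSecondSketch.type_name(), "DurationSecondSketch");
    assert_eq!(IntervalYearMonthSketch.type_name(), "IntervalYearMonthSketch");
}
```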
diff --git a/src/datatypes/src/types/primitive_type.rs b/src/datatypes/src/types/primitive_type.rs
index 7bf90c964a3c..52e1bd30a7c6 100644
--- a/src/datatypes/src/types/primitive_type.rs
+++ b/src/datatypes/src/types/primitive_type.rs
@@ -271,9 +271,6 @@ macro_rules! define_non_timestamp_primitive {
                 Box::new(PrimitiveVectorBuilder::<$DataType>::with_capacity(capacity))
             }
 
-            fn is_timestamp_compatible(&self) -> bool {
-                false
-            }
 
             fn try_cast(&self, from: Value) -> Option<Value> {
                 match from {
@@ -373,10 +370,6 @@ impl DataType for Int64Type {
         Box::new(PrimitiveVectorBuilder::<Int64Type>::with_capacity(capacity))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        true
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         match from {
             Value::Boolean(v) => bool_to_numeric(v).map(Value::Int64),
@@ -424,10 +417,6 @@ impl DataType for Int32Type {
         Box::new(PrimitiveVectorBuilder::<Int32Type>::with_capacity(capacity))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         match from {
             Value::Boolean(v) => bool_to_numeric(v).map(Value::Int32),
diff --git a/src/datatypes/src/types/string_type.rs b/src/datatypes/src/types/string_type.rs
index 85a970f116d6..febff36324a1 100644
--- a/src/datatypes/src/types/string_type.rs
+++ b/src/datatypes/src/types/string_type.rs
@@ -54,10 +54,6 @@ impl DataType for StringType {
         Box::new(StringVectorBuilder::with_capacity(capacity))
     }
 
-    fn is_timestamp_compatible(&self) -> bool {
-        false
-    }
-
     fn try_cast(&self, from: Value) -> Option<Value> {
         if from.logical_type_id() == self.logical_type_id() {
             return Some(from);
diff --git a/src/datatypes/src/types/time_type.rs b/src/datatypes/src/types/time_type.rs
index aaa9fec914e7..a8d48a7f586d 100644
--- a/src/datatypes/src/types/time_type.rs
+++ b/src/datatypes/src/types/time_type.rs
@@ -112,10 +112,6 @@ macro_rules! impl_data_type_for_time {
                 Box::new([