diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index d5d790f95838..a53af7860ef7 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -28,6 +28,7 @@ use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; +pub mod alter_logical_tables; pub mod alter_table; pub mod create_logical_tables; pub mod create_table; diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs new file mode 100644 index 000000000000..a109919d2095 --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -0,0 +1,253 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod check; +mod metadata; +mod region_request; +mod update_metadata; + +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{Context, LockKey, Procedure, Status}; +use futures_util::future; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use strum::AsRefStr; +use table::metadata::TableId; + +use crate::ddl::utils::add_peer_context_if_needed; +use crate::ddl::DdlContext; +use crate::error::{Error, Result}; +use crate::instruction::CacheIdent; +use crate::key::table_info::TableInfoValue; +use crate::key::table_route::PhysicalTableRouteValue; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; +use crate::rpc::ddl::AlterTableTask; +use crate::rpc::router::{find_leader_regions, find_leaders}; +use crate::{cache_invalidator, metrics, ClusterId}; + +pub struct AlterLogicalTablesProcedure { + pub context: DdlContext, + pub data: AlterTablesData, +} + +impl AlterLogicalTablesProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables"; + + pub fn new( + cluster_id: ClusterId, + tasks: Vec, + physical_table_id: TableId, + context: DdlContext, + ) -> Self { + Self { + context, + data: AlterTablesData { + cluster_id, + state: AlterTablesState::Prepare, + tasks, + table_info_values: vec![], + physical_table_id, + physical_table_route: None, + cache_invalidate_keys: vec![], + }, + } + } + + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(Self { context, data }) + } + + pub(crate) async fn on_prepare(&mut self) -> Result { + // Checks all the tasks + self.check_input_tasks()?; + // Fills the table info values + self.fill_table_info_values().await?; + // Checks the physical table, must after [fill_table_info_values] + self.check_physical_table().await?; + // Fills the physical table info + self.fill_physical_table_route().await?; + // Filter the tasks + let finished_tasks = self.check_finished_tasks()?; + if finished_tasks.iter().all(|x| *x) { + return Ok(Status::done()); + } + self.filter_task(&finished_tasks)?; + + // Next state + self.data.state = AlterTablesState::SubmitAlterRegionRequests; + Ok(Status::executing(true)) + } + + pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result { + // Safety: we have checked the state in on_prepare + let physical_table_route = &self.data.physical_table_route.as_ref().unwrap(); + let leaders = find_leaders(&physical_table_route.region_routes); + let mut alter_region_tasks = Vec::with_capacity(leaders.len()); + + for peer in leaders { + let requester = self.context.datanode_manager.datanode(&peer).await; + let region_numbers = find_leader_regions(&physical_table_route.region_routes, &peer); + + for region_number in region_numbers { + let request = self.make_request(region_number)?; + let peer = peer.clone(); + let requester = requester.clone(); + + alter_region_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer)) + }); + } + } + + future::join_all(alter_region_tasks) + .await + .into_iter() + .collect::>>()?; + + self.data.state = AlterTablesState::UpdateMetadata; + + Ok(Status::executing(true)) + } + + pub(crate) async fn on_update_metadata(&mut self) -> Result { + let table_info_values = self.build_update_metadata()?; + let manager = &self.context.table_metadata_manager; + let chunk_size = manager.batch_update_table_info_value_chunk_size(); + if table_info_values.len() > chunk_size { + let chunks = table_info_values + .into_iter() + .chunks(chunk_size) + .into_iter() + .map(|check| check.collect::>()) + .collect::>(); + for chunk in chunks { + manager.batch_update_table_info_values(chunk).await?; + } + } else { + manager + .batch_update_table_info_values(table_info_values) + .await?; + } + + self.data.state = AlterTablesState::InvalidateTableCache; + Ok(Status::executing(true)) + } + + pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result { + let to_invalidate = self + .data + .cache_invalidate_keys + .drain(..) + .map(CacheIdent::TableId) + .collect::>(); + self.context + .cache_invalidator + .invalidate(&cache_invalidator::Context::default(), to_invalidate) + .await?; + Ok(Status::done()) + } +} + +#[async_trait] +impl Procedure for AlterLogicalTablesProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &Context) -> ProcedureResult { + let error_handler = |e: Error| { + if e.is_retry_later() { + common_procedure::Error::retry_later(e) + } else { + common_procedure::Error::external(e) + } + }; + + let state = &self.data.state; + + let step = state.as_ref(); + + let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE + .with_label_values(&[step]) + .start_timer(); + + match state { + AlterTablesState::Prepare => self.on_prepare().await, + AlterTablesState::SubmitAlterRegionRequests => { + self.on_submit_alter_region_requests().await + } + AlterTablesState::UpdateMetadata => self.on_update_metadata().await, + AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await, + } + .map_err(error_handler) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + // CatalogLock, SchemaLock, + // TableLock + // TableNameLock(s) + let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len()); + let table_ref = self.data.tasks[0].table_ref(); + lock_key.push(CatalogLock::Read(table_ref.catalog).into()); + lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into()); + lock_key.push(TableLock::Write(self.data.physical_table_id).into()); + + for task in &self.data.tasks { + lock_key.push( + TableNameLock::new( + &task.alter_table.catalog_name, + &task.alter_table.schema_name, + &task.alter_table.table_name, + ) + .into(), + ); + } + LockKey::new(lock_key) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AlterTablesData { + cluster_id: ClusterId, + state: AlterTablesState, + tasks: Vec, + /// Table info values before the alter operation. + /// Corresponding one-to-one with the AlterTableTask in tasks. + table_info_values: Vec, + /// Physical table info + physical_table_id: TableId, + physical_table_route: Option, + cache_invalidate_keys: Vec, +} + +#[derive(Debug, Serialize, Deserialize, AsRefStr)] +enum AlterTablesState { + /// Prepares to alter the table + Prepare, + SubmitAlterRegionRequests, + /// Updates table metadata. + UpdateMetadata, + /// Broadcasts the invalidating table cache instruction. + InvalidateTableCache, +} diff --git a/src/common/meta/src/ddl/alter_logical_tables/check.rs b/src/common/meta/src/ddl/alter_logical_tables/check.rs new file mode 100644 index 000000000000..a4dd9b3e5b34 --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables/check.rs @@ -0,0 +1,139 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use api::v1::alter_expr::Kind; +use snafu::{ensure, OptionExt}; + +use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::error::{AlterLogicalTablesInvalidArgumentsSnafu, Result}; +use crate::key::table_info::TableInfoValue; +use crate::key::table_route::TableRouteValue; +use crate::rpc::ddl::AlterTableTask; + +impl AlterLogicalTablesProcedure { + pub(crate) fn check_input_tasks(&self) -> Result<()> { + self.check_schema()?; + self.check_alter_kind()?; + Ok(()) + } + + pub(crate) async fn check_physical_table(&self) -> Result<()> { + let table_route_manager = self.context.table_metadata_manager.table_route_manager(); + let table_ids = self + .data + .table_info_values + .iter() + .map(|v| v.table_info.ident.table_id) + .collect::>(); + let table_routes = table_route_manager + .batch_get_table_routes(&table_ids) + .await?; + let physical_table_id = self.data.physical_table_id; + let is_same_physical_table = table_routes.iter().all(|r| { + if let Some(TableRouteValue::Logical(r)) = r { + r.physical_table_id() == physical_table_id + } else { + false + } + }); + + ensure!( + is_same_physical_table, + AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "All the tasks should have the same physical table id" + } + ); + + Ok(()) + } + + pub(crate) fn check_finished_tasks(&self) -> Result> { + let task = &self.data.tasks; + let table_info_values = &self.data.table_info_values; + let mut finished_tasks = vec![false; task.len()]; + for ((task, table), is_finished) in task + .iter() + .zip(table_info_values.iter()) + .zip(finished_tasks.iter_mut()) + { + *is_finished = Self::check_finished_task(task, table); + } + + Ok(finished_tasks) + } + + // Checks if the schemas of the tasks are the same + fn check_schema(&self) -> Result<()> { + let is_same_schema = self.data.tasks.windows(2).all(|pair| { + pair[0].alter_table.catalog_name == pair[1].alter_table.catalog_name + && pair[0].alter_table.schema_name == pair[1].alter_table.schema_name + }); + + ensure!( + is_same_schema, + AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "Schemas of the tasks are not the same" + } + ); + + Ok(()) + } + + fn check_alter_kind(&self) -> Result<()> { + for task in &self.data.tasks { + let kind = task.alter_table.kind.as_ref().context( + AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "Alter kind is missing", + }, + )?; + let Kind::AddColumns(_) = kind else { + return AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: "Only support add columns operation", + } + .fail(); + }; + } + + Ok(()) + } + + fn check_finished_task(task: &AlterTableTask, table: &TableInfoValue) -> bool { + let columns = table + .table_info + .meta + .schema + .column_schemas + .iter() + .map(|c| &c.name) + .collect::>(); + + let Some(kind) = task.alter_table.kind.as_ref() else { + return false; + }; + let Kind::AddColumns(add_columns) = kind else { + return false; + }; + + // We only check that all columns have been finished. That is to say, + // if one part is finished but another part is not, it will be considered + // unfinished. + add_columns + .add_columns + .iter() + .map(|add_column| add_column.column_def.as_ref().map(|c| &c.name)) + .all(|column| column.map(|c| columns.contains(c)).unwrap_or(false)) + } +} diff --git a/src/common/meta/src/ddl/alter_logical_tables/metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/metadata.rs new file mode 100644 index 000000000000..13be168263fb --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables/metadata.rs @@ -0,0 +1,136 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_catalog::format_full_table_name; +use snafu::OptionExt; +use table::metadata::TableId; + +use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::error::{Result, TableInfoNotFoundSnafu, TableNotFoundSnafu}; +use crate::key::table_info::TableInfoValue; +use crate::key::table_name::TableNameKey; +use crate::rpc::ddl::AlterTableTask; + +impl AlterLogicalTablesProcedure { + pub(crate) fn filter_task(&mut self, finished_tasks: &[bool]) -> Result<()> { + self.data.tasks = self + .data + .tasks + .drain(..) + .zip(finished_tasks.iter()) + .filter_map(|(task, finished)| if *finished { None } else { Some(task) }) + .collect(); + self.data.table_info_values = self + .data + .table_info_values + .drain(..) + .zip(finished_tasks.iter()) + .filter_map(|(table_info_value, finished)| { + if *finished { + None + } else { + Some(table_info_value) + } + }) + .collect(); + self.data.cache_invalidate_keys = self + .data + .table_info_values + .iter() + .map(|table| table.table_info.ident.table_id) + .collect(); + + Ok(()) + } + + pub(crate) async fn fill_physical_table_route(&mut self) -> Result<()> { + let table_route_manager = self.context.table_metadata_manager.table_route_manager(); + let (_, physical_table_route) = table_route_manager + .get_physical_table_route(self.data.physical_table_id) + .await?; + self.data.physical_table_route = Some(physical_table_route); + + Ok(()) + } + + pub(crate) async fn fill_table_info_values(&mut self) -> Result<()> { + let table_ids = self.get_all_table_ids().await?; + let table_info_values = self.get_all_table_info_values(&table_ids).await?; + debug_assert!(table_info_values.len() == self.data.tasks.len()); + self.data.table_info_values = table_info_values; + + Ok(()) + } + + async fn get_all_table_info_values( + &self, + table_ids: &[TableId], + ) -> Result> { + let table_info_manager = self.context.table_metadata_manager.table_info_manager(); + let mut table_info_map = table_info_manager.batch_get(table_ids).await?; + let mut table_info_values = Vec::with_capacity(table_ids.len()); + for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) { + let table_info_value = + table_info_map + .remove(table_id) + .with_context(|| TableInfoNotFoundSnafu { + table_name: extract_table_name(task), + })?; + table_info_values.push(table_info_value); + } + + Ok(table_info_values) + } + + async fn get_all_table_ids(&self) -> Result> { + let table_name_manager = self.context.table_metadata_manager.table_name_manager(); + let table_name_keys = self + .data + .tasks + .iter() + .map(|task| extract_table_name_key(task)) + .collect(); + + let table_name_values = table_name_manager.batch_get(table_name_keys).await?; + let mut table_ids = Vec::with_capacity(table_name_values.len()); + for (value, task) in table_name_values.into_iter().zip(self.data.tasks.iter()) { + let table_id = value + .with_context(|| TableNotFoundSnafu { + table_name: extract_table_name(task), + })? + .table_id(); + table_ids.push(table_id); + } + + Ok(table_ids) + } +} + +#[inline] +fn extract_table_name(task: &AlterTableTask) -> String { + format_full_table_name( + &task.alter_table.catalog_name, + &task.alter_table.schema_name, + &task.alter_table.table_name, + ) +} + +#[inline] +fn extract_table_name_key(task: &AlterTableTask) -> TableNameKey { + TableNameKey::new( + &task.alter_table.catalog_name, + &task.alter_table.schema_name, + &task.alter_table.table_name, + ) +} diff --git a/src/common/meta/src/ddl/alter_logical_tables/region_request.rs b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs new file mode 100644 index 000000000000..1f6fa2e67b2d --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables/region_request.rs @@ -0,0 +1,98 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1; +use api::v1::alter_expr::Kind; +use api::v1::region::{ + alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests, + RegionColumnDef, RegionRequest, RegionRequestHeader, +}; +use common_telemetry::tracing_context::TracingContext; +use store_api::storage::{RegionId, RegionNumber}; + +use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::error::Result; +use crate::key::table_info::TableInfoValue; +use crate::rpc::ddl::AlterTableTask; + +impl AlterLogicalTablesProcedure { + pub(crate) fn make_request(&self, region_number: RegionNumber) -> Result { + let alter_requests = self.make_alter_region_requests(region_number)?; + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Alters(alter_requests)), + }; + + Ok(request) + } + + fn make_alter_region_requests(&self, region_number: RegionNumber) -> Result { + let mut requests = Vec::with_capacity(self.data.tasks.len()); + for (task, table) in self + .data + .tasks + .iter() + .zip(self.data.table_info_values.iter()) + { + let region_id = RegionId::new(table.table_info.ident.table_id, region_number); + let request = self.make_alter_region_request(region_id, task, table)?; + requests.push(request); + } + + Ok(AlterRequests { requests }) + } + + fn make_alter_region_request( + &self, + region_id: RegionId, + task: &AlterTableTask, + table: &TableInfoValue, + ) -> Result { + let region_id = region_id.as_u64(); + let schema_version = table.table_info.ident.version; + let kind = match &task.alter_table.kind { + Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns( + to_region_add_columns(add_columns), + )), + _ => unreachable!(), // Safety: we have checked the kind in check_input_tasks + }; + + Ok(AlterRequest { + region_id, + schema_version, + kind, + }) + } +} + +fn to_region_add_columns(add_columns: &v1::AddColumns) -> AddColumns { + let add_columns = add_columns + .add_columns + .iter() + .map(|add_column| { + let region_column_def = RegionColumnDef { + column_def: add_column.column_def.clone(), + ..Default::default() // other fields are not used in alter logical table + }; + AddColumn { + column_def: Some(region_column_def), + ..Default::default() // other fields are not used in alter logical table + } + }) + .collect(); + AddColumns { add_columns } +} diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs new file mode 100644 index 000000000000..e9ba0e72226f --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -0,0 +1,66 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_grpc_expr::alter_expr_to_request; +use snafu::ResultExt; +use table::metadata::{RawTableInfo, TableInfo}; + +use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::error; +use crate::error::{ConvertAlterTableRequestSnafu, Result}; +use crate::key::table_info::TableInfoValue; +use crate::rpc::ddl::AlterTableTask; + +impl AlterLogicalTablesProcedure { + pub(crate) fn build_update_metadata(&mut self) -> Result> { + let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len()); + let table_info_values = std::mem::take(&mut self.data.table_info_values); + for (task, table) in self.data.tasks.iter().zip(table_info_values.into_iter()) { + table_info_values_to_update.push(self.build_new_table_info(task, table)?); + } + + Ok(table_info_values_to_update) + } + + fn build_new_table_info( + &self, + task: &AlterTableTask, + table: TableInfoValue, + ) -> Result<(TableInfoValue, RawTableInfo)> { + // Builds new_meta + let table_info = TableInfo::try_from(table.table_info.clone()) + .context(error::ConvertRawTableInfoSnafu)?; + let table_ref = task.table_ref(); + let request = + alter_expr_to_request(table.table_info.ident.table_id, task.alter_table.clone()) + .context(ConvertAlterTableRequestSnafu)?; + let new_meta = table_info + .meta + .builder_with_alter_kind(table_ref.table, &request.alter_kind, true) + .context(error::TableSnafu)? + .build() + .with_context(|_| error::BuildTableMetaSnafu { + table_name: table_ref.table, + })?; + let version = table_info.ident.version + 1; + let mut new_table = table_info; + new_table.meta = new_meta; + new_table.ident.version = version; + + let mut raw_table_info = RawTableInfo::from(new_table); + raw_table_info.sort_columns(); + + Ok((table, raw_table_info)) + } +} diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index e554256a1b39..7298d0a52011 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -281,7 +281,7 @@ impl AlterTableProcedure { let new_meta = table_info .meta - .builder_with_alter_kind(table_ref.table, &request.alter_kind) + .builder_with_alter_kind(table_ref.table, &request.alter_kind, false) .context(error::TableSnafu)? .build() .with_context(|_| error::BuildTableMetaSnafu { @@ -363,7 +363,8 @@ impl AlterTableProcedure { ) .into(), ); - lock_key.push(TableLock::Read(*physical_table_id).into()) + // We must acquire the write lock since ti maybe update the physical table schema + lock_key.push(TableLock::Write(*physical_table_id).into()) } let table_ref = self.data.table_ref(); diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 0c7dd792efc2..80cba554c3e7 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -54,7 +54,7 @@ use crate::{metrics, ClusterId}; pub struct CreateLogicalTablesProcedure { pub context: DdlContext, - pub creator: TablesCreator, + pub data: CreateTablesData, } impl CreateLogicalTablesProcedure { @@ -66,14 +66,22 @@ impl CreateLogicalTablesProcedure { physical_table_id: TableId, context: DdlContext, ) -> Self { - let creator = TablesCreator::new(cluster_id, tasks, physical_table_id); - Self { context, creator } + let len = tasks.len(); + let data = CreateTablesData { + cluster_id, + state: CreateTablesState::Prepare, + tasks, + table_ids_already_exists: vec![None; len], + physical_table_id, + physical_region_numbers: vec![], + physical_columns: vec![], + }; + Self { context, data } } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; - let creator = TablesCreator { data }; - Ok(Self { context, creator }) + Ok(Self { context, data }) } /// On the prepares step, it performs: @@ -90,19 +98,17 @@ impl CreateLogicalTablesProcedure { let manager = &self.context.table_metadata_manager; // Sets physical region numbers - let physical_table_id = self.creator.data.physical_table_id(); + let physical_table_id = self.data.physical_table_id(); let physical_region_numbers = manager .table_route_manager() .get_physical_table_route(physical_table_id) .await .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?; - self.creator - .data + self.data .set_physical_region_numbers(physical_region_numbers); // Checks if the tables exist let table_name_keys = self - .creator .data .all_create_table_exprs() .iter() @@ -117,7 +123,7 @@ impl CreateLogicalTablesProcedure { .collect::>(); // Validates the tasks - let tasks = &mut self.creator.data.tasks; + let tasks = &mut self.data.tasks; for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) { if table_id.is_some() { // If a table already exists, we just ignore it. @@ -155,19 +161,16 @@ impl CreateLogicalTablesProcedure { // sort columns in task task.sort_columns(); - - common_telemetry::info!("[DEBUG] sorted task {:?}", task); } - self.creator - .data + self.data .set_table_ids_already_exists(already_exists_tables_ids); - self.creator.data.state = CreateTablesState::DatanodeCreateRegions; + self.data.state = CreateTablesState::DatanodeCreateRegions; Ok(Status::executing(true)) } pub async fn on_datanode_create_regions(&mut self) -> Result { - let physical_table_id = self.creator.data.physical_table_id(); + let physical_table_id = self.data.physical_table_id(); let (_, physical_table_route) = self .context .table_metadata_manager @@ -186,12 +189,12 @@ impl CreateLogicalTablesProcedure { /// - Failed to create table metadata. pub async fn on_create_metadata(&mut self) -> Result { let manager = &self.context.table_metadata_manager; - let physical_table_id = self.creator.data.physical_table_id(); - let remaining_tasks = self.creator.data.remaining_tasks(); + let physical_table_id = self.data.physical_table_id(); + let remaining_tasks = self.data.remaining_tasks(); let num_tables = remaining_tasks.len(); if num_tables > 0 { - let chunk_size = manager.max_logical_tables_per_batch(); + let chunk_size = manager.create_logical_tables_metadata_chunk_size(); if num_tables > chunk_size { let chunks = remaining_tasks .into_iter() @@ -212,28 +215,26 @@ impl CreateLogicalTablesProcedure { // The `table_id` MUST be collected after the [Prepare::Prepare], // ensures the all `table_id`s have been allocated. let table_ids = self - .creator .data .tasks .iter() .map(|task| task.table_info.ident.table_id) .collect::>(); - if !self.creator.data.physical_columns.is_empty() { + if !self.data.physical_columns.is_empty() { // fetch old physical table's info let physical_table_info = self .context .table_metadata_manager - .get_full_table_info(self.creator.data.physical_table_id) + .get_full_table_info(self.data.physical_table_id) .await? .0 .context(TableInfoNotFoundSnafu { - table_name: format!("table id - {}", self.creator.data.physical_table_id), + table_name: format!("table id - {}", self.data.physical_table_id), })?; // generate new table info let new_table_info = self - .creator .data .build_new_physical_table_info(&physical_table_info); @@ -248,7 +249,7 @@ impl CreateLogicalTablesProcedure { .cache_invalidator .invalidate( &Context::default(), - vec![CacheIdent::TableId(self.creator.data.physical_table_id)], + vec![CacheIdent::TableId(self.data.physical_table_id)], ) .await?; } else { @@ -275,7 +276,7 @@ impl CreateLogicalTablesProcedure { datanode: &Peer, region_routes: &[RegionRoute], ) -> Result { - let create_tables_data = &self.creator.data; + let create_tables_data = &self.data; let tasks = &create_tables_data.tasks; let physical_table_id = create_tables_data.physical_table_id(); let regions = find_leader_regions(region_routes, datanode); @@ -332,7 +333,7 @@ impl CreateLogicalTablesProcedure { .collect::>>()?; if raw_schemas.is_empty() { - self.creator.data.state = CreateTablesState::CreateMetadata; + self.data.state = CreateTablesState::CreateMetadata; return Ok(Status::executing(false)); } @@ -350,12 +351,12 @@ impl CreateLogicalTablesProcedure { if let Some(raw_schema) = first { let physical_columns = ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?; - self.creator.data.physical_columns = physical_columns; + self.data.physical_columns = physical_columns; } else { warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); } - self.creator.data.state = CreateTablesState::CreateMetadata; + self.data.state = CreateTablesState::CreateMetadata; // Ensures the procedures after the crash start from the `DatanodeCreateRegions` stage. Ok(Status::executing(false)) @@ -369,7 +370,7 @@ impl Procedure for CreateLogicalTablesProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let state = &self.creator.data.state; + let state = &self.data.state; let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES .with_label_values(&[state.as_ref()]) @@ -384,20 +385,20 @@ impl Procedure for CreateLogicalTablesProcedure { } fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.creator.data).context(ToJsonSnafu) + serde_json::to_string(&self.data).context(ToJsonSnafu) } fn lock_key(&self) -> LockKey { // CatalogLock, SchemaLock, // TableLock // TableNameLock(s) - let mut lock_key = Vec::with_capacity(2 + 1 + self.creator.data.tasks.len()); - let table_ref = self.creator.data.tasks[0].table_ref(); + let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len()); + let table_ref = self.data.tasks[0].table_ref(); lock_key.push(CatalogLock::Read(table_ref.catalog).into()); lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into()); - lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into()); + lock_key.push(TableLock::Write(self.data.physical_table_id()).into()); - for task in &self.creator.data.tasks { + for task in &self.data.tasks { lock_key.push( TableNameLock::new( &task.create_table.catalog_name, @@ -411,33 +412,6 @@ impl Procedure for CreateLogicalTablesProcedure { } } -pub struct TablesCreator { - /// The serializable data. - pub data: CreateTablesData, -} - -impl TablesCreator { - pub fn new( - cluster_id: ClusterId, - tasks: Vec, - physical_table_id: TableId, - ) -> Self { - let len = tasks.len(); - - Self { - data: CreateTablesData { - cluster_id, - state: CreateTablesState::Prepare, - tasks, - table_ids_already_exists: vec![None; len], - physical_table_id, - physical_region_numbers: vec![], - physical_columns: vec![], - }, - } - } -} - #[derive(Debug, Serialize, Deserialize)] pub struct CreateTablesData { cluster_id: ClusterId, diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index e7e1992b337b..d869af7c90d2 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -198,8 +198,10 @@ mod tests { use table::metadata::RawTableInfo; use super::*; - use crate::ddl::test_util::create_table::build_raw_table_info_from_expr; - use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder}; + use crate::ddl::test_util::columns::TestColumnDefBuilder; + use crate::ddl::test_util::create_table::{ + build_raw_table_info_from_expr, TestCreateTableExprBuilder, + }; use crate::table_name::TableName; use crate::test_util::{new_ddl_context, MockDatanodeManager}; diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 239a655fb2cb..0245d4fc905a 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod alter_table; +pub mod columns; pub mod create_table; - -pub use create_table::{ - TestColumnDef, TestColumnDefBuilder, TestCreateTableExpr, TestCreateTableExprBuilder, -}; diff --git a/src/common/meta/src/ddl/test_util/alter_table.rs b/src/common/meta/src/ddl/test_util/alter_table.rs new file mode 100644 index 000000000000..13da97e3d82e --- /dev/null +++ b/src/common/meta/src/ddl/test_util/alter_table.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::alter_expr::Kind; +use api::v1::{AddColumn, AddColumns, AlterExpr, ColumnDef, RenameTable}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use derive_builder::Builder; + +#[derive(Default, Builder)] +#[builder(default)] +pub struct TestAlterTableExpr { + #[builder(setter(into), default = "DEFAULT_CATALOG_NAME.to_string()")] + catalog_name: String, + #[builder(setter(into), default = "DEFAULT_SCHEMA_NAME.to_string()")] + schema_name: String, + #[builder(setter(into))] + table_name: String, + #[builder(setter(into))] + add_columns: Vec, + #[builder(setter(into))] + new_table_name: Option, +} + +impl From for AlterExpr { + fn from(value: TestAlterTableExpr) -> Self { + if let Some(new_table_name) = value.new_table_name { + Self { + catalog_name: value.catalog_name, + schema_name: value.schema_name, + table_name: value.table_name, + kind: Some(Kind::RenameTable(RenameTable { new_table_name })), + } + } else { + Self { + catalog_name: value.catalog_name, + schema_name: value.schema_name, + table_name: value.table_name, + kind: Some(Kind::AddColumns(AddColumns { + add_columns: value + .add_columns + .into_iter() + .map(|col| AddColumn { + column_def: Some(col), + location: None, + }) + .collect(), + })), + } + } + } +} diff --git a/src/common/meta/src/ddl/test_util/columns.rs b/src/common/meta/src/ddl/test_util/columns.rs new file mode 100644 index 000000000000..9e258939b021 --- /dev/null +++ b/src/common/meta/src/ddl/test_util/columns.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::{ColumnDataType, ColumnDef, SemanticType}; +use derive_builder::Builder; + +#[derive(Default, Builder)] +pub struct TestColumnDef { + #[builder(setter(into), default)] + name: String, + data_type: ColumnDataType, + #[builder(default)] + is_nullable: bool, + semantic_type: SemanticType, + #[builder(setter(into), default)] + comment: String, +} + +impl From for ColumnDef { + fn from( + TestColumnDef { + name, + data_type, + is_nullable, + semantic_type, + comment, + }: TestColumnDef, + ) -> Self { + Self { + name, + data_type: data_type as i32, + is_nullable, + default_constraint: vec![], + semantic_type: semantic_type as i32, + comment, + datatype_extension: None, + } + } +} diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index 12b13d74f93c..eb80d8c16007 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use api::v1::column_def::try_as_column_schema; -use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType}; +use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE}; use datatypes::schema::RawSchema; @@ -24,40 +24,6 @@ use store_api::storage::TableId; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; -#[derive(Default, Builder)] -pub struct TestColumnDef { - #[builder(setter(into), default)] - name: String, - data_type: ColumnDataType, - #[builder(default)] - is_nullable: bool, - semantic_type: SemanticType, - #[builder(setter(into), default)] - comment: String, -} - -impl From for ColumnDef { - fn from( - TestColumnDef { - name, - data_type, - is_nullable, - semantic_type, - comment, - }: TestColumnDef, - ) -> Self { - Self { - name, - data_type: data_type as i32, - is_nullable, - default_constraint: vec![], - semantic_type: semantic_type as i32, - comment, - datatype_extension: None, - } - } -} - #[derive(Default, Builder)] #[builder(default)] pub struct TestCreateTableExpr { diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs index 5ea7d6a85803..fcbe52189a84 100644 --- a/src/common/meta/src/ddl/tests.rs +++ b/src/common/meta/src/ddl/tests.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod alter_logical_tables; mod create_logical_tables; mod create_table; diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs new file mode 100644 index 000000000000..9be0f4c584de --- /dev/null +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -0,0 +1,412 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; +use std::sync::Arc; + +use api::v1::{ColumnDataType, SemanticType}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_procedure::{Procedure, ProcedureId, Status}; +use common_procedure_test::MockContextProvider; +use table::metadata::TableId; + +use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder; +use crate::ddl::test_util::columns::TestColumnDefBuilder; +use crate::ddl::tests::create_logical_tables; +use crate::ddl::tests::create_logical_tables::{ + test_create_physical_table_task, NaiveDatanodeHandler, +}; +use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; +use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound}; +use crate::key::table_name::TableNameKey; +use crate::rpc::ddl::AlterTableTask; +use crate::test_util::{new_ddl_context, MockDatanodeManager}; +use crate::ClusterId; + +fn make_alter_logical_table_add_column_task( + schema: Option<&str>, + table: &str, + add_columns: Vec, +) -> AlterTableTask { + let add_columns = add_columns + .into_iter() + .map(|name| { + TestColumnDefBuilder::default() + .name(name) + .data_type(ColumnDataType::String) + .is_nullable(true) + .semantic_type(SemanticType::Tag) + .comment("new column".to_string()) + .build() + .unwrap() + .into() + }) + .collect::>(); + let mut alter_table = TestAlterTableExprBuilder::default(); + if let Some(schema) = schema { + alter_table.schema_name(schema.to_string()); + } + let alter_table = alter_table + .table_name(table.to_string()) + .add_columns(add_columns) + .build() + .unwrap(); + + AlterTableTask { + alter_table: alter_table.into(), + } +} + +fn make_alter_logical_table_rename_task( + schema: &str, + table: &str, + new_table_name: &str, +) -> AlterTableTask { + let alter_table = TestAlterTableExprBuilder::default() + .schema_name(schema.to_string()) + .table_name(table.to_string()) + .new_table_name(new_table_name.to_string()) + .build() + .unwrap(); + + AlterTableTask { + alter_table: alter_table.into(), + } +} + +#[tokio::test] +async fn test_on_prepare_check_schema() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let tasks = vec![ + make_alter_logical_table_add_column_task( + Some("schema1"), + "table1", + vec!["column1".to_string()], + ), + make_alter_logical_table_add_column_task( + Some("schema2"), + "table2", + vec!["column2".to_string()], + ), + ]; + let physical_table_id = 1024u32; + let mut procedure = + AlterLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, AlterLogicalTablesInvalidArguments { .. }); +} + +#[tokio::test] +async fn test_on_prepare_check_alter_kind() { + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + let cluster_id = 1; + let tasks = vec![make_alter_logical_table_rename_task( + "schema1", + "table1", + "new_table1", + )]; + let physical_table_id = 1024u32; + let mut procedure = + AlterLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, AlterLogicalTablesInvalidArguments { .. }); +} + +async fn create_physical_table( + ddl_context: DdlContext, + cluster_id: ClusterId, + name: &str, +) -> TableId { + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task(name); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_logical_tables::create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + table_route, + ) + .await; + + table_id +} + +async fn create_logical_table( + ddl_context: DdlContext, + cluster_id: ClusterId, + physical_table_id: TableId, + table_name: &str, +) { + let tasks = vec![create_logical_tables::test_create_logical_table_task( + table_name, + )]; + let mut procedure = + CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + let status = procedure.on_create_metadata().await.unwrap(); + assert_matches!(status, Status::Done { .. }); +} + +#[tokio::test] +async fn test_on_prepare_different_physical_table() { + let cluster_id = 1; + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + + let phy1_id = create_physical_table(ddl_context.clone(), cluster_id, "phy1").await; + create_logical_table(ddl_context.clone(), cluster_id, phy1_id, "table1").await; + let phy2_id = create_physical_table(ddl_context.clone(), cluster_id, "phy2").await; + create_logical_table(ddl_context.clone(), cluster_id, phy2_id, "table2").await; + + let tasks = vec![ + make_alter_logical_table_add_column_task(None, "table1", vec!["column1".to_string()]), + make_alter_logical_table_add_column_task(None, "table2", vec!["column2".to_string()]), + ]; + + let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy1_id, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, AlterLogicalTablesInvalidArguments { .. }); +} + +#[tokio::test] +async fn test_on_prepare_logical_table_not_exists() { + let cluster_id = 1; + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + + // Creates physical table + let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + // Creates 3 logical tables + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; + + let tasks = vec![ + make_alter_logical_table_add_column_task(None, "table1", vec!["column1".to_string()]), + // table2 not exists + make_alter_logical_table_add_column_task(None, "table2", vec!["column2".to_string()]), + ]; + + let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, TableNotFound { .. }); +} + +#[tokio::test] +async fn test_on_prepare() { + let cluster_id = 1; + let datanode_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(datanode_manager); + + // Creates physical table + let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + // Creates 3 logical tables + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await; + + let tasks = vec![ + make_alter_logical_table_add_column_task(None, "table1", vec!["column1".to_string()]), + make_alter_logical_table_add_column_task(None, "table2", vec!["column2".to_string()]), + make_alter_logical_table_add_column_task(None, "table3", vec!["column3".to_string()]), + ]; + + let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context); + let result = procedure.on_prepare().await; + assert_matches!(result, Ok(Status::Executing { persist: true })); +} + +#[tokio::test] +async fn test_on_update_metadata() { + let cluster_id = 1; + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + + // Creates physical table + let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + // Creates 3 logical tables + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table3").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table4").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table5").await; + + let tasks = vec![ + make_alter_logical_table_add_column_task(None, "table1", vec!["new_col".to_string()]), + make_alter_logical_table_add_column_task(None, "table2", vec!["mew_col".to_string()]), + make_alter_logical_table_add_column_task(None, "table3", vec!["new_col".to_string()]), + ]; + + let mut procedure = AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context); + let mut status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + + let ctx = common_procedure::Context { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + // on_submit_alter_region_requests + status = procedure.execute(&ctx).await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + // on_update_metadata + status = procedure.execute(&ctx).await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); +} + +#[tokio::test] +async fn test_on_part_duplicate_alter_request() { + let cluster_id = 1; + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(datanode_manager); + + // Creates physical table + let phy_id = create_physical_table(ddl_context.clone(), cluster_id, "phy").await; + // Creates 3 logical tables + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table2").await; + + let tasks = vec![ + make_alter_logical_table_add_column_task(None, "table1", vec!["col_0".to_string()]), + make_alter_logical_table_add_column_task(None, "table2", vec!["col_0".to_string()]), + ]; + + let mut procedure = + AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context.clone()); + let mut status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + + let ctx = common_procedure::Context { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + // on_submit_alter_region_requests + status = procedure.execute(&ctx).await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + // on_update_metadata + status = procedure.execute(&ctx).await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + + // re-alter + let tasks = vec![ + make_alter_logical_table_add_column_task( + None, + "table1", + vec!["col_0".to_string(), "new_col_1".to_string()], + ), + make_alter_logical_table_add_column_task( + None, + "table2", + vec![ + "col_0".to_string(), + "new_col_2".to_string(), + "new_col_1".to_string(), + ], + ), + ]; + + let mut procedure = + AlterLogicalTablesProcedure::new(cluster_id, tasks, phy_id, ddl_context.clone()); + let mut status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + + let ctx = common_procedure::Context { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + // on_submit_alter_region_requests + status = procedure.execute(&ctx).await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + // on_update_metadata + status = procedure.execute(&ctx).await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + + let table_name_keys = vec![ + TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table1"), + TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table2"), + ]; + let table_ids = ddl_context + .table_metadata_manager + .table_name_manager() + .batch_get(table_name_keys) + .await + .unwrap() + .into_iter() + .map(|x| x.unwrap().table_id()) + .collect::>(); + let tables = ddl_context + .table_metadata_manager + .table_info_manager() + .batch_get(&table_ids) + .await + .unwrap(); + + let table1 = tables.get(&table_ids[0]).unwrap(); + let table2 = tables.get(&table_ids[1]).unwrap(); + assert_eq!(table1.table_info.name, "table1"); + assert_eq!(table2.table_info.name, "table2"); + + let table1_cols = table1 + .table_info + .meta + .schema + .column_schemas + .iter() + .map(|x| x.name.clone()) + .collect::>(); + assert_eq!( + table1_cols, + vec![ + "col_0".to_string(), + "cpu".to_string(), + "host".to_string(), + "new_col_1".to_string(), + "ts".to_string() + ] + ); + + let table2_cols = table2 + .table_info + .meta + .schema + .column_schemas + .iter() + .map(|x| x.name.clone()) + .collect::>(); + assert_eq!( + table2_cols, + vec![ + "col_0".to_string(), + "cpu".to_string(), + "host".to_string(), + "new_col_1".to_string(), + "new_col_2".to_string(), + "ts".to_string() + ] + ); +} diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index 7f82d372ca05..7223c2e43458 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -30,8 +30,10 @@ use table::metadata::RawTableInfo; use crate::datanode_manager::HandleResponse; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; -use crate::ddl::test_util::create_table::build_raw_table_info_from_expr; -use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder}; +use crate::ddl::test_util::columns::TestColumnDefBuilder; +use crate::ddl::test_util::create_table::{ + build_raw_table_info_from_expr, TestCreateTableExprBuilder, +}; use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; use crate::error::{Error, Result}; use crate::key::table_route::TableRouteValue; @@ -41,7 +43,7 @@ use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager // Note: this code may be duplicated with others. // However, it's by design, ensures the tests are easy to be modified or added. -fn test_create_logical_table_task(name: &str) -> CreateTableTask { +pub(crate) fn test_create_logical_table_task(name: &str) -> CreateTableTask { let create_table = TestCreateTableExprBuilder::default() .column_defs([ TestColumnDefBuilder::default() @@ -86,7 +88,7 @@ fn test_create_logical_table_task(name: &str) -> CreateTableTask { // Note: this code may be duplicated with others. // However, it's by design, ensures the tests are easy to be modified or added. -fn test_create_physical_table_task(name: &str) -> CreateTableTask { +pub(crate) fn test_create_physical_table_task(name: &str) -> CreateTableTask { let create_table = TestCreateTableExprBuilder::default() .column_defs([ TestColumnDefBuilder::default() @@ -135,7 +137,7 @@ async fn test_on_prepare_physical_table_not_found() { assert_matches!(err, Error::TableRouteNotFound { .. }); } -async fn create_physical_table_metadata( +pub(crate) async fn create_physical_table_metadata( ddl_context: &DdlContext, table_info: RawTableInfo, table_route: TableRouteValue, diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index 3040ae6d2f25..2ba289488dea 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -28,8 +28,10 @@ use common_telemetry::debug; use crate::datanode_manager::HandleResponse; use crate::ddl::create_table::CreateTableProcedure; -use crate::ddl::test_util::create_table::build_raw_table_info_from_expr; -use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder}; +use crate::ddl::test_util::columns::TestColumnDefBuilder; +use crate::ddl::test_util::create_table::{ + build_raw_table_info_from_expr, TestCreateTableExprBuilder, +}; use crate::error; use crate::error::{Error, Result}; use crate::key::table_route::TableRouteValue; diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index e8fc3ef5fe38..de6171d4efcf 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -19,9 +19,7 @@ use snafu::{ensure, location, Location, OptionExt}; use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use table::metadata::TableId; -use crate::error::{ - EmptyCreateTableTasksSnafu, Error, Result, TableNotFoundSnafu, UnsupportedSnafu, -}; +use crate::error::{Error, Result, TableNotFoundSnafu, UnsupportedSnafu}; use crate::key::table_name::TableNameKey; use crate::key::TableMetadataManagerRef; use crate::peer::Peer; @@ -98,7 +96,8 @@ pub async fn check_and_get_physical_table_id( None => Some(current_physical_table_name), }; } - let physical_table_name = physical_table_name.context(EmptyCreateTableTasksSnafu)?; + // Safety: `physical_table_name` is `Some` here + let physical_table_name = physical_table_name.unwrap(); table_metadata_manager .table_name_manager() .get(physical_table_name) @@ -108,3 +107,22 @@ pub async fn check_and_get_physical_table_id( }) .map(|table| table.table_id()) } + +pub async fn get_physical_table_id( + table_metadata_manager: &TableMetadataManagerRef, + logical_table_name: TableNameKey<'_>, +) -> Result { + let logical_table_id = table_metadata_manager + .table_name_manager() + .get(logical_table_name) + .await? + .context(TableNotFoundSnafu { + table_name: logical_table_name.to_string(), + }) + .map(|table| table.table_id())?; + + table_metadata_manager + .table_route_manager() + .get_physical_table_id(logical_table_id) + .await +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 5f712fbdd8a6..ef63153f4e02 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -22,6 +22,7 @@ use store_api::storage::TableId; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; +use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; @@ -30,7 +31,7 @@ use crate::ddl::table_meta::TableMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor}; use crate::error::{ - self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, + self, EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; @@ -133,6 +134,20 @@ impl DdlManager { let context = self.create_context(); + self.procedure_manager + .register_loader( + AlterLogicalTablesProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + AlterLogicalTablesProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(RegisterProcedureLoaderSnafu { + type_name: AlterLogicalTablesProcedure::TYPE_NAME, + })?; + + let context = self.create_context(); + self.procedure_manager .register_loader( DropTableProcedure::TYPE_NAME, @@ -215,7 +230,7 @@ impl DdlManager { } #[tracing::instrument(skip_all)] - /// Submits and executes a create table task. + /// Submits and executes a create multiple logical table tasks. pub async fn submit_create_logical_table_tasks( &self, cluster_id: ClusterId, @@ -236,6 +251,28 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] + /// Submits and executes alter multiple table tasks. + pub async fn submit_alter_logical_table_tasks( + &self, + cluster_id: ClusterId, + alter_table_tasks: Vec, + physical_table_id: TableId, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + + let procedure = AlterLogicalTablesProcedure::new( + cluster_id, + alter_table_tasks, + physical_table_id, + context, + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + #[tracing::instrument(skip_all)] /// Submits and executes a drop table task. pub async fn submit_drop_table_task( @@ -490,7 +527,12 @@ async fn handle_create_logical_table_tasks( cluster_id: ClusterId, mut create_table_tasks: Vec, ) -> Result { - ensure!(!create_table_tasks.is_empty(), EmptyCreateTableTasksSnafu); + ensure!( + !create_table_tasks.is_empty(), + EmptyDdlTasksSnafu { + name: "create logical tables" + } + ); let physical_table_id = utils::check_and_get_physical_table_id( &ddl_manager.table_metadata_manager, &create_table_tasks, @@ -529,6 +571,42 @@ async fn handle_create_logical_table_tasks( }) } +async fn handle_alter_logical_table_tasks( + ddl_manager: &DdlManager, + cluster_id: ClusterId, + alter_table_tasks: Vec, +) -> Result { + ensure!( + !alter_table_tasks.is_empty(), + EmptyDdlTasksSnafu { + name: "alter logical tables" + } + ); + + // Use the physical table id in the first logical table, then it will be checked in the procedure. + let first_table = TableNameKey { + catalog: &alter_table_tasks[0].alter_table.catalog_name, + schema: &alter_table_tasks[0].alter_table.schema_name, + table: &alter_table_tasks[0].alter_table.table_name, + }; + let physical_table_id = + utils::get_physical_table_id(&ddl_manager.table_metadata_manager, first_table).await?; + let num_logical_tables = alter_table_tasks.len(); + + let (id, _) = ddl_manager + .submit_alter_logical_table_tasks(cluster_id, alter_table_tasks, physical_table_id) + .await?; + + info!("{num_logical_tables} logical tables on physical table: {physical_table_id:?} is altered via procedure_id {id:?}"); + + let procedure_id = id.to_string(); + + Ok(SubmitDdlTaskResponse { + key: procedure_id.into(), + ..Default::default() + }) +} + /// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it. #[async_trait::async_trait] impl ProcedureExecutor for DdlManager { @@ -562,8 +640,10 @@ impl ProcedureExecutor for DdlManager { CreateLogicalTables(create_table_tasks) => { handle_create_logical_table_tasks(self, cluster_id, create_table_tasks).await } + AlterLogicalTables(alter_table_tasks) => { + handle_alter_logical_table_tasks(self, cluster_id, alter_table_tasks).await + } DropLogicalTables(_) => todo!(), - AlterLogicalTables(_) => todo!(), } } .trace(span) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 56823fd2e9ab..b3cf8e9f63f4 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -392,11 +392,14 @@ pub enum Error { #[snafu(display("Unexpected table route type: {}", err_msg))] UnexpectedLogicalRouteTable { location: Location, err_msg: String }, - #[snafu(display("The tasks of create tables cannot be empty"))] - EmptyCreateTableTasks { location: Location }, + #[snafu(display("The tasks of {} cannot be empty", name))] + EmptyDdlTasks { name: String, location: Location }, #[snafu(display("Metadata corruption: {}", err_msg))] MetadataCorruption { err_msg: String, location: Location }, + + #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))] + AlterLogicalTablesInvalidArguments { err_msg: String, location: Location }, } pub type Result = std::result::Result; @@ -456,7 +459,8 @@ impl ErrorExt for Error { ProcedureNotFound { .. } | PrimaryKeyNotFound { .. } | EmptyKey { .. } - | InvalidEngineType { .. } => StatusCode::InvalidArguments, + | InvalidEngineType { .. } + | AlterLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments, TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, @@ -472,7 +476,7 @@ impl ErrorExt for Error { InvalidCatalogValue { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), - ParseProcedureId { .. } | InvalidNumTopics { .. } | EmptyCreateTableTasks { .. } => { + ParseProcedureId { .. } | InvalidNumTopics { .. } | EmptyDdlTasks { .. } => { StatusCode::InvalidArguments } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index acc5e38c1a9e..c9b0644d84e0 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -457,7 +457,7 @@ impl TableMetadataManager { Ok(()) } - pub fn max_logical_tables_per_batch(&self) -> usize { + pub fn create_logical_tables_metadata_chunk_size(&self) -> usize { // The batch size is max_txn_size / 3 because the size of the `tables_data` // is 3 times the size of the `tables_data`. self.kv_backend.max_txn_ops() / 3 @@ -682,6 +682,64 @@ impl TableMetadataManager { Ok(()) } + pub fn batch_update_table_info_value_chunk_size(&self) -> usize { + self.kv_backend.max_txn_ops() + } + + pub async fn batch_update_table_info_values( + &self, + table_info_value_pairs: Vec<(TableInfoValue, RawTableInfo)>, + ) -> Result<()> { + let len = table_info_value_pairs.len(); + let mut txns = Vec::with_capacity(len); + struct OnFailure + where + F: FnOnce(&Vec) -> R, + { + table_info_value: TableInfoValue, + on_update_table_info_failure: F, + } + let mut on_failures = Vec::with_capacity(len); + + for (table_info_value, new_table_info) in table_info_value_pairs { + let table_id = table_info_value.table_info.ident.table_id; + + let new_table_info_value = table_info_value.update(new_table_info); + + let (update_table_info_txn, on_update_table_info_failure) = + self.table_info_manager().build_update_txn( + table_id, + &DeserializedValueWithBytes::from_inner(table_info_value), + &new_table_info_value, + )?; + + txns.push(update_table_info_txn); + + on_failures.push(OnFailure { + table_info_value: new_table_info_value, + on_update_table_info_failure, + }); + } + + let txn = Txn::merge_all(txns); + let r = self.kv_backend.txn(txn).await?; + + if !r.succeeded { + for on_failure in on_failures { + let remote_table_info = (on_failure.on_update_table_info_failure)(&r.responses)? + .context(error::UnexpectedSnafu { + err_msg: "Reads the empty table info during the updating table info", + })? + .into_inner(); + + let op_name = "the batch updating table info"; + ensure_values!(remote_table_info, on_failure.table_info_value, op_name); + } + } + + Ok(()) + } + pub async fn update_table_route( &self, table_id: TableId, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 3b2e643176d9..95fb2f6a1152 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -397,6 +397,14 @@ impl TableRouteManager { Ok(physical_table_routes) } + /// Returns [TableRouteValue::Physical]s or [TableRouteValue::Logical]s by the given `table_ids`. + pub async fn batch_get_table_routes( + &self, + table_ids: &[TableId], + ) -> Result>> { + self.storage.batch_get(table_ids).await + } + /// Returns [`RegionDistribution`] of the table(`table_id`). pub async fn get_region_distribution( &self, diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index b97924aa9f25..a532503114a9 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -22,7 +22,7 @@ use api::v1::meta::{ DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId, TruncateTableTask as PbTruncateTableTask, }; -use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, SemanticType, TruncateTableExpr}; +use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr}; use base64::engine::general_purpose; use base64::Engine as _; use prost::Message; @@ -63,6 +63,15 @@ impl DdlTask { ) } + pub fn new_alter_logical_tables(table_data: Vec) -> Self { + DdlTask::AlterLogicalTables( + table_data + .into_iter() + .map(|alter_table| AlterTableTask { alter_table }) + .collect(), + ) + } + pub fn new_drop_table( catalog: String, schema: String, @@ -380,31 +389,7 @@ impl CreateTableTask { .column_defs .sort_unstable_by(|a, b| a.name.cmp(&b.name)); - // compute new indices of sorted columns - // this part won't do any check or verification. - let mut primary_key_indices = Vec::with_capacity(self.create_table.primary_keys.len()); - let mut value_indices = - Vec::with_capacity(self.create_table.column_defs.len() - primary_key_indices.len() - 1); - let mut timestamp_index = None; - for (index, col) in self.create_table.column_defs.iter().enumerate() { - if self.create_table.primary_keys.contains(&col.name) { - primary_key_indices.push(index); - } else if col.semantic_type == SemanticType::Timestamp as i32 { - timestamp_index = Some(index); - } else { - value_indices.push(index); - } - } - - // overwrite table info - self.table_info - .meta - .schema - .column_schemas - .sort_unstable_by(|a, b| a.name.cmp(&b.name)); - self.table_info.meta.schema.timestamp_index = timestamp_index; - self.table_info.meta.primary_key_indices = primary_key_indices; - self.table_info.meta.value_indices = value_indices; + self.table_info.sort_columns(); } } @@ -644,7 +629,8 @@ mod tests { "column1".to_string(), ConcreteDataType::timestamp_millisecond_datatype(), false, - ), + ) + .with_time_index(true), ColumnSchema::new( "column2".to_string(), ConcreteDataType::float64_datatype(), diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 03648168e032..3caf1b63dd5c 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -32,8 +32,10 @@ use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, Crea use common_meta::ddl::create_table::*; use common_meta::ddl::drop_table::executor::DropTableExecutor; use common_meta::ddl::drop_table::DropTableProcedure; -use common_meta::ddl::test_util::create_table::build_raw_table_info_from_expr; -use common_meta::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder}; +use common_meta::ddl::test_util::columns::TestColumnDefBuilder; +use common_meta::ddl::test_util::create_table::{ + build_raw_table_info_from_expr, TestCreateTableExprBuilder, +}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::DeserializedValueWithBytes; @@ -303,7 +305,7 @@ async fn test_on_datanode_create_logical_regions() { let status = procedure.on_datanode_create_regions().await.unwrap(); assert!(matches!(status, Status::Executing { persist: false })); assert!(matches!( - procedure.creator.data.state(), + procedure.data.state(), &CreateTablesState::CreateMetadata )); diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 55b4c79238cb..731fbe288b3e 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -508,23 +508,20 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Do not support creating tables in multiple catalogs: {}", - catalog_names - ))] - CreateTableWithMultiCatalogs { - catalog_names: String, + #[snafu(display("Do not support {} in multiple catalogs", ddl_name))] + DdlWithMultiCatalogs { + ddl_name: String, location: Location, }, - #[snafu(display("Do not support creating tables in multiple schemas: {}", schema_names))] - CreateTableWithMultiSchemas { - schema_names: String, + #[snafu(display("Do not support {} in multiple schemas", ddl_name))] + DdlWithMultiSchemas { + ddl_name: String, location: Location, }, - #[snafu(display("Empty creating table expr"))] - EmptyCreateTableExpr { location: Location }, + #[snafu(display("Empty {} expr", name))] + EmptyDdlExpr { name: String, location: Location }, #[snafu(display("Failed to create logical tables: {}", reason))] CreateLogicalTables { reason: String, location: Location }, @@ -650,9 +647,9 @@ impl ErrorExt for Error { Error::ColumnDefaultValue { source, .. } => source.status_code(), - Error::CreateTableWithMultiCatalogs { .. } - | Error::CreateTableWithMultiSchemas { .. } - | Error::EmptyCreateTableExpr { .. } + Error::DdlWithMultiCatalogs { .. } + | Error::DdlWithMultiSchemas { .. } + | Error::EmptyDdlExpr { .. } | Error::InvalidPartitionRule { .. } | Error::ParseSqlValue { .. } => StatusCode::InvalidArguments, diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 526c540d2ea6..5460fe9a59b3 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -256,6 +256,7 @@ impl Inserter { statement_executor: &StatementExecutor, ) -> Result<()> { let mut create_tables = vec![]; + let mut alter_tables = vec![]; for req in &requests.inserts { let catalog = ctx.current_catalog(); let schema = ctx.current_schema(); @@ -264,16 +265,19 @@ impl Inserter { Some(table) => { // TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`) validate_request_with_table(req, &table)?; - self.alter_table_on_demand(req, table, ctx, statement_executor) - .await? + let alter_expr = self.get_alter_table_expr_on_demand(req, table, ctx)?; + if let Some(alter_expr) = alter_expr { + alter_tables.push(alter_expr); + } } None => { create_tables.push(req); } } } - if !create_tables.is_empty() { - if let Some(on_physical_table) = on_physical_table { + + if let Some(on_physical_table) = on_physical_table { + if !create_tables.is_empty() { // Creates logical tables in batch. self.create_logical_tables( create_tables, @@ -282,10 +286,19 @@ impl Inserter { statement_executor, ) .await?; - } else { - for req in create_tables { - self.create_table(req, ctx, statement_executor).await?; - } + } + if !alter_tables.is_empty() { + // Alter logical tables in batch. + statement_executor + .alter_logical_tables(alter_tables) + .await?; + } + } else { + for req in create_tables { + self.create_table(req, ctx, statement_executor).await?; + } + for alter_expr in alter_tables.into_iter() { + statement_executor.alter_table_inner(alter_expr).await?; } } @@ -364,13 +377,12 @@ impl Inserter { .context(CatalogSnafu) } - async fn alter_table_on_demand( + fn get_alter_table_expr_on_demand( &self, req: &RowInsertRequest, table: TableRef, ctx: &QueryContextRef, - statement_executor: &StatementExecutor, - ) -> Result<()> { + ) -> Result> { let catalog_name = ctx.current_catalog(); let schema_name = ctx.current_schema(); let table_name = table.table_info().name.clone(); @@ -380,39 +392,15 @@ impl Inserter { let add_columns = extract_new_columns(&table.schema(), column_exprs) .context(FindNewColumnsOnInsertionSnafu)?; let Some(add_columns) = add_columns else { - return Ok(()); + return Ok(None); }; - info!( - "Adding new columns: {:?} to table: {}.{}.{}", - add_columns, catalog_name, schema_name, table_name - ); - - let alter_table_expr = AlterExpr { + Ok(Some(AlterExpr { catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), table_name: table_name.to_string(), kind: Some(Kind::AddColumns(add_columns)), - }; - - let res = statement_executor.alter_table_inner(alter_table_expr).await; - - match res { - Ok(_) => { - info!( - "Successfully added new columns to table: {}.{}.{}", - catalog_name, schema_name, table_name - ); - Ok(()) - } - Err(err) => { - error!( - "Failed to add new columns to table: {}.{}.{}: {}", - catalog_name, schema_name, table_name, err - ); - Err(err) - } - } + })) } /// Create a table with schema from insert request. diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs index 4069332b2c6a..6774b26decac 100644 --- a/src/operator/src/metrics.rs +++ b/src/operator/src/metrics.rs @@ -26,6 +26,11 @@ lazy_static! { "table operator create table" ) .unwrap(); + pub static ref DIST_ALTER_TABLES: Histogram = register_histogram!( + "greptime_table_operator_alter_tables", + "table operator alter table" + ) + .unwrap(); pub static ref DIST_INGEST_ROW_COUNT: IntCounter = register_int_counter!( "greptime_table_operator_ingest_rows", "table operator ingest rows" diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index ea66296462ce..ce48be482938 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; @@ -57,8 +57,8 @@ use table::TableRef; use super::StatementExecutor; use crate::error::{ self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, - CreateLogicalTablesSnafu, CreateTableInfoSnafu, CreateTableWithMultiCatalogsSnafu, - CreateTableWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyCreateTableExprSnafu, + CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu, + DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, @@ -242,20 +242,18 @@ impl StatementExecutor { create_table_exprs: &[CreateTableExpr], ) -> Result> { let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer(); - ensure!(!create_table_exprs.is_empty(), EmptyCreateTableExprSnafu); + ensure!( + !create_table_exprs.is_empty(), + EmptyDdlExprSnafu { + name: "create table" + } + ); ensure!( create_table_exprs .windows(2) .all(|expr| expr[0].catalog_name == expr[1].catalog_name), - CreateTableWithMultiCatalogsSnafu { - catalog_names: create_table_exprs - .iter() - .map(|x| x.catalog_name.as_str()) - .collect::>() - .into_iter() - .collect::>() - .join(",") - .to_string() + DdlWithMultiCatalogsSnafu { + ddl_name: "create tables" } ); let catalog_name = create_table_exprs[0].catalog_name.to_string(); @@ -264,15 +262,8 @@ impl StatementExecutor { create_table_exprs .windows(2) .all(|expr| expr[0].schema_name == expr[1].schema_name), - CreateTableWithMultiSchemasSnafu { - schema_names: create_table_exprs - .iter() - .map(|x| x.schema_name.as_str()) - .collect::>() - .into_iter() - .collect::>() - .join(",") - .to_string() + DdlWithMultiSchemasSnafu { + ddl_name: "create tables" } ); let schema_name = create_table_exprs[0].schema_name.to_string(); @@ -329,6 +320,38 @@ impl StatementExecutor { .collect()) } + #[tracing::instrument(skip_all)] + pub async fn alter_logical_tables(&self, alter_table_exprs: Vec) -> Result { + let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer(); + ensure!( + !alter_table_exprs.is_empty(), + EmptyDdlExprSnafu { + name: "alter table" + } + ); + ensure!( + alter_table_exprs + .windows(2) + .all(|expr| expr[0].catalog_name == expr[1].catalog_name), + DdlWithMultiCatalogsSnafu { + ddl_name: "alter tables", + } + ); + ensure!( + alter_table_exprs + .windows(2) + .all(|expr| expr[0].schema_name == expr[1].schema_name), + DdlWithMultiSchemasSnafu { + ddl_name: "alter tables", + } + ); + + self.alter_logical_tables_procedure(alter_table_exprs) + .await?; + + Ok(Output::new_with_affected_rows(0)) + } + #[tracing::instrument(skip_all)] pub async fn drop_table(&self, table_name: TableName, drop_if_exists: bool) -> Result { if let Some(table) = self @@ -415,7 +438,7 @@ impl StatementExecutor { let _ = table_info .meta - .builder_with_alter_kind(table_name, &request.alter_kind) + .builder_with_alter_kind(table_name, &request.alter_kind, false) .context(error::TableSnafu)? .build() .context(error::BuildTableMetaSnafu { table_name })?; @@ -523,6 +546,20 @@ impl StatementExecutor { .context(error::ExecuteDdlSnafu) } + async fn alter_logical_tables_procedure( + &self, + tables_data: Vec, + ) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_alter_logical_tables(tables_data), + }; + + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) + } + async fn drop_table_procedure( &self, table_name: &TableName, diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 064ddbe5b726..4746a17bcf7b 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -188,9 +188,12 @@ impl TableMeta { &self, table_name: &str, alter_kind: &AlterKind, + add_if_not_exists: bool, ) -> Result { match alter_kind { - AlterKind::AddColumns { columns } => self.add_columns(table_name, columns), + AlterKind::AddColumns { columns } => { + self.add_columns(table_name, columns, add_if_not_exists) + } AlterKind::DropColumns { names } => self.remove_columns(table_name, names), // No need to rebuild table meta when renaming tables. AlterKind::RenameTable { .. } => { @@ -248,6 +251,7 @@ impl TableMeta { &self, table_name: &str, requests: &[AddColumnRequest], + add_if_not_exists: bool, ) -> Result { let table_schema = &self.schema; let mut meta_builder = self.new_meta_builder(); @@ -255,7 +259,31 @@ impl TableMeta { self.primary_key_indices.iter().collect(); let mut names = HashSet::with_capacity(requests.len()); - + let mut new_requests = Vec::with_capacity(requests.len()); + let requests = if add_if_not_exists { + for col_to_add in requests { + if let Some(column_schema) = + table_schema.column_schema_by_name(&col_to_add.column_schema.name) + { + // If the column already exists, we should check if the type is the same. + ensure!( + column_schema.data_type == col_to_add.column_schema.data_type, + error::InvalidAlterRequestSnafu { + table: table_name, + err: format!( + "column {} already exists with different type", + col_to_add.column_schema.name + ), + } + ); + } else { + new_requests.push(col_to_add.clone()); + } + } + &new_requests[..] + } else { + requests + }; for col_to_add in requests { ensure!( names.insert(&col_to_add.column_schema.name), @@ -630,6 +658,44 @@ pub struct RawTableInfo { pub table_type: TableType, } +impl RawTableInfo { + /// Sort the columns in [RawTableInfo], logical tables require it. + pub fn sort_columns(&mut self) { + let column_schemas = &self.meta.schema.column_schemas; + let primary_keys = self + .meta + .primary_key_indices + .iter() + .map(|index| column_schemas[*index].name.clone()) + .collect::>(); + + self.meta + .schema + .column_schemas + .sort_unstable_by(|a, b| a.name.cmp(&b.name)); + + // Compute new indices of sorted columns + let mut primary_key_indices = Vec::with_capacity(primary_keys.len()); + let mut timestamp_index = None; + let mut value_indices = + Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1); + for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() { + if primary_keys.contains(&column_schema.name) { + primary_key_indices.push(index); + } else if column_schema.is_time_index() { + timestamp_index = Some(index); + } else { + value_indices.push(index); + } + } + + // Overwrite table meta + self.meta.schema.timestamp_index = timestamp_index; + self.meta.primary_key_indices = primary_key_indices; + self.meta.value_indices = value_indices; + } +} + impl From for RawTableInfo { fn from(info: TableInfo) -> RawTableInfo { RawTableInfo { @@ -731,7 +797,7 @@ mod tests { }; let builder = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .unwrap(); builder.build().unwrap() } @@ -761,7 +827,7 @@ mod tests { }; let builder = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .unwrap(); builder.build().unwrap() } @@ -808,7 +874,7 @@ mod tests { names: vec![String::from("col2"), String::from("my_field")], }; let new_meta = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .unwrap() .build() .unwrap(); @@ -863,7 +929,7 @@ mod tests { names: vec![String::from("col3"), String::from("col1")], }; let new_meta = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .unwrap() .build() .unwrap(); @@ -903,7 +969,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .err() .unwrap(); assert_eq!(StatusCode::TableColumnExists, err.status_code()); @@ -933,7 +999,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -955,7 +1021,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .err() .unwrap(); assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); @@ -978,7 +1044,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); @@ -989,7 +1055,7 @@ mod tests { }; let err = meta - .builder_with_alter_kind("my_table", &alter_kind) + .builder_with_alter_kind("my_table", &alter_kind, false) .err() .unwrap(); assert_eq!(StatusCode::InvalidArguments, err.status_code());