Skip to content

Commit

Permalink
Merge pull request #5562 from dantengsky/feat-undrop-table-show-history
Browse files Browse the repository at this point in the history
Feat : undrop table & show history
  • Loading branch information
BohuTANG authored May 25, 2022
2 parents 59995af + f463390 commit 486b866
Show file tree
Hide file tree
Showing 43 changed files with 650 additions and 48 deletions.
2 changes: 1 addition & 1 deletion common/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn test_statements_in_legacy_suites() {
// TODO(andylokandy): support all cases eventually
// Remove currently unimplemented cases
let file_str = regex::Regex::new(
"(?i).*(SLAVE|MASTER|COMMIT|START|ROLLBACK|FIELDS|GRANT|COPY|ROLE|STAGE|ENGINES).*\n",
"(?i).*(SLAVE|MASTER|COMMIT|START|ROLLBACK|FIELDS|GRANT|COPY|ROLE|STAGE|ENGINES|UNDROP|HISTORY).*\n",
)
.unwrap()
.replace_all(&file_str, "")
Expand Down
2 changes: 2 additions & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ mod plan_table_optimize;
mod plan_table_rename;
mod plan_table_show_create;
mod plan_table_truncate;
mod plan_table_undrop;
mod plan_use_database;
mod plan_user_alter;
mod plan_user_create;
Expand Down Expand Up @@ -211,6 +212,7 @@ pub use plan_table_rename::RenameTableEntity;
pub use plan_table_rename::RenameTablePlan;
pub use plan_table_show_create::ShowCreateTablePlan;
pub use plan_table_truncate::TruncateTablePlan;
pub use plan_table_undrop::UnDropTablePlan;
pub use plan_use_database::UseDatabasePlan;
pub use plan_user_alter::AlterUserPlan;
pub use plan_user_create::CreateUserPlan;
Expand Down
4 changes: 4 additions & 0 deletions common/planners/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use common_datavalues::DataSchemaRef;

use crate::plan_table_undrop::UnDropTablePlan;
use crate::AggregatorFinalPlan;
use crate::AggregatorPartialPlan;
use crate::AlterUserPlan;
Expand Down Expand Up @@ -123,6 +124,7 @@ pub enum PlanNode {
// Table.
CreateTable(CreateTablePlan),
DropTable(DropTablePlan),
UnDropTable(UnDropTablePlan),
RenameTable(RenameTablePlan),
TruncateTable(TruncateTablePlan),
OptimizeTable(OptimizeTablePlan),
Expand Down Expand Up @@ -220,6 +222,7 @@ impl PlanNode {
// Table.
PlanNode::CreateTable(v) => v.schema(),
PlanNode::DropTable(v) => v.schema(),
PlanNode::UnDropTable(v) => v.schema(),
PlanNode::RenameTable(v) => v.schema(),
PlanNode::TruncateTable(v) => v.schema(),
PlanNode::OptimizeTable(v) => v.schema(),
Expand Down Expand Up @@ -319,6 +322,7 @@ impl PlanNode {
// Table.
PlanNode::CreateTable(_) => "CreateTablePlan",
PlanNode::DropTable(_) => "DropTablePlan",
PlanNode::UnDropTable(_) => "UndropTablePlan",
PlanNode::RenameTable(_) => "RenameTablePlan",
PlanNode::TruncateTable(_) => "TruncateTablePlan",
PlanNode::OptimizeTable(_) => "OptimizeTablePlan",
Expand Down
6 changes: 6 additions & 0 deletions common/planners/src/plan_node_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_exception::Result;

use crate::plan_broadcast::BroadcastPlan;
use crate::plan_subqueries_set::SubQueriesSetPlan;
use crate::plan_table_undrop::UnDropTablePlan;
use crate::AggregatorFinalPlan;
use crate::AggregatorPartialPlan;
use crate::AlterUserPlan;
Expand Down Expand Up @@ -149,6 +150,7 @@ pub trait PlanRewriter: Sized {
// Table.
PlanNode::CreateTable(plan) => self.rewrite_create_table(plan),
PlanNode::DropTable(plan) => self.rewrite_drop_table(plan),
PlanNode::UnDropTable(plan) => self.rewrite_undrop_table(plan),
PlanNode::RenameTable(plan) => self.rewrite_rename_table(plan),
PlanNode::TruncateTable(plan) => self.rewrite_truncate_table(plan),
PlanNode::OptimizeTable(plan) => self.rewrite_optimize_table(plan),
Expand Down Expand Up @@ -405,6 +407,10 @@ pub trait PlanRewriter: Sized {
Ok(PlanNode::DropTable(plan.clone()))
}

fn rewrite_undrop_table(&mut self, plan: &UnDropTablePlan) -> Result<PlanNode> {
Ok(PlanNode::UnDropTable(plan.clone()))
}

fn rewrite_drop_database(&mut self, plan: &DropDatabasePlan) -> Result<PlanNode> {
Ok(PlanNode::DropDatabase(plan.clone()))
}
Expand Down
6 changes: 6 additions & 0 deletions common/planners/src/plan_node_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use common_exception::Result;

use crate::plan_broadcast::BroadcastPlan;
use crate::plan_subqueries_set::SubQueriesSetPlan;
use crate::plan_table_undrop::UnDropTablePlan;
use crate::AggregatorFinalPlan;
use crate::AggregatorPartialPlan;
use crate::AlterUserPlan;
Expand Down Expand Up @@ -161,6 +162,7 @@ pub trait PlanVisitor {
// Table.
PlanNode::CreateTable(plan) => self.visit_create_table(plan),
PlanNode::DropTable(plan) => self.visit_drop_table(plan),
PlanNode::UnDropTable(plan) => self.visit_undrop_table(plan),
PlanNode::RenameTable(plan) => self.visit_rename_table(plan),
PlanNode::TruncateTable(plan) => self.visit_truncate_table(plan),
PlanNode::OptimizeTable(plan) => self.visit_optimize_table(plan),
Expand Down Expand Up @@ -391,6 +393,10 @@ pub trait PlanVisitor {
Ok(())
}

fn visit_undrop_table(&mut self, _: &UnDropTablePlan) -> Result<()> {
Ok(())
}

fn visit_use_database(&mut self, _: &UseDatabasePlan) -> Result<()> {
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions common/planners/src/plan_show_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ pub struct ShowTablesPlan {
pub showfull: bool,
// show tables from db1 [or in db1]
pub fromdb: Option<String>,
// TRUE if dropped tables should be shown
pub with_history: bool,
}
47 changes: 47 additions & 0 deletions common/planners/src/plan_table_undrop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_types::TableNameIdent;
use common_meta_types::UndropTableReq;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct UnDropTablePlan {
pub tenant: String,
pub catalog: String,
pub db: String,
pub table: String,
}

impl UnDropTablePlan {
pub fn schema(&self) -> DataSchemaRef {
Arc::new(DataSchema::empty())
}
}

/// The table name
impl From<UnDropTablePlan> for UndropTableReq {
fn from(p: UnDropTablePlan) -> Self {
UndropTableReq {
name_ident: TableNameIdent {
tenant: p.tenant,
db_name: p.db,
table_name: p.table,
},
}
}
}
5 changes: 5 additions & 0 deletions query/src/catalogs/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use common_meta_types::RenameTableReq;
use common_meta_types::TableIdent;
use common_meta_types::TableInfo;
use common_meta_types::TableMeta;
use common_meta_types::UndropTableReply;
use common_meta_types::UndropTableReq;
use common_meta_types::UpdateTableMetaReply;
use common_meta_types::UpdateTableMetaReq;
use common_meta_types::UpsertTableOptionReply;
Expand Down Expand Up @@ -93,10 +95,13 @@ pub trait Catalog: DynClone + Send + Sync {
) -> Result<Arc<dyn Table>>;

async fn list_tables(&self, tenant: &str, db_name: &str) -> Result<Vec<Arc<dyn Table>>>;
async fn list_tables_history(&self, tenant: &str, db_name: &str)
-> Result<Vec<Arc<dyn Table>>>;

async fn create_table(&self, req: CreateTableReq) -> Result<()>;

async fn drop_table(&self, req: DropTableReq) -> Result<DropTableReply>;
async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply>;

async fn rename_table(&self, req: RenameTableReq) -> Result<RenameTableReply>;

Expand Down
55 changes: 55 additions & 0 deletions query/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use common_meta_types::RenameTableReq;
use common_meta_types::TableIdent;
use common_meta_types::TableInfo;
use common_meta_types::TableMeta;
use common_meta_types::UndropTableReply;
use common_meta_types::UndropTableReq;
use common_meta_types::UpdateTableMetaReply;
use common_meta_types::UpdateTableMetaReq;
use common_meta_types::UpsertTableOptionReply;
Expand Down Expand Up @@ -283,6 +285,41 @@ impl Catalog for DatabaseCatalog {
}
}

async fn list_tables_history(
&self,
tenant: &str,
db_name: &str,
) -> Result<Vec<Arc<dyn Table>>> {
if tenant.is_empty() {
return Err(ErrorCode::TenantIsEmpty(
"Tenant can not empty(while list tables)",
));
}

let db_name = if Self::is_case_insensitive_db(db_name) {
db_name.to_uppercase()
} else {
db_name.to_string()
};

let r = self
.immutable_catalog
.list_tables_history(tenant, &db_name)
.await;
match r {
Ok(x) => Ok(x),
Err(e) => {
if e.code() == ErrorCode::UnknownDatabaseCode() {
self.mutable_catalog
.list_tables_history(tenant, &db_name)
.await
} else {
Err(e)
}
}
}
}

async fn create_table(&self, req: CreateTableReq) -> Result<()> {
if req.tenant().is_empty() {
return Err(ErrorCode::TenantIsEmpty(
Expand Down Expand Up @@ -319,6 +356,24 @@ impl Catalog for DatabaseCatalog {
self.mutable_catalog.drop_table(req).await
}

async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply> {
if req.tenant().is_empty() {
return Err(ErrorCode::TenantIsEmpty(
"Tenant can not empty(while undrop table)",
));
}
tracing::info!("UnDrop table from req:{:?}", req);

if self
.immutable_catalog
.exists_database(req.tenant(), req.db_name())
.await?
{
return self.immutable_catalog.undrop_table(req).await;
}
self.mutable_catalog.undrop_table(req).await
}

async fn rename_table(&self, req: RenameTableReq) -> Result<RenameTableReply> {
if req.tenant().is_empty() {
return Err(ErrorCode::TenantIsEmpty(
Expand Down
16 changes: 16 additions & 0 deletions query/src/catalogs/default/immutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use common_meta_types::RenameTableReq;
use common_meta_types::TableIdent;
use common_meta_types::TableInfo;
use common_meta_types::TableMeta;
use common_meta_types::UndropTableReply;
use common_meta_types::UndropTableReq;
use common_meta_types::UpdateTableMetaReply;
use common_meta_types::UpdateTableMetaReq;
use common_meta_types::UpsertTableOptionReply;
Expand Down Expand Up @@ -137,6 +139,14 @@ impl Catalog for ImmutableCatalog {
self.sys_db_meta.get_all_tables(db_name)
}

async fn list_tables_history(
&self,
tenant: &str,
db_name: &str,
) -> Result<Vec<Arc<dyn Table>>> {
self.list_tables(tenant, db_name).await
}

async fn create_table(&self, _req: CreateTableReq) -> Result<()> {
Err(ErrorCode::UnImplement(
"Cannot create table in system database",
Expand All @@ -149,6 +159,12 @@ impl Catalog for ImmutableCatalog {
))
}

async fn undrop_table(&self, _req: UndropTableReq) -> Result<UndropTableReply> {
Err(ErrorCode::UnImplement(
"Cannot undrop table in system database",
))
}

async fn rename_table(&self, _req: RenameTableReq) -> Result<RenameTableReply> {
Err(ErrorCode::UnImplement(
"Cannot rename table in system database",
Expand Down
49 changes: 44 additions & 5 deletions query/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use common_meta_types::RenameTableReq;
use common_meta_types::TableIdent;
use common_meta_types::TableInfo;
use common_meta_types::TableMeta;
use common_meta_types::UndropTableReply;
use common_meta_types::UndropTableReq;
use common_meta_types::UpdateTableMetaReply;
use common_meta_types::UpdateTableMetaReq;
use common_meta_types::UpsertTableOptionReply;
Expand Down Expand Up @@ -126,6 +128,14 @@ impl MutableCatalog {
};
self.ctx.database_factory.get_database(ctx, db_info)
}

fn load_tables(&self, table_infos: Vec<Arc<TableInfo>>) -> Result<Vec<Arc<dyn Table>>> {
table_infos.iter().try_fold(vec![], |mut acc, item| {
let tbl = self.get_table_by_info(item.as_ref())?;
acc.push(tbl);
Ok(acc)
})
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -226,11 +236,35 @@ impl Catalog for MutableCatalog {
.list_tables(ListTableReq::new(tenant, db_name))
.await?;

table_infos.iter().try_fold(vec![], |mut acc, item| {
let tbl = self.get_table_by_info(item.as_ref())?;
acc.push(tbl);
Ok(acc)
})
self.load_tables(table_infos)
}

async fn list_tables_history(
&self,
tenant: &str,
db_name: &str,
) -> Result<Vec<Arc<dyn Table>>> {
// `get_table_history` will not fetch the tables that created before the
// "metasrv time travel functions" is added.
// thus, only the table-infos of dropped tables are used.
let mut dropped = self
.ctx
.meta
.get_table_history(ListTableReq::new(tenant, db_name))
.await?
.into_iter()
.filter(|i| i.meta.drop_on.is_some())
.collect::<Vec<_>>();

let mut table_infos = self
.ctx
.meta
.list_tables(ListTableReq::new(tenant, db_name))
.await?;

table_infos.append(&mut dropped);

self.load_tables(table_infos)
}

async fn create_table(&self, req: CreateTableReq) -> Result<()> {
Expand All @@ -243,6 +277,11 @@ impl Catalog for MutableCatalog {
Ok(res)
}

async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply> {
let res = self.ctx.meta.undrop_table(req).await?;
Ok(res)
}

async fn rename_table(&self, req: RenameTableReq) -> Result<RenameTableReply> {
let res = self.ctx.meta.rename_table(req).await?;
Ok(res)
Expand Down
Loading

0 comments on commit 486b866

Please sign in to comment.