diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 7532d2fb49d0..2502db857653 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -185,6 +185,7 @@ build_exceptions! { BackgroundJobAlreadyExists(1501), UnknownBackgroundJob(1502), + InvalidRowIdIndex(1503), // Index related errors. UnsupportedIndex(1601), RefreshIndexError(1602), diff --git a/src/common/storage/src/common_metrics/merge_into.rs b/src/common/storage/src/common_metrics/merge_into.rs new file mode 100644 index 000000000000..27a585d6baac --- /dev/null +++ b/src/common/storage/src/common_metrics/merge_into.rs @@ -0,0 +1,37 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use metrics::increment_gauge; + +macro_rules! key { + ($key: literal) => { + concat!("query_", $key) + }; +} + +pub fn metrics_inc_merge_into_replace_blocks_counter(c: u32) { + increment_gauge!(key!("merge_into_replace_blocks_counter"), c as f64); +} + +pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) { + increment_gauge!(key!("merge_into_append_blocks_counter"), c as f64); +} + +pub fn metrics_inc_merge_into_matched_rows(c: u32) { + increment_gauge!(key!("merge_into_matched_rows"), c as f64); +} + +pub fn metrics_inc_merge_into_unmatched_rows(c: u32) { + increment_gauge!(key!("merge_into_unmatched_rows"), c as f64); +} diff --git a/src/common/storage/src/common_metrics/mod.rs b/src/common/storage/src/common_metrics/mod.rs index 54ee433ca1a9..890e46a7413d 100644 --- a/src/common/storage/src/common_metrics/mod.rs +++ b/src/common/storage/src/common_metrics/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod copy; +pub mod merge_into; mod storage_metrics; pub use storage_metrics::StorageMetrics; diff --git a/src/query/ast/src/ast/statements/merge_into.rs b/src/query/ast/src/ast/statements/merge_into.rs new file mode 100644 index 000000000000..33f496797a6c --- /dev/null +++ b/src/query/ast/src/ast/statements/merge_into.rs @@ -0,0 +1,262 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
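Aside on the metrics module above: every merge-into gauge shares a `query_` prefix that `concat!` stamps on at compile time, so each key is a plain `&'static str` with no runtime formatting. A standalone sketch of the same pattern (the `record` function here is a stand-in for `metrics::increment_gauge!`, which the real module uses):

```rust
// Self-contained illustration of the `key!` prefixing pattern used above.
macro_rules! key {
    ($key: literal) => {
        concat!("query_", $key)
    };
}

// Stand-in recorder; the real code routes through the `metrics` crate.
fn record(name: &'static str, delta: f64) {
    println!("gauge {name} += {delta}");
}

fn main() {
    // `key!(...)` expands at compile time to "query_merge_into_matched_rows".
    record(key!("merge_into_matched_rows"), 42.0);
    record(key!("merge_into_unmatched_rows"), 7.0);
}
```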
+ +use std::collections::BTreeMap; +use std::fmt::Display; +use std::fmt::Formatter; + +use common_exception::ErrorCode; +use common_exception::Result; + +use super::Hint; +use crate::ast::write_comma_separated_list; +use crate::ast::write_period_separated_list; +use crate::ast::Expr; +use crate::ast::Identifier; +use crate::ast::Query; +use crate::ast::TableAlias; +use crate::ast::TableReference; + +#[derive(Debug, Clone, PartialEq)] +pub struct MergeUpdateExpr { + pub catalog: Option<Identifier>, + pub table: Option<Identifier>, + pub name: Identifier, + pub expr: Expr, +} + +impl Display for MergeUpdateExpr { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + if self.catalog.is_some() { + write!(f, "{}.", self.catalog.clone().unwrap())?; + } + + if self.table.is_some() { + write!(f, "{}.", self.table.clone().unwrap())?; + } + + write!(f, "{} = {}", self.name, self.expr) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum MatchOperation { + Update { update_list: Vec<MergeUpdateExpr> }, + Delete, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct MatchedClause { + pub selection: Option<Expr>, + pub operation: MatchOperation, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct InsertOperation { + pub columns: Option<Vec<Identifier>>, + pub values: Vec<Expr>, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct UnmatchedClause { + pub selection: Option<Expr>, + pub insert_operation: InsertOperation, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum MergeOption { + Match(MatchedClause), + Unmatch(UnmatchedClause), +} + +#[derive(Debug, Clone, PartialEq)] +pub struct MergeIntoStmt { + pub hints: Option<Hint>, + pub catalog: Option<Identifier>, + pub database: Option<Identifier>, + pub table_ident: Identifier, + pub source: MergeSource, + // alias_target belongs to the target table + pub alias_target: Option<TableAlias>, + pub join_expr: Expr, + pub merge_options: Vec<MergeOption>, +} + +impl Display for MergeIntoStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "MERGE INTO ")?; + write_period_separated_list( + f, + self.catalog + .iter() + .chain(&self.database) + .chain(Some(&self.table_ident)), + )?; + + write!(f, " USING {} ON {}", self.source, self.join_expr)?; + + for clause in &self.merge_options { + match clause { + MergeOption::Match(match_clause) => { + write!(f, " WHEN MATCHED ")?; + if let Some(e) = &match_clause.selection { + write!(f, " AND {} ", e)?; + } + write!(f, " THEN ")?; + + match &match_clause.operation { + MatchOperation::Update { update_list } => { + write!(f, " UPDATE SET ")?; + write_comma_separated_list(f, update_list)?; + } + MatchOperation::Delete => { + write!(f, " DELETE ")?; + } + } + } + MergeOption::Unmatch(unmatch_clause) => { + write!(f, " WHEN NOT MATCHED ")?; + if let Some(e) = &unmatch_clause.selection { + write!(f, " AND {} ", e)?; + } + write!(f, " THEN INSERT ")?; + if let Some(columns) = &unmatch_clause.insert_operation.columns { + if !columns.is_empty() { + write!(f, " (")?; + write_comma_separated_list(f, columns)?; + write!(f, ")")?; + } + } + write!(f, "VALUES")?; + for (i, value) in unmatch_clause.insert_operation.values.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "(")?; + write_comma_separated_list(f, vec![value])?; + write!(f, ")")?; + } + } + } + } + Ok(()) + } +} + +impl MergeIntoStmt { + pub fn split_clauses(&self) -> (Vec<MatchedClause>, Vec<UnmatchedClause>) { + let mut match_clauses = Vec::with_capacity(self.merge_options.len()); + let mut unmatch_clauses = Vec::with_capacity(self.merge_options.len()); + for merge_operation in &self.merge_options { + match merge_operation { + MergeOption::Match(match_clause) =>
match_clauses.push(match_clause.clone()), + MergeOption::Unmatch(unmatch_clause) => { + unmatch_clauses.push(unmatch_clause.clone()) + } + } + } + (match_clauses, unmatch_clauses) + } + + pub fn check_multi_match_clauses_semantic(clauses: &Vec<MatchedClause>) -> Result<()> { + // check match_clauses + if clauses.len() > 1 { + for (idx, clause) in clauses.iter().enumerate() { + if clause.selection.is_none() && idx < clauses.len() - 1 { + return Err(ErrorCode::SemanticError( + "when there are multiple matched clauses, we must have a condition for every one except the last one".to_string(), + )); + } + } + } + Ok(()) + } + + pub fn check_multi_unmatch_clauses_semantic(clauses: &Vec<UnmatchedClause>) -> Result<()> { + // check unmatch_clauses + if clauses.len() > 1 { + for (idx, clause) in clauses.iter().enumerate() { + if clause.selection.is_none() && idx < clauses.len() - 1 { + return Err(ErrorCode::SemanticError( + "when there are multiple unmatched clauses, we must have a condition for every one except the last one".to_string(), + )); + } + } + } + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum MergeSource { + StreamingV2 { + settings: BTreeMap<String, String>, + on_error_mode: Option<String>, + start: usize, + }, + + Select { + query: Box<Query>, + }, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct StreamingSource { + settings: BTreeMap<String, String>, + on_error_mode: Option<String>, + start: usize, +} + +impl MergeSource { + pub fn transform_table_reference(&self) -> TableReference { + match self { + Self::StreamingV2 { + settings: _, + on_error_mode: _, + start: _, + } => unimplemented!(), + + Self::Select { query } => TableReference::Subquery { + span: None, + subquery: query.clone(), + alias: None, + }, + } + } +} + +impl Display for MergeSource { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + MergeSource::StreamingV2 { + settings, + on_error_mode, + start: _, + } => { + write!(f, " FILE_FORMAT = (")?; + for (k, v) in settings.iter() { + write!(f, " {} = '{}'", k, v)?; + } + write!(f, " )")?; + write!( + f, + " ON_ERROR = '{}'", + on_error_mode.as_ref().unwrap_or(&"Abort".to_string()) + ) + } + + MergeSource::Select { query } => write!(f, "{query}"), + } + } +} diff --git a/src/query/ast/src/ast/statements/mod.rs index 2f58be38c4f0..3e8505c194f2 100644 --- a/src/query/ast/src/ast/statements/mod.rs +++ b/src/query/ast/src/ast/statements/mod.rs @@ -23,6 +23,7 @@ mod hint; mod index; mod insert; mod kill; +mod merge_into; mod network_policy; mod presign; mod replace; @@ -48,6 +49,7 @@ pub use hint::*; pub use index::*; pub use insert::*; pub use kill::*; +pub use merge_into::*; pub use network_policy::*; pub use presign::*; pub use replace::*; diff --git a/src/query/ast/src/ast/statements/statement.rs index a8de3dd75b38..e03e5b982016 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -19,6 +19,7 @@ use common_meta_app::principal::FileFormatOptionsAst; use common_meta_app::principal::PrincipalIdentity; use common_meta_app::principal::UserIdentity; +use super::merge_into::MergeIntoStmt; use super::*; use crate::ast::write_comma_separated_list; use crate::ast::Expr; @@ -76,7 +77,7 @@ pub enum Statement { Insert(InsertStmt), Replace(ReplaceStmt), - + MergeInto(MergeIntoStmt), Delete { hints: Option<Hint>, table_reference: TableReference, @@ -298,6 +299,7 @@ impl Display for Statement { Statement::Query(query) => write!(f, "{query}")?, Statement::Insert(insert) => write!(f, "{insert}")?,
Statement::Replace(replace) => write!(f, "{replace}")?, + Statement::MergeInto(merge_into) => write!(f, "{merge_into}")?, Statement::Delete { table_reference, selection, diff --git a/src/query/ast/src/parser/statement.rs index cbc5007ef569..396512fbcf60 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -151,6 +151,36 @@ pub fn statement(i: Input) -> IResult<Statement> { }, ); + let merge = map( + rule! { + MERGE ~ #hint? ~ INTO ~ #period_separated_idents_1_to_3 ~ #table_alias? ~ USING + ~ #merge_source ~ ON ~ #expr ~ (#match_clause | #unmatch_clause)* + }, + |( + _, + opt_hints, + _, + (catalog, database, table), + alias_target, + _, + source, + _, + join_expr, + merge_options, + )| { + Statement::MergeInto(MergeIntoStmt { + hints: opt_hints, + catalog, + database, + table_ident: table, + source, + alias_target, + join_expr, + merge_options, + }) + }, + ); + let delete = map( rule! { DELETE ~ #hint? ~ FROM ~ #table_reference_only @@ -1460,6 +1490,7 @@ pub fn statement(i: Input) -> IResult<Statement> { rule!( #insert : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`" | #replace : "`REPLACE INTO [TABLE] <table>
[(, ...)] (FORMAT | VALUES | )`" + | #merge : "`MERGE INTO USING ON { matchedClause | notMatchedClause } [ ... ]`" ), rule!( #set_variable : "`SET = `" @@ -1633,6 +1664,28 @@ pub fn insert_source(i: Input) -> IResult { )(i) } +pub fn merge_source(i: Input) -> IResult { + let streaming_v2 = map( + rule! { + #file_format_clause ~ ( ON_ERROR ~ "=" ~ #ident)? ~ #rest_str + }, + |(options, on_error_opt, (_, start))| MergeSource::StreamingV2 { + settings: options, + on_error_mode: on_error_opt.map(|v| v.2.to_string()), + start, + }, + ); + + let query = map(query, |query| MergeSource::Select { + query: Box::new(query), + }); + + rule!( + #streaming_v2 + | #query + )(i) +} + pub fn unset_source(i: Input) -> IResult { //#ident ~ ( "(" ~ ^#comma_separated_list1(ident) ~ ")")? let var = map( @@ -2148,6 +2201,67 @@ pub fn alter_table_action(i: Input) -> IResult { )(i) } +pub fn match_clause(i: Input) -> IResult { + map( + rule! { + WHEN ~ MATCHED ~ (AND ~ #expr)? ~ THEN ~ #match_operation + }, + |(_, _, expr_op, _, match_operation)| match expr_op { + Some(expr) => MergeOption::Match(MatchedClause { + selection: Some(expr.1), + operation: match_operation, + }), + None => MergeOption::Match(MatchedClause { + selection: None, + operation: match_operation, + }), + }, + )(i) +} + +fn match_operation(i: Input) -> IResult { + alt(( + value(MatchOperation::Delete, rule! {DELETE}), + map( + rule! { + UPDATE ~ SET ~ ^#comma_separated_list1(merge_update_expr) + }, + |(_, _, update_list)| MatchOperation::Update { update_list }, + ), + ))(i) +} + +pub fn unmatch_clause(i: Input) -> IResult { + map( + rule! { + WHEN ~ NOT ~ MATCHED ~ (AND ~ #expr)? ~ THEN ~ INSERT ~ ( "(" ~ #comma_separated_list1(ident) ~ ")" )? + ~ VALUES ~ ^#row_values + }, + |(_, _, _, expr_op, _, _, columns_op, _, values)| { + let selection = match expr_op { + Some(e) => Some(e.1), + None => None, + }; + match columns_op { + Some(columns) => MergeOption::Unmatch(UnmatchedClause { + insert_operation: InsertOperation { + columns: Some(columns.1), + values, + }, + selection, + }), + None => MergeOption::Unmatch(UnmatchedClause { + insert_operation: InsertOperation { + columns: None, + values, + }, + selection, + }), + } + }, + )(i) +} + pub fn add_column_option(i: Input) -> IResult { alt(( value(AddColumnOption::First, rule! { FIRST }), @@ -2533,3 +2647,15 @@ pub fn update_expr(i: Input) -> IResult { UpdateExpr { name, expr } })(i) } + +pub fn merge_update_expr(i: Input) -> IResult { + map( + rule! 
{ ( #period_separated_idents_1_to_3 ~ "=" ~ ^#expr ) }, + |((catalog, table, name), _, expr)| MergeUpdateExpr { + catalog, + table, + name, + expr, + }, + )(i) +} diff --git a/src/query/ast/src/parser/token.rs index a9be7ba7c4e7..1c0604e49897 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -744,6 +744,12 @@ pub enum TokenKind { RENAME, #[token("REPLACE", ignore(ascii_case))] REPLACE, + #[token("MERGE", ignore(ascii_case))] + MERGE, + #[token("MATCHED", ignore(ascii_case))] + MATCHED, + #[token("UNMATCHED", ignore(ascii_case))] + UNMATCHED, #[token("ROW", ignore(ascii_case))] ROW, #[token("ROWS", ignore(ascii_case))] ROWS, diff --git a/src/query/ast/src/visitors/visitor.rs index 2fc36dd6bb3a..9121c0d6885c 100644 --- a/src/query/ast/src/visitors/visitor.rs +++ b/src/query/ast/src/visitors/visitor.rs @@ -404,7 +404,7 @@ pub trait Visitor<'ast>: Sized { fn visit_insert(&mut self, _insert: &'ast InsertStmt) {} fn visit_replace(&mut self, _replace: &'ast ReplaceStmt) {} - + fn visit_merge_into(&mut self, _merge_into: &'ast MergeIntoStmt) {} fn visit_insert_source(&mut self, _insert_source: &'ast InsertSource) {} fn visit_delete( diff --git a/src/query/ast/src/visitors/visitor_mut.rs index b0426dc83d01..d03e129df55e 100644 --- a/src/query/ast/src/visitors/visitor_mut.rs +++ b/src/query/ast/src/visitors/visitor_mut.rs @@ -419,7 +419,7 @@ pub trait VisitorMut: Sized { fn visit_insert(&mut self, _insert: &mut InsertStmt) {} fn visit_replace(&mut self, _replace: &mut ReplaceStmt) {} - + fn visit_merge_into(&mut self, _merge_into: &mut MergeIntoStmt) {} fn visit_insert_source(&mut self, _insert_source: &mut InsertSource) {} fn visit_delete( diff --git a/src/query/ast/src/visitors/walk.rs index f9bd77cf5d7d..e42c7da258d0 100644 --- a/src/query/ast/src/visitors/walk.rs +++ b/src/query/ast/src/visitors/walk.rs @@ -336,6 +336,7 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statement) { Statement::Query(query) => visitor.visit_query(query), Statement::Insert(insert) => visitor.visit_insert(insert), Statement::Replace(replace) => visitor.visit_replace(replace), + Statement::MergeInto(merge_into) => visitor.visit_merge_into(merge_into), Statement::Delete { table_reference, selection, diff --git a/src/query/ast/src/visitors/walk_mut.rs index 162801723a5a..e6e74bc7f21f 100644 --- a/src/query/ast/src/visitors/walk_mut.rs +++ b/src/query/ast/src/visitors/walk_mut.rs @@ -311,6 +311,7 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statement) { Statement::Query(query) => visitor.visit_query(&mut *query), Statement::Insert(insert) => visitor.visit_insert(insert), Statement::Replace(replace) => visitor.visit_replace(replace), + Statement::MergeInto(merge_into) => visitor.visit_merge_into(merge_into), Statement::Delete { table_reference, selection, diff --git a/src/query/service/src/interpreters/access/privilege_access.rs index f5e4c970fb45..661ba4e7c8d9 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -441,6 +441,18 @@ impl AccessChecker for PrivilegeAccess { ) .await?; } + Plan::MergeInto(plan) => { + session + .validate_privilege( + &GrantObject::Table( + plan.catalog.clone(),
plan.database.clone(), + plan.table.clone(), + ), + vec![UserPrivilegeType::Insert, UserPrivilegeType::Delete], + ) + .await?; + } Plan::Delete(plan) => { session .validate_privilege( diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index 373fd034d0b5..b4fa1a557e7d 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -22,6 +22,7 @@ use super::interpreter_catalog_create::CreateCatalogInterpreter; use super::interpreter_catalog_show_create::ShowCreateCatalogInterpreter; use super::interpreter_index_create::CreateIndexInterpreter; use super::interpreter_index_drop::DropIndexInterpreter; +use super::interpreter_merge_into::MergeIntoInterpreter; use super::interpreter_share_desc::DescShareInterpreter; use super::interpreter_table_set_options::SetOptionsInterpreter; use super::interpreter_user_stage_drop::DropUserStageInterpreter; @@ -265,7 +266,9 @@ impl InterpreterFactory { Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone()), Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()), - + Plan::MergeInto(merge_into) => { + MergeIntoInterpreter::try_create(ctx, *merge_into.clone()) + } Plan::Delete(delete) => Ok(Arc::new(DeleteInterpreter::try_create( ctx, *delete.clone(), diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs new file mode 100644 index 000000000000..4aca6901d326 --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -0,0 +1,310 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
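Before the interpreter itself: for orientation, here is the shape of statement the grammar above accepts, annotated with the AST types it produces. This is an illustrative example, not one of the PR's test cases; `t`, `src`, `k`, and `v` are invented names.

```rust
// Illustrative MERGE statement mapped to the new AST types (names invented).
const EXAMPLE_MERGE: &str = "
MERGE INTO t AS tgt USING (SELECT k, v FROM src) ON tgt.k = k
WHEN MATCHED AND tgt.v < v THEN UPDATE SET tgt.v = v -- MatchedClause { selection: Some(..), operation: Update { .. } }
WHEN MATCHED THEN DELETE                             -- MatchedClause { selection: None, operation: Delete }
WHEN NOT MATCHED THEN INSERT (k, v) VALUES (k, v)    -- UnmatchedClause { insert_operation: InsertOperation { .. }, .. }
";

fn main() {
    println!("{EXAMPLE_MERGE}");
}
```

Note how the clause ordering satisfies `check_multi_match_clauses_semantic`: of the two matched clauses, only the last may omit its `AND` condition.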
+ +use std::sync::Arc; + +use common_base::runtime::GlobalIORuntime; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::ConstantFolder; +use common_expression::DataSchemaRef; +use common_expression::FieldIndex; +use common_expression::RemoteExpr; +use common_functions::BUILTIN_FUNCTIONS; +use common_meta_app::schema::TableInfo; +use common_sql::executor::MergeInto; +use common_sql::executor::MergeIntoSource; +use common_sql::executor::MutationAggregate; +use common_sql::executor::MutationKind; +use common_sql::executor::PhysicalPlan; +use common_sql::executor::PhysicalPlanBuilder; +use common_sql::plans::MergeInto as MergePlan; +use common_sql::plans::UpdatePlan; +use common_sql::ScalarExpr; +use common_sql::TypeCheck; +use common_storages_factory::Table; +use common_storages_fuse::FuseTable; +use common_storages_fuse::TableContext; +use itertools::Itertools; +use storages_common_table_meta::meta::TableSnapshot; +use table_lock::TableLockHandlerWrapper; + +use super::Interpreter; +use super::InterpreterPtr; +use crate::pipelines::PipelineBuildResult; +use crate::schedulers::build_query_pipeline_without_render_result_set; +use crate::sessions::QueryContext; + +const DUMMY_COL_INDEX: usize = 1; +pub struct MergeIntoInterpreter { + ctx: Arc<QueryContext>, + plan: MergePlan, +} + +impl MergeIntoInterpreter { + pub fn try_create(ctx: Arc<QueryContext>, plan: MergePlan) -> Result<InterpreterPtr> { + Ok(Arc::new(MergeIntoInterpreter { ctx, plan })) + } +} + +#[async_trait::async_trait] +impl Interpreter for MergeIntoInterpreter { + fn name(&self) -> &str { + "MergeIntoInterpreter" + } + + #[async_backtrace::framed] + async fn execute2(&self) -> Result<PipelineBuildResult> { + let (physical_plan, table_info) = self.build_physical_plan().await?; + let mut build_res = + build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false) + .await?; + + // Add table lock heartbeat before execution. + let handler = TableLockHandlerWrapper::instance(self.ctx.clone()); + let mut heartbeat = handler.try_lock(self.ctx.clone(), table_info).await?; + + if build_res.main_pipeline.is_empty() { + heartbeat.shutdown().await?; + } else { + build_res.main_pipeline.set_on_finished(move |may_error| { + // shutdown table lock heartbeat. + GlobalIORuntime::instance().block_on(async move { heartbeat.shutdown().await })?; + match may_error { + None => Ok(()), + Some(error_code) => Err(error_code.clone()), + } + }); + } + Ok(build_res) + } +} + +// todo:(JackTan25) computed exprs +impl MergeIntoInterpreter { + async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> { + let MergePlan { + bind_context, + input, + meta_data, + columns_set, + catalog, + database, + table, + matched_evaluators, + unmatched_evaluators, + target_table_idx, + ..
+ } = &self.plan; + let table_name = table.clone(); + let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false); + + // build source for MergeInto + let join_input = builder.build(input, *columns_set.clone()).await?; + + // find row_id column index + let join_output_schema = join_input.output_schema()?; + + let mut row_id_idx = match meta_data + .read() + .row_id_index_by_table_index(*target_table_idx) + { + None => { + return Err(ErrorCode::InvalidRowIdIndex( + "can't get internal row_id_idx when running merge into", + )); + } + Some(row_id_idx) => row_id_idx, + }; + + let mut found_row_id = false; + for (idx, data_field) in join_output_schema.fields().iter().enumerate() { + if *data_field.name() == row_id_idx.to_string() { + row_id_idx = idx; + found_row_id = true; + break; + } + } + + // we can't get row_id_idx, throw an exception + if !found_row_id { + return Err(ErrorCode::InvalidRowIdIndex( + "can't get internal row_id_idx when running merge into", + )); + } + + let table = self.ctx.get_table(catalog, database, &table_name).await?; + let fuse_table = + table + .as_any() + .downcast_ref::<FuseTable>() + .ok_or(ErrorCode::Unimplemented(format!( + "table {}, engine type {}, does not support MERGE INTO", + table.name(), + table.get_table_info().engine(), + )))?; + let table_info = fuse_table.get_table_info(); + let catalog_ = self.ctx.get_catalog(catalog).await?; + + // merge_into_source is used to receive the join's datablocks and split them into matched and not-matched + // datablocks. + let merge_into_source = PhysicalPlan::MergeIntoSource(MergeIntoSource { + input: Box::new(join_input), + row_id_idx: row_id_idx as u32, + }); + + // transform unmatched for insert + // reference to func `build_eval_scalar` + // (DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>) => (source_schema, condition, value_exprs) + let mut unmatched = + Vec::<(DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>)>::with_capacity( + unmatched_evaluators.len(), + ); + + for item in unmatched_evaluators { + let filter = if let Some(filter_expr) = &item.condition { + Some(self.transform_scalar_expr2expr(filter_expr, join_output_schema.clone())?) + } else { + None + }; + let mut values_exprs = Vec::<RemoteExpr>::with_capacity(item.values.len()); + + for scalar_expr in &item.values { + values_exprs + .push(self.transform_scalar_expr2expr(scalar_expr, join_output_schema.clone())?) + } + + unmatched.push((item.source_schema.clone(), filter, values_exprs)) + } + + // the first option is used for the condition + // the second option is used to distinguish update from delete + let mut matched = Vec::with_capacity(matched_evaluators.len()); + + // transform matched for delete/update + for item in matched_evaluators { + let condition = if let Some(condition) = &item.condition { + let expr = self + .transform_scalar_expr2expr(condition, join_output_schema.clone())?
+ .as_expr(&BUILTIN_FUNCTIONS); + let (expr, _) = ConstantFolder::fold( + &expr, + &self.ctx.get_function_context()?, + &BUILTIN_FUNCTIONS, + ); + Some(expr.as_remote_expr()) + } else { + None + }; + // update + let update_list = if let Some(update_list) = &item.update { + // use update_plan to get exprs + let update_plan = UpdatePlan { + selection: None, + subquery_desc: vec![], + database: database.clone(), + table: table_name.clone(), + update_list: update_list.clone(), + bind_context: bind_context.clone(), + metadata: self.plan.meta_data.clone(), + catalog: catalog.clone(), + }; + let col_indices = if item.condition.is_none() { + vec![] + } else { + // we don't need the real col_indices here; just give a + // dummy index, that's ok. + vec![DUMMY_COL_INDEX] + }; + let update_list: Vec<(FieldIndex, RemoteExpr)> = update_plan + .generate_update_list( + self.ctx.clone(), + fuse_table.schema().into(), + col_indices, + false, + )?; + let update_list = update_list + .iter() + .map(|(idx, remote_expr)| { + ( + *idx, + remote_expr + .as_expr(&BUILTIN_FUNCTIONS) + .project_column_ref(|name| { + join_output_schema.index_of(name).unwrap() + }) + .as_remote_expr(), + ) + }) + .collect_vec(); + Some(update_list) + } else { + // delete + None + }; + matched.push((condition, update_list)) + } + + let base_snapshot = fuse_table.read_table_snapshot().await?.unwrap_or_else(|| { + Arc::new(TableSnapshot::new_empty_snapshot( + fuse_table.schema().as_ref().clone(), + )) + }); + + // receive datablocks from the matched and unmatched upstreams, + // transform them and append data + let merge_into = PhysicalPlan::MergeInto(MergeInto { + input: Box::new(merge_into_source), + table_info: table_info.clone(), + catalog_info: catalog_.info(), + unmatched, + matched, + row_id_idx, + segments: base_snapshot + .segments + .clone() + .into_iter() + .enumerate() + .collect(), + }); + + // build mutation_aggregate + let physical_plan = PhysicalPlan::MutationAggregate(Box::new(MutationAggregate { + input: Box::new(merge_into), + snapshot: (*base_snapshot).clone(), + table_info: table_info.clone(), + catalog_info: catalog_.info(), + // let's use Update first; we will do some optimizations and select the exact strategy later + mutation_kind: MutationKind::Update, + })); + + Ok((physical_plan, table_info.clone())) + } + + fn transform_scalar_expr2expr( + &self, + scalar_expr: &ScalarExpr, + schema: DataSchemaRef, + ) -> Result<RemoteExpr> { + let scalar_expr = scalar_expr + .resolve_and_check(schema.as_ref())?
+ .project_column_ref(|index| schema.index_of(&index.to_string()).unwrap()); + let (folded, _) = ConstantFolder::fold( + &scalar_expr, + &self.ctx.get_function_context().unwrap(), + &BUILTIN_FUNCTIONS, + ); + Ok(folded.as_remote_expr()) + } +} diff --git a/src/query/service/src/interpreters/interpreter_update.rs index 1040b105d417..5766e0598c68 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -156,6 +156,7 @@ impl Interpreter for UpdateInterpreter { self.ctx.clone(), tbl.schema().into(), col_indices.clone(), + true, )?; let computed_list = self diff --git a/src/query/service/src/interpreters/mod.rs index eb1369466325..f6da928968c2 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -41,6 +41,7 @@ mod interpreter_index_drop; mod interpreter_index_refresh; mod interpreter_insert; mod interpreter_kill; +mod interpreter_merge_into; mod interpreter_metrics; mod interpreter_network_policies_show; mod interpreter_network_policy_alter; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs index 6c05911bbfaa..1cf3339e292c 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -90,6 +90,8 @@ use common_sql::executor::HashJoin; use common_sql::executor::Lambda; use common_sql::executor::Limit; use common_sql::executor::MaterializedCte; +use common_sql::executor::MergeInto; +use common_sql::executor::MergeIntoSource; use common_sql::executor::MutationAggregate; use common_sql::executor::PhysicalPlan; use common_sql::executor::Project; @@ -113,6 +115,8 @@ use common_storage::DataOperator; use common_storages_factory::Table; use common_storages_fuse::operations::build_row_fetcher_pipeline; use common_storages_fuse::operations::common::TransformSerializeSegment; +use common_storages_fuse::operations::merge_into::MergeIntoNotMatchedProcessor; +use common_storages_fuse::operations::merge_into::MergeIntoSplitProcessor; use common_storages_fuse::operations::replace_into::BroadcastProcessor; use common_storages_fuse::operations::replace_into::ReplaceIntoProcessor; use common_storages_fuse::operations::replace_into::UnbranchedReplaceIntoProcessor; @@ -260,6 +264,10 @@ impl PipelineBuilder { PhysicalPlan::AsyncSourcer(async_sourcer) => self.build_async_sourcer(async_sourcer), PhysicalPlan::Deduplicate(deduplicate) => self.build_deduplicate(deduplicate), PhysicalPlan::ReplaceInto(replace) => self.build_replace_into(replace), + PhysicalPlan::MergeInto(merge_into) => self.build_merge_into(merge_into), + PhysicalPlan::MergeIntoSource(merge_into_source) => { + self.build_merge_into_source(merge_into_source) + } } } @@ -279,6 +287,19 @@ impl PipelineBuilder { Ok(cast_needed) } + fn build_merge_into_source(&mut self, merge_into_source: &MergeIntoSource) -> Result<()> { + let MergeIntoSource { input, row_id_idx } = merge_into_source; + + self.build_pipeline(input)?; + self.main_pipeline.try_resize(1)?; + let merge_into_split_processor = MergeIntoSplitProcessor::create(*row_id_idx, false)?; + + self.main_pipeline + .add_pipe(merge_into_split_processor.into_pipe()); + + Ok(()) + } + fn build_deduplicate(&mut self, deduplicate: &Deduplicate) -> Result<()> { let Deduplicate { input, @@ -292,6 +313,7 @@ table_schema, need_insert,
} = deduplicate; + + let tbl = self .ctx .build_table_by_table_info(catalog_info, table_info, None)?; @@ -384,6 +406,101 @@ impl PipelineBuilder { Ok(()) } + fn build_merge_into(&mut self, merge_into: &MergeInto) -> Result<()> { + let MergeInto { + input, + table_info, + catalog_info, + unmatched, + matched, + row_id_idx, + segments, + } = merge_into; + self.build_pipeline(input)?; + let tbl = self + .ctx + .build_table_by_table_info(catalog_info, table_info, None)?; + let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( + unmatched.clone(), + input.output_schema()?, + self.ctx.get_function_context()?, + )?; + + let table = FuseTable::try_from_table(tbl.as_ref())?; + let block_thresholds = table.get_block_thresholds(); + + let cluster_stats_gen = + table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds)?; + + // append the unmatched data + let serialize_block_transform = TransformSerializeBlock::try_create( + self.ctx.clone(), + InputPort::create(), + OutputPort::create(), + table, + cluster_stats_gen, + )?; + let block_builder = serialize_block_transform.get_block_builder(); + + let serialize_segment_transform = TransformSerializeSegment::new( + self.ctx.clone(), + InputPort::create(), + OutputPort::create(), + table, + block_thresholds, + ); + + let pipe_items = vec![ + create_dummy_item(), + merge_into_not_matched_processor.into_pipe_item(), + ]; + + self.main_pipeline.add_pipe(Pipe::create( + self.main_pipeline.output_len(), + self.main_pipeline.output_len(), + pipe_items, + )); + + let pipe_items = vec![ + create_dummy_item(), + serialize_block_transform.into_pipe_item(), + ]; + + self.main_pipeline.add_pipe(Pipe::create( + self.main_pipeline.output_len(), + self.main_pipeline.output_len(), + pipe_items, + )); + + let mut pipe_items = Vec::with_capacity(2); + + let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; + let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); + // for matched update and delete + pipe_items.push(table.matched_mutator( + self.ctx.clone(), + block_builder, + io_request_semaphore, + *row_id_idx, + matched.clone(), + input.output_schema()?, + segments.clone(), + )?); + pipe_items.push(serialize_segment_transform.into_pipe_item()); + + self.main_pipeline.add_pipe(Pipe::create( + self.main_pipeline.output_len(), + self.main_pipeline.output_len(), + pipe_items, + )); + + // todo:(JackTan25): process filling default columns + // because the datablock we receive here may have a different + // schema, we can't just add build_filling_default_columns + // to solve it simply. we will add a new processor in a later PR.
+ Ok(()) + } + fn build_replace_into(&mut self, replace: &ReplaceInto) -> Result<()> { let ReplaceInto { input, diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index fa89b6674168..a87f9f485c1a 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -349,6 +349,7 @@ impl InteractiveWorkerBase { context.attach_query_str(plan.to_string(), extras.statement.to_mask_sql()); let interpreter = InterpreterFactory::get(context.clone(), &plan).await; + let has_result_set = plan.has_result_set(); match interpreter { diff --git a/src/query/service/src/test_kits/table_test_fixture.rs b/src/query/service/src/test_kits/table_test_fixture.rs index 54651a532a05..a209f6e8932f 100644 --- a/src/query/service/src/test_kits/table_test_fixture.rs +++ b/src/query/service/src/test_kits/table_test_fixture.rs @@ -663,8 +663,12 @@ pub async fn do_update( } else { (None, vec![]) }; - let update_list = - plan.generate_update_list(ctx.clone(), table.schema().into(), col_indices.clone())?; + let update_list = plan.generate_update_list( + ctx.clone(), + table.schema().into(), + col_indices.clone(), + true, + )?; let computed_list = plan.generate_stored_computed_list(ctx.clone(), Arc::new(table.schema().into()))?; diff --git a/src/query/service/tests/it/storages/testdata/settings_table.txt b/src/query/service/tests/it/storages/testdata/settings_table.txt index 6a1f6be3c1fb..551e5999f086 100644 --- a/src/query/service/tests/it/storages/testdata/settings_table.txt +++ b/src/query/service/tests/it/storages/testdata/settings_table.txt @@ -12,6 +12,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: System | 'enable_distributed_copy_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of copy into.' | 'UInt64' | | 'enable_distributed_replace_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of replace into.' | 'UInt64' | | 'enable_dphyp' | '1' | '1' | 'SESSION' | 'Enables dphyp join order algorithm.' | 'UInt64' | +| 'enable_experimental_merge_into' | '0' | '0' | 'SESSION' | 'Enable unstable merge into.' | 'UInt64' | | 'enable_hive_parquet_predict_pushdown' | '1' | '1' | 'SESSION' | 'Enable hive parquet predict pushdown by setting this variable to 1, default value: 1' | 'UInt64' | | 'enable_query_result_cache' | '0' | '0' | 'SESSION' | 'Enables caching query results to improve performance for identical queries.' | 'UInt64' | | 'enable_recluster_after_write' | '1' | '1' | 'SESSION' | 'Enables re-clustering after write(copy/replace-into).' 
| 'UInt64' | diff --git a/src/query/settings/src/settings_default.rs index 7d14b531311c..87157a737230 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -326,6 +326,12 @@ impl DefaultSettings { possible_values: None, display_in_show_settings: true, }), + ("enable_experimental_merge_into", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Enable unstable merge into.", + possible_values: None, + display_in_show_settings: true, + }), ("enable_distributed_replace_into", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Enable distributed execution of replace into.", diff --git a/src/query/settings/src/settings_getter_setter.rs index a0cfafb27bcb..c8adbdcbde51 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -384,6 +384,14 @@ impl Settings { self.try_set_u64("enable_distributed_copy_into", u64::from(val)) } + pub fn get_enable_experimental_merge_into(&self) -> Result<bool> { + Ok(self.try_get_u64("enable_experimental_merge_into")? != 0) + } + + pub fn set_enable_experimental_merge_into(&self, val: bool) -> Result<()> { + self.try_set_u64("enable_experimental_merge_into", u64::from(val)) + } + pub fn get_enable_distributed_replace(&self) -> Result<bool> { Ok(self.try_get_u64("enable_distributed_replace_into")? != 0) } diff --git a/src/query/sql/src/executor/format.rs index 8c242b6c9f68..9b30928606db 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -211,6 +211,8 @@ fn to_format_tree( PhysicalPlan::AsyncSourcer(_) => Ok(FormatTreeNode::new("AsyncSourcer".to_string())), PhysicalPlan::Deduplicate(_) => Ok(FormatTreeNode::new("Deduplicate".to_string())), PhysicalPlan::ReplaceInto(_) => Ok(FormatTreeNode::new("Replace".to_string())), + PhysicalPlan::MergeInto(_) => Ok(FormatTreeNode::new("MergeInto".to_string())), + PhysicalPlan::MergeIntoSource(_) => Ok(FormatTreeNode::new("MergeIntoSource".to_string())), PhysicalPlan::CteScan(plan) => cte_scan_to_format_tree(plan), PhysicalPlan::MaterializedCte(plan) => { materialized_cte_to_format_tree(plan, metadata, profs) diff --git a/src/query/sql/src/executor/physical_plan.rs index 4b9f61281e3e..7684f6faeb76 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -60,6 +60,8 @@ use crate::IndexType; pub type ColumnID = String; +pub type MatchExpr = Vec<(Option<RemoteExpr>, Option<Vec<(FieldIndex, RemoteExpr)>>)>; + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct TableScan { /// A unique id of operator in a `PhysicalPlan` tree.
@@ -758,6 +760,27 @@ impl UnionAll { } } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct MergeIntoSource { + // join result: source_columns, target_columns, target_table._row_id + pub input: Box<PhysicalPlan>, + pub row_id_idx: u32, +} + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct MergeInto { + pub input: Box<PhysicalPlan>, + pub table_info: TableInfo, + pub catalog_info: CatalogInfo, + // (DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>) => (source_schema, condition, value_exprs) + pub unmatched: Vec<(DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>)>, + // the first option stands for the condition + // the second option stands for update/delete + pub matched: MatchExpr, + pub row_id_idx: usize, + pub segments: Vec<(usize, Location)>, +} + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct CopyIntoTable { pub catalog_info: CatalogInfo, @@ -1000,6 +1023,9 @@ pub enum PhysicalPlan { AsyncSourcer(AsyncSourcerPlan), Deduplicate(Deduplicate), ReplaceInto(ReplaceInto), + // MergeInto + MergeIntoSource(MergeIntoSource), + MergeInto(MergeInto), } impl PhysicalPlan { @@ -1039,6 +1065,8 @@ impl PhysicalPlan { PhysicalPlan::MaterializedCte(v) => v.plan_id, PhysicalPlan::ConstantTableScan(v) => v.plan_id, PhysicalPlan::DeletePartial(_) + | PhysicalPlan::MergeInto(_) + | PhysicalPlan::MergeIntoSource(_) | PhysicalPlan::MutationAggregate(_) | PhysicalPlan::CopyIntoTable(_) | PhysicalPlan::AsyncSourcer(_) @@ -1078,7 +1106,9 @@ impl PhysicalPlan { PhysicalPlan::CteScan(plan) => plan.output_schema(), PhysicalPlan::MaterializedCte(plan) => plan.output_schema(), PhysicalPlan::ConstantTableScan(plan) => plan.output_schema(), + PhysicalPlan::MergeIntoSource(plan) => plan.input.output_schema(), PhysicalPlan::AsyncSourcer(_) + | PhysicalPlan::MergeInto(_) | PhysicalPlan::Deduplicate(_) | PhysicalPlan::ReplaceInto(_) => Ok(DataSchemaRef::default()), } @@ -1113,6 +1143,8 @@ impl PhysicalPlan { PhysicalPlan::AsyncSourcer(_) => "AsyncSourcer".to_string(), PhysicalPlan::Deduplicate(_) => "Deduplicate".to_string(), PhysicalPlan::ReplaceInto(_) => "Replace".to_string(), + PhysicalPlan::MergeInto(_) => "MergeInto".to_string(), + PhysicalPlan::MergeIntoSource(_) => "MergeIntoSource".to_string(), PhysicalPlan::CteScan(_) => "PhysicalCteScan".to_string(), PhysicalPlan::MaterializedCte(_) => "PhysicalMaterializedCte".to_string(), PhysicalPlan::ConstantTableScan(_) => "PhysicalConstantTableScan".to_string(), @@ -1161,6 +1193,8 @@ impl PhysicalPlan { PhysicalPlan::AsyncSourcer(_) => Box::new(std::iter::empty()), PhysicalPlan::Deduplicate(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ReplaceInto(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::MergeInto(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::MergeIntoSource(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::MaterializedCte(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), @@ -1198,6 +1232,8 @@ impl PhysicalPlan { | PhysicalPlan::AsyncSourcer(_) | PhysicalPlan::Deduplicate(_) | PhysicalPlan::ReplaceInto(_) + | PhysicalPlan::MergeInto(_) + | PhysicalPlan::MergeIntoSource(_) | PhysicalPlan::ConstantTableScan(_) | PhysicalPlan::CteScan(_) => None, } diff --git a/src/query/sql/src/executor/physical_plan_display.rs index 92d12ebcaa25..d630cf2056ab 100644 --- a/src/query/sql/src/executor/physical_plan_display.rs +++
b/src/query/sql/src/executor/physical_plan_display.rs @@ -24,6 +24,8 @@ use super::CopyIntoTable; use super::Deduplicate; use super::DeletePartial; use super::DistributedInsertSelect; +use super::MergeInto; +use super::MergeIntoSource; use super::MutationAggregate; use super::ProjectSet; use super::ReplaceInto; @@ -94,6 +96,8 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> { PhysicalPlan::AsyncSourcer(async_sourcer) => write!(f, "{}", async_sourcer)?, PhysicalPlan::Deduplicate(deduplicate) => write!(f, "{}", deduplicate)?, PhysicalPlan::ReplaceInto(replace) => write!(f, "{}", replace)?, + PhysicalPlan::MergeIntoSource(merge_into_source) => write!(f, "{}", merge_into_source)?, + PhysicalPlan::MergeInto(merge_into) => write!(f, "{}", merge_into)?, PhysicalPlan::CteScan(cte_scan) => write!(f, "{}", cte_scan)?, PhysicalPlan::MaterializedCte(plan) => write!(f, "{}", plan)?, PhysicalPlan::ConstantTableScan(scan) => write!(f, "{}", scan)?, @@ -451,6 +455,18 @@ impl Display for ReplaceInto { } } +impl Display for MergeInto { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MergeInto") + } +} + +impl Display for MergeIntoSource { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MergeIntoSource") + } +} + impl Display for Lambda { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let scalars = self diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs index ca0d310b7af0..c92446eb2791 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -31,6 +31,8 @@ use super::Filter; use super::HashJoin; use super::Lambda; use super::Limit; +use super::MergeInto; +use super::MergeIntoSource; use super::MutationAggregate; use super::PhysicalPlan; use super::Project; @@ -79,6 +81,8 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::AsyncSourcer(plan) => self.replace_async_sourcer(plan), PhysicalPlan::Deduplicate(plan) => self.replace_deduplicate(plan), PhysicalPlan::ReplaceInto(plan) => self.replace_replace_into(plan), + PhysicalPlan::MergeInto(plan) => self.replace_merge_into(plan), + PhysicalPlan::MergeIntoSource(plan) => self.replace_merge_into_source(plan), PhysicalPlan::MaterializedCte(plan) => self.replace_materialized_cte(plan), PhysicalPlan::ConstantTableScan(plan) => self.replace_constant_table_scan(plan), } @@ -394,6 +398,22 @@ pub trait PhysicalPlanReplacer { })) } + fn replace_merge_into(&mut self, plan: &MergeInto) -> Result<PhysicalPlan> { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::MergeInto(MergeInto { + input: Box::new(input), + ..plan.clone() + })) + } + + fn replace_merge_into_source(&mut self, plan: &MergeIntoSource) -> Result<PhysicalPlan> { + let input = self.replace(&plan.input)?; + Ok(PhysicalPlan::MergeIntoSource(MergeIntoSource { + input: Box::new(input), + ..plan.clone() + })) + } + fn replace_project_set(&mut self, plan: &ProjectSet) -> Result<PhysicalPlan> { let input = self.replace(&plan.input)?; Ok(PhysicalPlan::ProjectSet(ProjectSet { @@ -523,6 +543,12 @@ impl PhysicalPlan { PhysicalPlan::ReplaceInto(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } + PhysicalPlan::MergeIntoSource(plan) => { + Self::traverse(&plan.input, pre_visit, visit, post_visit); + } + PhysicalPlan::MergeInto(plan) => { + Self::traverse(&plan.input, pre_visit, visit, post_visit); + } PhysicalPlan::MaterializedCte(plan) => { Self::traverse(&plan.left, pre_visit, visit, post_visit); Self::traverse(&plan.right,
pre_visit, visit, post_visit); diff --git a/src/query/sql/src/executor/profile.rs index f17fad24a114..da7a0937e305 100644 --- a/src/query/sql/src/executor/profile.rs +++ b/src/query/sql/src/executor/profile.rs @@ -508,6 +508,8 @@ fn flatten_plan_node_profile( | PhysicalPlan::MutationAggregate(_) | PhysicalPlan::CopyIntoTable(_) | PhysicalPlan::AsyncSourcer(_) + | PhysicalPlan::MergeInto(_) + | PhysicalPlan::MergeIntoSource(_) | PhysicalPlan::Deduplicate(_) | PhysicalPlan::ReplaceInto(_) => unreachable!(), } diff --git a/src/query/sql/src/planner/binder/binder.rs index d336d4793e1b..310fd1593bcb 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -379,6 +379,14 @@ impl<'a> Binder { } self.bind_replace(bind_context, stmt).await? } + Statement::MergeInto(stmt) => { + if let Some(hints) = &stmt.hints { + if let Some(e) = self.opt_hints_set_var(bind_context, hints).await.err() { + warn!("In Merge resolve optimize hints {:?} failed, err: {:?}", hints, e); + } + } + self.bind_merge_into(bind_context, stmt).await? + }, Statement::Delete { hints, table_reference, diff --git a/src/query/sql/src/planner/binder/merge_into.rs new file mode 100644 index 000000000000..fa2c6e0268f4 --- /dev/null +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -0,0 +1,391 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; + +use common_ast::ast::Join; +use common_ast::ast::JoinCondition; +use common_ast::ast::JoinOperator::LeftOuter; +use common_ast::ast::MatchOperation; +use common_ast::ast::MatchedClause; +use common_ast::ast::MergeIntoStmt; +use common_ast::ast::TableReference; +use common_ast::ast::UnmatchedClause; +use common_catalog::plan::InternalColumn; +use common_catalog::plan::InternalColumnType; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::types::DataType; +use common_expression::TableSchemaRef; +use common_expression::ROW_ID_COL_NAME; +use indexmap::IndexMap; + +use super::wrap_cast_scalar; +use crate::binder::Binder; +use crate::binder::InternalColumnBinding; +use crate::normalize_identifier; +use crate::optimizer::SExpr; +use crate::plans::MatchedEvaluator; +use crate::plans::MergeInto; +use crate::plans::Plan; +use crate::plans::UnmatchedEvaluator; +use crate::BindContext; +use crate::ColumnEntry; +use crate::IndexType; +use crate::ScalarBinder; +use crate::ScalarExpr; + +// implementation of merge into for now: +// use a left outer join to join the target table and the source.
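To make that concrete: after the left outer join (source on the left), every source row survives; rows that found a target partner carry the target's `_row_id`, and rows that found none see NULL there. Downstream, MergeIntoSplitProcessor routes rows on exactly that test. A toy model of the split, with plain stand-in types instead of the engine's DataBlock:

```rust
// Toy model of the matched/unmatched routing driven by the target `_row_id`.
// Each joined row is (source_payload, Option<row_id>); the left outer join
// leaves None where the source row had no target partner.
fn split(joined: Vec<(String, Option<u64>)>) -> (Vec<(String, u64)>, Vec<String>) {
    let mut matched = Vec::new(); // -> update / delete path
    let mut unmatched = Vec::new(); // -> insert path
    for (payload, row_id) in joined {
        match row_id {
            Some(id) => matched.push((payload, id)),
            None => unmatched.push(payload),
        }
    }
    (matched, unmatched)
}

fn main() {
    let joined = vec![("k=1".to_string(), Some(7)), ("k=2".to_string(), None)];
    let (matched, unmatched) = split(joined);
    assert_eq!(matched, vec![("k=1".to_string(), 7)]);
    assert_eq!(unmatched, vec!["k=2".to_string()]);
}
```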
+impl Binder { + #[allow(warnings)] + #[async_backtrace::framed] + pub(in crate::planner::binder) async fn bind_merge_into( + &mut self, + bind_context: &mut BindContext, + stmt: &MergeIntoStmt, + ) -> Result<Plan> { + if !self + .ctx + .get_settings() + .get_enable_experimental_merge_into() + .unwrap_or_default() + { + return Err(ErrorCode::Unimplemented( + "merge into is unstable for now, you can use 'set enable_experimental_merge_into = 1' to enable it", + )); + } + let MergeIntoStmt { + catalog, + database, + table_ident, + source, + alias_target, + join_expr, + merge_options, + .. + } = stmt; + + if merge_options.is_empty() { + return Err(ErrorCode::BadArguments( + "merge into requires at least one matched or unmatched clause", + )); + } + + let (matched_clauses, unmatched_clauses) = stmt.split_clauses(); + let mut unmatched_evaluators = + Vec::<UnmatchedEvaluator>::with_capacity(unmatched_clauses.len()); + let mut matched_evaluators = Vec::<MatchedEvaluator>::with_capacity(matched_clauses.len()); + // check clause semantics + MergeIntoStmt::check_multi_match_clauses_semantic(&matched_clauses)?; + MergeIntoStmt::check_multi_unmatch_clauses_semantic(&unmatched_clauses)?; + + let (catalog_name, database_name, table_name) = + self.normalize_object_identifier_triple(catalog, database, table_ident); + + let table = self + .ctx + .get_table(&catalog_name, &database_name, &table_name) + .await?; + let table_id = table.get_id(); + let table_schema = table.schema(); + // Todo: (JackTan25) support computed expr + for field in table.schema().fields() { + if field.computed_expr().is_some() { + return Err(ErrorCode::Unimplemented( + "merge into doesn't support computed expr for now", + )); + } + } + // get target_table_reference + let target_table = TableReference::Table { + span: None, + catalog: catalog.clone(), + database: database.clone(), + table: table_ident.clone(), + alias: alias_target.clone(), + travel_point: None, + pivot: None, + unpivot: None, + }; + + // get source_table_reference + let source_data = source.transform_table_reference(); + + // bind source data + let (source_expr, mut left_context) = self + .bind_merge_into_source(bind_context, None, &source.clone()) + .await?; + + // add all left source columns for read + let mut columns_set = left_context.column_set(); + + // bind table for target table + let (mut target_expr, mut right_context) = self + .bind_single_table(&mut left_context, &target_table) + .await?; + + // add internal_column (_row_id) + let table_index = self + .metadata + .read() + .get_table_index(Some(database_name.as_str()), table_name.as_str()) + .expect("can't get target_table binding"); + + let row_id_column_binding = InternalColumnBinding { + database_name: Some(database_name.clone()), + table_name: Some(table_name.clone()), + internal_column: InternalColumn { + column_name: ROW_ID_COL_NAME.to_string(), + column_type: InternalColumnType::RowId, + }, + }; + + let column_binding = right_context + .add_internal_column_binding(&row_id_column_binding, self.metadata.clone())?; + + target_expr = + SExpr::add_internal_column_index(&target_expr, table_index, column_binding.index); + + self.metadata + .write() + .set_table_row_id_index(table_index, column_binding.index); + // add row_id_idx + columns_set.insert(column_binding.index); + + // add the join. V1 uses a left outer join, and _row_id is used to check for duplicate join rows.
+ let join = Join { + op: LeftOuter, + condition: JoinCondition::On(Box::new(join_expr.clone())), + left: Box::new(source_data.clone()), + right: Box::new(target_table), + }; + + let (join_sexpr, bind_ctx) = self + .bind_join( + bind_context, + left_context, + right_context.clone(), + source_expr, + target_expr, + &join, + ) + .await?; + + let name_resolution_ctx = self.name_resolution_ctx.clone(); + let mut scalar_binder = ScalarBinder::new( + &mut right_context, + self.ctx.clone(), + &name_resolution_ctx, + self.metadata.clone(), + &[], + HashMap::new(), + Box::new(IndexMap::new()), + ); + // add join condition used column idx + columns_set = columns_set + .union(&scalar_binder.bind(join_expr).await?.0.used_columns()) + .cloned() + .collect(); + + let column_entries = self.metadata.read().columns_by_table_index(table_index); + + // bind clause columns + for clause in &matched_clauses { + matched_evaluators.push( + self.bind_matched_clause( + &mut scalar_binder, + clause, + &mut columns_set, + table_schema.clone(), + &column_entries, + ) + .await?, + ); + } + + // add eval exprs for not matched + for clause in &unmatched_clauses { + unmatched_evaluators.push( + self.bind_unmatched_clause( + &mut scalar_binder, + clause, + &mut columns_set, + table_schema.clone(), + ) + .await?, + ); + } + + Ok(Plan::MergeInto(Box::new(MergeInto { + catalog: catalog_name.to_string(), + database: database_name.to_string(), + table: table_name, + table_id, + bind_context: Box::new(bind_ctx.clone()), + meta_data: self.metadata.clone(), + input: Box::new(join_sexpr.clone()), + columns_set: Box::new(columns_set), + matched_evaluators, + unmatched_evaluators, + target_table_idx: table_index, + }))) + } + + async fn bind_matched_clause<'a>( + &mut self, + scalar_binder: &mut ScalarBinder<'a>, + clause: &MatchedClause, + columns: &mut HashSet<IndexType>, + schema: TableSchemaRef, + column_entries: &Vec<ColumnEntry>, + ) -> Result<MatchedEvaluator> { + let condition = if let Some(expr) = &clause.selection { + let (scalar_expr, _) = scalar_binder.bind(expr).await?; + for idx in scalar_expr.used_columns() { + columns.insert(idx); + } + Some(scalar_expr) + } else { + None + }; + + if let MatchOperation::Update { update_list } = &clause.operation { + let mut update_columns = HashMap::with_capacity(update_list.len()); + for update_expr in update_list { + let (scalar_expr, _) = scalar_binder.bind(&update_expr.expr).await?; + let col_name = + normalize_identifier(&update_expr.name, &self.name_resolution_ctx).name; + + columns.insert(self.find_column_index(column_entries, &col_name)?); + + let index = schema.index_of(&col_name)?; + + if update_columns.contains_key(&index) { + return Err(ErrorCode::BadArguments(format!( + "Multiple assignments to column `{}` in a single statement", + col_name + ))); + } + + let field = schema.field(index); + if field.computed_expr().is_some() { + return Err(ErrorCode::BadArguments(format!( + "The value specified for computed column '{}' is not allowed", + field.name() + ))); + } + + if matches!(scalar_expr, ScalarExpr::SubqueryExpr(_)) { + return Err(ErrorCode::Internal( + "update_list in the update clause does not support subqueries for now", + )); + } + update_columns.insert(index, scalar_expr.clone()); + + for idx in scalar_expr.used_columns() { + columns.insert(idx); + } + } + + Ok(MatchedEvaluator { + condition, + update: Some(update_columns), + }) + } else { + Ok(MatchedEvaluator { + condition, + update: None, + }) + } + } + + async fn bind_unmatched_clause<'a>( + &mut self, + scalar_binder: &mut ScalarBinder<'a>, + clause:
&UnmatchedClause, + columns: &mut HashSet<IndexType>, + table_schema: TableSchemaRef, + ) -> Result<UnmatchedEvaluator> { + let condition = if let Some(expr) = &clause.selection { + let (scalar_expr, _) = scalar_binder.bind(expr).await?; + for idx in scalar_expr.used_columns() { + columns.insert(idx); + } + Some(scalar_expr) + } else { + None + }; + + if clause.insert_operation.values.is_empty() { + return Err(ErrorCode::SemanticError( + "Values lists must have at least one row".to_string(), + )); + } + + let mut values = Vec::with_capacity(clause.insert_operation.values.len()); + + // we need to get the source schema and use it for filling columns. + let source_schema = if let Some(fields) = clause.insert_operation.columns.clone() { + self.schema_project(&table_schema, &fields)? + } else { + table_schema.clone() + }; + + if source_schema != table_schema { + return Err(ErrorCode::BadArguments( + "for now, we need to make sure the input schema is the same as the table schema", + )); + } + + for (idx, expr) in clause.insert_operation.values.iter().enumerate() { + let (mut scalar_expr, _) = scalar_binder.bind(expr).await?; + // type cast + scalar_expr = wrap_cast_scalar( + &scalar_expr, + &scalar_expr.data_type()?, + &DataType::from(source_schema.field(idx).data_type()), + )?; + + values.push(scalar_expr.clone()); + for idx in scalar_expr.used_columns() { + columns.insert(idx); + } + } + + Ok(UnmatchedEvaluator { + source_schema: Arc::new(source_schema.into()), + condition, + values, + }) + } + + fn find_column_index( + &self, + column_entries: &Vec<ColumnEntry>, + col_name: &str, + ) -> Result<IndexType> { + for column_entry in column_entries { + if col_name == column_entry.name() { + return Ok(column_entry.index()); + } + } + Err(ErrorCode::BadArguments(format!( + "cannot find column {}", + col_name + ))) + } +} diff --git a/src/query/sql/src/planner/binder/mod.rs index 625df722f45e..ab4bf29e93eb 100644 --- a/src/query/sql/src/planner/binder/mod.rs +++ b/src/query/sql/src/planner/binder/mod.rs @@ -32,6 +32,7 @@ mod kill; mod lambda; mod limit; mod location; +mod merge_into; mod presign; mod project; mod project_set; @@ -49,7 +50,6 @@ mod table_args; mod update; mod values; mod window; - pub use aggregate::AggregateInfo; pub use bind_context::*; pub use binder::Binder; diff --git a/src/query/sql/src/planner/binder/replace.rs index 983eca365598..ddc7747a9fe7 100644 --- a/src/query/sql/src/planner/binder/replace.rs +++ b/src/query/sql/src/planner/binder/replace.rs @@ -49,15 +49,9 @@ impl Binder { ..
} = stmt; - let catalog_name = catalog.as_ref().map_or_else( - || self.ctx.get_current_catalog(), - |ident| normalize_identifier(ident, &self.name_resolution_ctx).name, - ); - let database_name = database.as_ref().map_or_else( - || self.ctx.get_current_database(), - |ident| normalize_identifier(ident, &self.name_resolution_ctx).name, - ); - let table_name = normalize_identifier(table, &self.name_resolution_ctx).name; + let (catalog_name, database_name, table_name) = + self.normalize_object_identifier_triple(catalog, database, table); + let table = self .ctx .get_table(&catalog_name, &database_name, &table_name) diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index fd52f7ffec60..bf263f7a0397 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -23,6 +23,7 @@ use chrono::TimeZone; use chrono::Utc; use common_ast::ast::Indirection; use common_ast::ast::Join; +use common_ast::ast::MergeSource; use common_ast::ast::SelectStmt; use common_ast::ast::SelectTarget; use common_ast::ast::Statement; @@ -572,6 +573,25 @@ impl Binder { } } + #[async_backtrace::framed] + pub(crate) async fn bind_merge_into_source( + &mut self, + bind_context: &mut BindContext, + _span: Span, + source: &MergeSource, + ) -> Result<(SExpr, BindContext)> { + // the merge source comes in three kinds: + // a. values b. streamingV2 c. query + match source { + MergeSource::Select { query } => self.bind_query(bind_context, query).await, + MergeSource::StreamingV2 { + settings: _, + on_error_mode: _, + start: _, + } => unimplemented!(), + } + } + #[async_backtrace::framed] pub(crate) async fn bind_stage_table( + &mut self, diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index c96b65c8e09b..e1aec460154a 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -121,6 +121,7 @@ impl Plan { // Insert Plan::Insert(insert) => Ok(format!("{:?}", insert)), Plan::Replace(replace) => Ok(format!("{:?}", replace)), + Plan::MergeInto(merge_into) => Ok(format!("{:?}", merge_into)), Plan::Delete(delete) => format_delete(delete), Plan::Update(update) => Ok(format!("{:?}", update)), diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs new file mode 100644 index 000000000000..08e8265fdc96 --- /dev/null +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
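Reviewer note on the plan structures that follow: every WHEN clause of the statement is lowered into one evaluator, and a DELETE is simply an evaluator with `update: None`. A minimal, self-contained sketch of that encoding, using hypothetical stand-in types for the crate-internal `ScalarExpr`/`FieldIndex`:

```rust
use std::collections::HashMap;

// Stand-ins for the crate-internal planner types; hypothetical, for illustration only.
type Expr = &'static str;
type FieldIndex = usize;

struct MatchedEvaluator {
    condition: Option<Expr>,                   // WHEN MATCHED [AND <cond>]
    update: Option<HashMap<FieldIndex, Expr>>, // Some => UPDATE SET, None => DELETE
}

fn main() {
    // WHEN MATCHED AND t1.b = 'b_1' THEN DELETE
    let delete = MatchedEvaluator {
        condition: Some("t1.b = 'b_1'"),
        update: None,
    };
    // WHEN MATCHED THEN UPDATE SET c = t2.c (say column `c` has field index 2)
    let update = MatchedEvaluator {
        condition: None,
        update: Some(HashMap::from([(2, "t2.c")])),
    };
    // evaluators keep statement order: each matched row is claimed by the
    // first clause whose condition accepts it
    let _matched_evaluators = vec![delete, update];
}
```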
+ +use std::collections::HashMap; +use std::collections::HashSet; + +use common_expression::DataSchemaRef; +use common_expression::FieldIndex; +use common_meta_types::MetaId; + +use crate::optimizer::SExpr; +use crate::BindContext; +use crate::IndexType; +use crate::MetadataRef; +use crate::ScalarExpr; + +// for an unmatched clause, we evaluate the insert values against the source schema +#[derive(Clone)] +pub struct UnmatchedEvaluator { + pub source_schema: DataSchemaRef, + pub condition: Option, + pub values: Vec, +} + +#[derive(Clone)] +pub struct MatchedEvaluator { + pub condition: Option, + // table_schema.idx -> update_expression + // Some => update + // None => delete + pub update: Option>, +} + +#[derive(Clone)] +pub struct MergeInto { + pub catalog: String, + pub database: String, + pub table: String, + pub table_id: MetaId, + pub input: Box, + pub bind_context: Box, + pub columns_set: Box>, + pub meta_data: MetadataRef, + pub matched_evaluators: Vec, + pub unmatched_evaluators: Vec, + pub target_table_idx: usize, +} + +impl std::fmt::Debug for MergeInto { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // a full Debug impl is still pending; name the target for now instead of panicking + write!(f, "MergeInto: {}.{}.{}", self.catalog, self.database, self.table) + } +} diff --git a/src/query/sql/src/planner/plans/mod.rs b/src/query/sql/src/planner/plans/mod.rs index 8f5bbc49cc49..2cac567cdd20 100644 --- a/src/query/sql/src/planner/plans/mod.rs +++ b/src/query/sql/src/planner/plans/mod.rs @@ -30,6 +30,8 @@ mod kill; mod lambda; mod limit; mod materialized_cte; +mod merge_into; + pub mod operator; mod pattern; mod plan; @@ -47,7 +49,6 @@ mod sort; mod union_all; mod update; mod window; - pub use aggregate::*; pub use call::CallPlan; pub use constant_table_scan::ConstantTableScan; @@ -68,6 +69,9 @@ pub use kill::KillPlan; pub use lambda::*; pub use limit::*; pub use materialized_cte::MaterializedCte; +pub use merge_into::MatchedEvaluator; +pub use merge_into::MergeInto; +pub use merge_into::UnmatchedEvaluator; pub use operator::*; pub use pattern::PatternPlan; pub use plan::*; diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs index 99812f917f17..dc5ba621174d 100644 --- a/src/query/sql/src/planner/plans/operator.rs +++ b/src/query/sql/src/planner/plans/operator.rs @@ -109,7 +109,6 @@ pub enum RelOperator { MaterializedCte(MaterializedCte), Lambda(Lambda), ConstantTableScan(ConstantTableScan), - Pattern(PatternPlan), } diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index 6d5b7d612682..5297fb11da13 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -31,6 +31,7 @@ use super::DescDatamaskPolicyPlan; use super::DropDatamaskPolicyPlan; use super::DropIndexPlan; use super::DropShareEndpointPlan; +use super::MergeInto; use super::ModifyTableColumnPlan; use super::RenameTableColumnPlan; use super::SetOptionsPlan; @@ -192,7 +193,7 @@ pub enum Plan { Replace(Box), Delete(Box), Update(Box), - + MergeInto(Box), // Views CreateView(Box), AlterView(Box), @@ -413,6 +414,7 @@ impl Display for Plan { Plan::DropNetworkPolicy(_) => write!(f, "DropNetworkPolicy"), Plan::DescNetworkPolicy(_) => write!(f, "DescNetworkPolicy"), Plan::ShowNetworkPolicies(_) => write!(f, "ShowNetworkPolicies"), + Plan::MergeInto(_) => write!(f, "MergeInto"), } } } diff --git a/src/query/sql/src/planner/plans/update.rs b/src/query/sql/src/planner/plans/update.rs index 694c8b46bfbe..e7f572162e5a 100644 --- a/src/query/sql/src/planner/plans/update.rs +++ b/src/query/sql/src/planner/plans/update.rs @@ -64,6 +64,7 @@ impl UpdatePlan { ctx: 
Arc, schema: DataSchema, col_indices: Vec, + use_column_name_index: bool, ) -> Result)>> { let column = ColumnBindingBuilder::new( PREDICATE_COLUMN_NAME.to_string(), @@ -113,9 +114,13 @@ impl UpdatePlan { arguments: vec![predicate.clone(), left, right], }) }; - let expr = scalar - .as_expr()? - .project_column_ref(|col| col.column_name.clone()); + let expr = scalar.as_expr()?.project_column_ref(|col| { + if use_column_name_index { + col.column_name.clone() + } else { + col.index.to_string() + } + }); let (expr, _) = ConstantFolder::fold(&expr, &ctx.get_function_context()?, &BUILTIN_FUNCTIONS); acc.push((*index, expr.as_remote_expr())); diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs new file mode 100644 index 000000000000..7dbfd9ab1898 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge.rs @@ -0,0 +1,60 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_base::base::tokio::sync::Semaphore; +use common_catalog::table_context::TableContext; +use common_exception::Result; +use common_expression::DataSchemaRef; +use common_pipeline_core::pipe::PipeItem; +use common_sql::executor::MatchExpr; +use storages_common_table_meta::meta::Location; + +use super::merge_into::MatchedAggregator; +use super::mutation::SegmentIndex; +use crate::io::BlockBuilder; +use crate::io::ReadSettings; +use crate::FuseTable; + +impl FuseTable { + // todo: (JackTan25) add pipeline picture + #[allow(clippy::too_many_arguments)] + pub fn matched_mutator( + &self, + ctx: Arc, + block_builder: BlockBuilder, + io_request_semaphore: Arc, + row_id_idx: usize, + matched: MatchExpr, + input_schema: DataSchemaRef, + segment_locations: Vec<(SegmentIndex, Location)>, + ) -> Result { + let read_settings = ReadSettings::from_ctx(&ctx)?; + let aggregator = MatchedAggregator::create( + ctx.clone(), + row_id_idx, + matched, + self.table_info.schema(), + input_schema, + self.get_operator(), + self.get_write_settings(), + read_settings, + block_builder, + io_request_semaphore, + segment_locations, + )?; + Ok(aggregator.into_pipe_item()) + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/mod.rs b/src/query/storages/fuse/src/operations/merge_into/mod.rs new file mode 100644 index 000000000000..cc172a749722 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/mod.rs @@ -0,0 +1,20 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +mod mutator; +mod processors; + +pub use mutator::MatchedAggregator; +pub use processors::MergeIntoNotMatchedProcessor; +pub use processors::MergeIntoSplitProcessor; diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs new file mode 100644 index 000000000000..dbe01e4de969 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -0,0 +1,517 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::Arc; + +use ahash::AHashMap; +use common_arrow::arrow::bitmap::MutableBitmap; +use common_arrow::arrow::buffer::Buffer; +use common_base::base::tokio::sync::Semaphore; +use common_base::runtime::GlobalIORuntime; +use common_base::runtime::TrySpawn; +use common_catalog::plan::split_prefix; +use common_catalog::plan::split_row_id; +use common_catalog::plan::Projection; +use common_catalog::table_context::TableContext; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::types::NumberColumn; +use common_expression::Column; +use common_expression::DataBlock; +use common_expression::DataSchemaRef; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_expression::TableSchemaRef; +use common_functions::BUILTIN_FUNCTIONS; +use common_sql::evaluator::BlockOperator; +use common_sql::executor::MatchExpr; +use common_storage::common_metrics::merge_into::metrics_inc_merge_into_replace_blocks_counter; +use itertools::Itertools; +use log::info; +use opendal::Operator; +use storages_common_cache::LoadParams; +use storages_common_table_meta::meta::BlockMeta; +use storages_common_table_meta::meta::Location; +use storages_common_table_meta::meta::SegmentInfo; + +use crate::io::write_data; +use crate::io::BlockBuilder; +use crate::io::BlockReader; +use crate::io::CompactSegmentInfoReader; +use crate::io::MetaReaders; +use crate::io::ReadSettings; +use crate::io::WriteSettings; +use crate::operations::acquire_task_permit; +use crate::operations::common::MutationLogEntry; +use crate::operations::common::MutationLogs; +use crate::operations::merge_into::mutator::SplitByExprMutator; +use crate::operations::mutation::BlockIndex; +use crate::operations::mutation::SegmentIndex; +use crate::operations::read_block; +use crate::operations::BlockMetaIndex; + +enum MutationKind { + Update(UpdateDataBlockMutation), + Delete(DeleteDataBlockMutation), +} + +struct UpdateDataBlockMutation { + op: BlockOperator, + split_mutator: SplitByExprMutator, +} + +struct DeleteDataBlockMutation { + split_mutator: SplitByExprMutator, +} + +struct AggregationContext { + row_id_idx: usize, + ops: Vec, + func_ctx: FunctionContext, + data_accessor: Operator, + 
write_settings: WriteSettings, + read_settings: ReadSettings, + block_builder: BlockBuilder, + block_reader: Arc, +} + +type RemainMap = HashMap, Vec)>; +type UpdatedMap = HashMap, DataBlock)>; + +pub struct MatchedAggregator { + io_request_semaphore: Arc, + segment_reader: CompactSegmentInfoReader, + segment_locations: AHashMap, + // (update_idx,(updated_columns,remain_columns)) + remain_projections_map: Arc, + // block_mutator, store new data after update, + // BlockMetaIndex => (update_idx,(block_offsets,new_data)) + // todo: (JackTan25) need to add precomputed expr for update to optimize + updatede_block: HashMap, + // store the row_id which is deleted/updated + block_mutation_row_offset: HashMap>, + aggregation_ctx: Arc, +} + +impl MatchedAggregator { + #[allow(clippy::too_many_arguments)] + pub fn create( + ctx: Arc, + row_id_idx: usize, + matched: MatchExpr, + target_table_schema: TableSchemaRef, + input_schema: DataSchemaRef, + data_accessor: Operator, + write_settings: WriteSettings, + read_settings: ReadSettings, + block_builder: BlockBuilder, + io_request_semaphore: Arc, + segment_locations: Vec<(SegmentIndex, Location)>, + ) -> Result { + let segment_reader = + MetaReaders::segment_info_reader(data_accessor.clone(), target_table_schema.clone()); + let block_reader = { + let projection = + Projection::Columns((0..target_table_schema.num_fields()).collect_vec()); + BlockReader::create( + data_accessor.clone(), + target_table_schema.clone(), + projection, + ctx.clone(), + false, + ) + }?; + + let mut ops = Vec::::new(); + let mut remain_projections_map = HashMap::new(); + for (expr_idx, item) in matched.iter().enumerate() { + // delete + if item.1.is_none() { + let filter = item.0.as_ref().map(|expr| expr.as_expr(&BUILTIN_FUNCTIONS)); + ops.push(MutationKind::Delete(DeleteDataBlockMutation { + split_mutator: SplitByExprMutator::create( + filter.clone(), + ctx.get_function_context()?, + ), + })) + } else { + let update_lists = item.1.as_ref().unwrap(); + let mut set = HashSet::new(); + let mut remain_projections = Vec::new(); + let mut collected_projections = Vec::new(); + let input_len = input_schema.num_fields(); + let eval_projections: HashSet = + (input_len..update_lists.len() + input_len).collect(); + + for (idx, _) in update_lists { + collected_projections.push(*idx); + set.insert(idx); + } + + for idx in 0..target_table_schema.num_fields() { + if !set.contains(&idx) { + remain_projections.push(idx); + } + } + + let exprs: Vec = update_lists + .iter() + .map(|item| item.1.as_expr(&BUILTIN_FUNCTIONS)) + .collect(); + + remain_projections_map + .insert(expr_idx, (collected_projections, remain_projections)); + let filter = item + .0 + .as_ref() + .map(|condition| condition.as_expr(&BUILTIN_FUNCTIONS)); + + ops.push(MutationKind::Update(UpdateDataBlockMutation { + op: BlockOperator::Map { + exprs, + projections: Some(eval_projections), + }, + split_mutator: SplitByExprMutator::create(filter, ctx.get_function_context()?), + })) + } + } + + Ok(Self { + aggregation_ctx: Arc::new(AggregationContext { + row_id_idx, + ops, + func_ctx: ctx.get_function_context()?, + write_settings, + read_settings, + data_accessor, + block_builder, + block_reader, + }), + io_request_semaphore, + segment_reader, + updatede_block: HashMap::new(), + block_mutation_row_offset: HashMap::new(), + remain_projections_map: Arc::new(remain_projections_map), + segment_locations: AHashMap::from_iter(segment_locations.into_iter()), + }) + } + + #[async_backtrace::framed] + pub async fn accumulate(&mut self, 
data_block: DataBlock) -> Result<()> { + if data_block.is_empty() { + return Ok(()); + } + let mut current_block = data_block; + for (expr_idx, op) in self.aggregation_ctx.ops.iter().enumerate() { + match op { + MutationKind::Update(update_mutation) => { + let (satisfied_block, unsatisfied_block) = + update_mutation.split_mutator.split_by_expr(current_block)?; + + if !satisfied_block.is_empty() { + let row_ids = + get_row_id(&satisfied_block, self.aggregation_ctx.row_id_idx)?; + let updated_block = update_mutation + .op + .execute(&self.aggregation_ctx.func_ctx, satisfied_block)?; + // record the modified block offsets + for (idx, row_id) in row_ids.iter().enumerate() { + let (prefix, offset) = split_row_id(*row_id); + + self.updatede_block + .entry(prefix) + .and_modify(|v| { + let (mut old_offsets, old_block) = v.remove(&expr_idx).unwrap(); + old_offsets.push(offset as usize); + v.insert( + expr_idx, + ( + old_offsets, + DataBlock::concat(&[ + old_block, + updated_block.slice(idx..idx + 1), + ]) + .unwrap(), + ), + ); + }) + .or_insert(|| -> HashMap, DataBlock)> { + let mut m = HashMap::new(); + m.insert( + expr_idx, + (vec![offset as usize], updated_block.slice(idx..idx + 1)), + ); + m + }()); + + self.block_mutation_row_offset + .entry(prefix) + .and_modify(|v| { + v.insert(offset as usize); + }) + .or_insert(vec![offset as usize].into_iter().collect()); + } + } + + if unsatisfied_block.is_empty() { + return Ok(()); + } + + current_block = unsatisfied_block; + } + + MutationKind::Delete(delete_mutation) => { + let (satisfied_block, unsatisfied_block) = + delete_mutation.split_mutator.split_by_expr(current_block)?; + if !satisfied_block.is_empty() { + let row_ids = + get_row_id(&satisfied_block, self.aggregation_ctx.row_id_idx)?; + + // record the modified block offsets + for row_id in row_ids { + let (prefix, offset) = split_row_id(row_id); + + self.block_mutation_row_offset + .entry(prefix) + .and_modify(|v| { + v.insert(offset as usize); + }) + .or_insert(vec![offset as usize].into_iter().collect()); + } + } + + if unsatisfied_block.is_empty() { + return Ok(()); + } + + current_block = unsatisfied_block; + } + } + } + Ok(()) + } + + #[async_backtrace::framed] + pub async fn apply(&mut self) -> Result> { + // 1.get modified segments + let mut segment_infos = HashMap::::new(); + for prefix in self.block_mutation_row_offset.keys() { + let (segment_idx, _) = split_prefix(*prefix); + let segment_idx = segment_idx as usize; + + if let Entry::Vacant(e) = segment_infos.entry(segment_idx) { + let (path, ver) = self.segment_locations.get(&segment_idx).ok_or_else(|| { + ErrorCode::Internal(format!( + "unexpected, segment (idx {}) not found, during applying mutation log", + segment_idx + )) + })?; + + let load_param = LoadParams { + location: path.clone(), + len_hint: None, + ver: *ver, + put_cache: true, + }; + + let compact_segment_info = self.segment_reader.read(&load_param).await?; + let segment_info: SegmentInfo = compact_segment_info.try_into()?; + e.insert(segment_info); + } else { + continue; + } + } + + let io_runtime = GlobalIORuntime::instance(); + let mut mutation_log_handlers = Vec::with_capacity(self.block_mutation_row_offset.len()); + + for item in &self.block_mutation_row_offset { + let (segment_idx, block_idx) = split_prefix(*item.0); + let segment_idx = segment_idx as usize; + let block_idx = block_idx as usize; + let permit = acquire_task_permit(self.io_request_semaphore.clone()).await?; + let aggregation_ctx = self.aggregation_ctx.clone(); + let block_meta = 
segment_infos.get(&segment_idx).unwrap().blocks[block_idx].clone(); + let remain_projections = self.remain_projections_map.clone(); + let updated_block_info = self.updatede_block.remove(item.0); + let modified_offsets = item.1.clone(); + let handle = io_runtime.spawn(async_backtrace::location!().frame({ + async move { + let mutation_log_entry = aggregation_ctx + .apply_update_and_deletion_to_data_block( + segment_idx, + block_idx, + &block_meta, + updated_block_info, + remain_projections, + modified_offsets, + ) + .await?; + + drop(permit); + Ok::<_, ErrorCode>(mutation_log_entry) + } + })); + mutation_log_handlers.push(handle); + } + + let log_entries = futures::future::try_join_all(mutation_log_handlers) + .await + .map_err(|e| { + ErrorCode::Internal("unexpected, failed to join apply-deletion tasks.") + .add_message_back(e.to_string()) + })?; + let mut mutation_logs = Vec::new(); + for maybe_log_entry in log_entries { + if let Some(segment_mutation_log) = maybe_log_entry? { + mutation_logs.push(segment_mutation_log); + } + } + Ok(Some(MutationLogs { + entries: mutation_logs, + })) + } +} + +impl AggregationContext { + #[async_backtrace::framed] + async fn apply_update_and_deletion_to_data_block( + &self, + segment_idx: SegmentIndex, + block_idx: BlockIndex, + block_meta: &BlockMeta, + block_updated: Option, + remain_projections: Arc, + modified_offsets: HashSet, + ) -> Result> { + info!( + "apply update and delete to segment idx {}, block idx {}", + segment_idx, block_idx, + ); + + let origin_data_block = read_block( + self.write_settings.storage_format, + &self.block_reader, + block_meta, + &self.read_settings, + ) + .await?; + let mut res_blocks = Vec::new(); + let get_block = |data_block: &DataBlock, offsets: &Vec| -> Result { + let mut blocks = Vec::with_capacity(offsets.len()); + for offset in offsets { + blocks.push(data_block.slice(*offset..*offset + 1)); + } + DataBlock::concat(&blocks) + }; + // remain columns for update + if block_updated.is_some() { + for (expr_idx, (offsets, mut updated_block)) in block_updated.unwrap() { + let remain = remain_projections.get(&expr_idx).unwrap().1.clone(); + let mut collected = remain_projections.get(&expr_idx).unwrap().0.clone(); + collected.extend(remain.clone()); + let block_operator = BlockOperator::Project { + projection: remain.clone(), + }; + let remain_block = block_operator + .execute(&self.func_ctx, get_block(&origin_data_block, &offsets)?)?; + for col in remain_block.columns() { + updated_block.add_column(col.clone()); + } + + // sort columns + let mut projection = (0..collected.len()).collect::>(); + projection.sort_by_key(|&i| collected[i]); + let sort_operator = BlockOperator::Project { projection }; + let res_block = sort_operator.execute(&self.func_ctx, updated_block)?; + if !res_block.is_empty() { + res_blocks.push(res_block); + } + } + } + + // apply delete + let mut bitmap = MutableBitmap::new(); + for row in 0..origin_data_block.num_rows() { + if modified_offsets.contains(&row) { + bitmap.push(false); + } else { + bitmap.push(true); + } + } + let res_block = origin_data_block.filter_with_bitmap(&bitmap.into())?; + if !res_block.is_empty() { + res_blocks.push(res_block); + } + + if res_blocks.is_empty() { + return Ok(Some(MutationLogEntry::DeletedBlock { + index: BlockMetaIndex { + segment_idx, + block_idx, + }, + })); + } + let res_block = DataBlock::concat(&res_blocks)?; + + // serialization and compression is cpu intensive, send them to dedicated thread pool + // and wait (asyncly, which will NOT block the executor 
thread) + let block_builder = self.block_builder.clone(); + let origin_stats = block_meta.cluster_stats.clone(); + let serialized = GlobalIORuntime::instance() + .spawn_blocking(move || { + block_builder.build(res_block, |block, generator| { + info!("serialize block before get cluster_stats:\n {:?}", block); + let cluster_stats = + generator.gen_with_origin_stats(&block, origin_stats.clone())?; + info!("serialize block after get cluster_stats:\n {:?}", block); + Ok((cluster_stats, block)) + }) + }) + .await?; + + // persistent data + let new_block_meta = serialized.block_meta; + let new_block_location = new_block_meta.location.0.clone(); + let new_block_raw_data = serialized.block_raw_data; + let data_accessor = self.data_accessor.clone(); + write_data(new_block_raw_data, &data_accessor, &new_block_location).await?; + + metrics_inc_merge_into_replace_blocks_counter(new_block_meta.row_count as u32); + // generate log + let mutation = MutationLogEntry::ReplacedBlock { + index: BlockMetaIndex { + segment_idx, + block_idx, + }, + block_meta: Arc::new(new_block_meta), + }; + + Ok(Some(mutation)) + } +} + +fn get_row_id(data_block: &DataBlock, row_id_idx: usize) -> Result> { + let row_id_col = data_block.get_by_offset(row_id_idx); + match row_id_col.value.as_column() { + Some(Column::Nullable(boxed)) => match &boxed.column { + Column::Number(NumberColumn::UInt64(data)) => Ok(data.clone()), + _ => Err(ErrorCode::BadArguments("row id is not uint64")), + }, + _ => Err(ErrorCode::BadArguments("row id is not uint64")), + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/merge_into_split_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/merge_into_split_mutator.rs new file mode 100644 index 000000000000..e3ab01e4326b --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/merge_into_split_mutator.rs @@ -0,0 +1,102 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
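A compact model of the bookkeeping `accumulate` performs above: each matched row id is split into a block prefix and an in-block offset, and offsets are grouped per block so that `apply` rewrites every touched block exactly once. `split_row_id` is a stand-in closure here; the real helper lives in `common_catalog::plan` and no particular bit layout is assumed:

```rust
use std::collections::{HashMap, HashSet};

// Group mutated row ids by the block they live in, keyed by the block prefix.
fn group_by_block(
    row_ids: &[u64],
    split_row_id: impl Fn(u64) -> (u64, u64),
) -> HashMap<u64, HashSet<usize>> {
    let mut per_block: HashMap<u64, HashSet<usize>> = HashMap::new();
    for &row_id in row_ids {
        let (prefix, offset) = split_row_id(row_id);
        per_block.entry(prefix).or_default().insert(offset as usize);
    }
    per_block
}

fn main() {
    // purely illustrative split: high bits as prefix, low 16 bits as offset
    let split = |id: u64| (id >> 16, id & 0xFFFF);
    let groups = group_by_block(&[0x10001, 0x10002, 0x20001], split);
    assert_eq!(groups.len(), 2); // two distinct blocks were touched
}
```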
+ +use std::collections::HashSet; +use std::ops::Not; + +use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::bitmap::MutableBitmap; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::types::DataType; +use common_expression::types::NumberDataType; +use common_expression::types::NumberScalar; +use common_expression::BlockEntry; +use common_expression::DataBlock; +use common_expression::ScalarRef; + +pub struct MergeIntoSplitMutator { + pub row_id_idx: u32, + pub row_id_set: HashSet, +} + +impl MergeIntoSplitMutator { + pub fn try_create(row_id_idx: u32) -> Self { + Self { + row_id_idx, + row_id_set: HashSet::new(), + } + } + + // (matched_block,not_matched_block) + pub fn split_data_block(&mut self, block: &DataBlock) -> Result<(DataBlock, DataBlock)> { + let row_id_column = &block.columns()[self.row_id_idx as usize]; + assert_eq!( + row_id_column.data_type, + DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))), + ); + + for row_id_offset in 0..block.num_rows() { + self.check_duplicate(row_id_column, row_id_offset)? + } + + // derive the matched/unmatched filter from the row_id column's validity + let filter: Bitmap = match &row_id_column.value { + common_expression::Value::Scalar(scalar) => { + let mut mutable_bitmap = MutableBitmap::new(); + if scalar.is_null() { + mutable_bitmap.push(false) + } else { + mutable_bitmap.push(true); + } + mutable_bitmap.into() + } + common_expression::Value::Column(column) => match column { + common_expression::Column::Nullable(nullable_column) => { + nullable_column.validity.clone() + } + _ => { + return Err(ErrorCode::InvalidRowIdIndex( + "row id column should be a nullable column, but it's a normal column", + )); + } + }, + }; + Ok(( + block.clone().filter_with_bitmap(&filter)?, + block.clone().filter_with_bitmap(&filter.not())?, + )) + } + + fn check_duplicate(&mut self, row_id_column: &BlockEntry, row_id_offset: usize) -> Result<()> { + match row_id_column.value.index(row_id_offset).ok_or_else(|| { + ErrorCode::Internal("can't get row_id column during merge into operations") + })? { + ScalarRef::Null => Ok(()), + ScalarRef::Number(NumberScalar::UInt64(v)) => { + if self.row_id_set.contains(&v) { + Err(ErrorCode::UnresolvableConflict( + "multiple rows from the source match the same row in the target table", + )) + } else { + self.row_id_set.insert(v); + Ok(()) + } + } + _ => Err(ErrorCode::Internal( + "row_id_type must be UInt64 for merge into", + )), + } + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/mod.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/mod.rs new file mode 100644 index 000000000000..f13b4a691c6f --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
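The split rule above in miniature: because the source is LEFT OUTER JOINed against the target, the target row-id column is NULL exactly for source rows without a match, so the column's validity bitmap is the matched/unmatched filter. A sketch with plain `Option`s standing in for the nullable column:

```rust
// Matched rows carry a target row id; unmatched rows joined against nothing.
fn split_by_validity(row_ids: &[Option<u64>]) -> (Vec<usize>, Vec<usize>) {
    let (mut matched, mut unmatched) = (Vec::new(), Vec::new());
    for (row, id) in row_ids.iter().enumerate() {
        match id {
            Some(_) => matched.push(row), // row exists in the target
            None => unmatched.push(row),  // no target row joined in
        }
    }
    (matched, unmatched)
}

fn main() {
    let (m, u) = split_by_validity(&[Some(42), None, Some(7)]);
    assert_eq!((m, u), (vec![0, 2], vec![1]));
}
```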
+ +mod matched_mutator; +mod merge_into_split_mutator; +mod split_by_expr_mutator; + +pub use matched_mutator::MatchedAggregator; +pub use merge_into_split_mutator::MergeIntoSplitMutator; +pub use split_by_expr_mutator::SplitByExprMutator; diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/split_by_expr_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/split_by_expr_mutator.rs new file mode 100644 index 000000000000..62429991e8c4 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/split_by_expr_mutator.rs @@ -0,0 +1,58 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Not; + +use common_exception::Result; +use common_expression::types::BooleanType; +use common_expression::types::DataType; +use common_expression::DataBlock; +use common_expression::Evaluator; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_functions::BUILTIN_FUNCTIONS; +use common_sql::executor::cast_expr_to_non_null_boolean; +pub struct SplitByExprMutator { + expr: Option, + func_ctx: FunctionContext, +} + +impl SplitByExprMutator { + pub fn create(expr: Option, func_ctx: FunctionContext) -> Self { + Self { expr, func_ctx } + } + + // first datablock satisfy expr, the second doesn't + pub fn split_by_expr(&self, data_block: DataBlock) -> Result<(DataBlock, DataBlock)> { + if self.expr.is_none() { + Ok((data_block, DataBlock::empty())) + } else { + let filter: Expr = cast_expr_to_non_null_boolean(self.expr.as_ref().unwrap().clone())?; + assert_eq!(filter.data_type(), &DataType::Boolean); + + let evaluator = Evaluator::new(&data_block, &self.func_ctx, &BUILTIN_FUNCTIONS); + + let predicates = evaluator + .run(&filter) + .map_err(|e| e.add_message("eval filter failed:"))? + .try_downcast::() + .unwrap(); + let filter = predicates.into_column().unwrap(); + Ok(( + data_block.clone().filter_with_bitmap(&filter)?, + data_block.filter_with_bitmap(&filter.not())?, + )) + } + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs new file mode 100644 index 000000000000..c879c3139e55 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs @@ -0,0 +1,20 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
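The `SplitByExprMutator` contract just introduced, restated on plain vectors: rows satisfying the predicate go left, the rest go right, and an absent predicate (a bare WHEN clause) accepts every row. A minimal sketch:

```rust
// Partition rows by an optional predicate; None means "every row satisfies".
fn split_by_pred<T: Clone>(rows: &[T], pred: Option<&dyn Fn(&T) -> bool>) -> (Vec<T>, Vec<T>) {
    match pred {
        None => (rows.to_vec(), Vec::new()),
        Some(p) => rows.iter().cloned().partition(|r| p(r)),
    }
}

fn main() {
    let even = |x: &i32| x % 2 == 0;
    let (sat, unsat) = split_by_pred(&[1, 2, 3, 4], Some(&even));
    assert_eq!((sat, unsat), (vec![2, 4], vec![1, 3]));
}
```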
+ +mod processor_merge_into_not_matched; +mod processor_merge_into_split; +mod transform_matched_mutation_aggregator; + +pub use processor_merge_into_not_matched::MergeIntoNotMatchedProcessor; +pub use processor_merge_into_split::MergeIntoSplitProcessor; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs new file mode 100644 index 000000000000..e33365bcd8fd --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs @@ -0,0 +1,182 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashSet; +use std::sync::Arc; + +use common_exception::Result; +use common_expression::DataBlock; +use common_expression::DataSchemaRef; +use common_expression::FunctionContext; +use common_expression::RemoteExpr; +use common_functions::BUILTIN_FUNCTIONS; +use common_pipeline_core::pipe::PipeItem; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use common_sql::evaluator::BlockOperator; +use common_storage::common_metrics::merge_into::metrics_inc_merge_into_append_blocks_counter; +use itertools::Itertools; + +use crate::operations::merge_into::mutator::SplitByExprMutator; + +type UnMatchedExprs = Vec<(DataSchemaRef, Option, Vec)>; + +struct InsertDataBlockMutation { + op: BlockOperator, + split_mutator: SplitByExprMutator, +} + +// evaluates each WHEN NOT MATCHED clause: splits rows by the clause condition +// and builds the rows to insert from its value expressions +pub struct MergeIntoNotMatchedProcessor { + input_port: Arc, + output_port: Arc, + ops: Vec, + input_data: Option, + output_data: Option, + func_ctx: FunctionContext, +} + +impl MergeIntoNotMatchedProcessor { + pub fn create( + unmatched: UnMatchedExprs, + input_schema: DataSchemaRef, + func_ctx: FunctionContext, + ) -> Result { + let mut ops = Vec::::with_capacity(unmatched.len()); + for item in &unmatched { + let eval_projections: HashSet = + (input_schema.num_fields()..input_schema.num_fields() + item.2.len()).collect(); + + ops.push(InsertDataBlockMutation { + op: BlockOperator::Map { + exprs: item + .2 + .iter() + .map(|expr| expr.as_expr(&BUILTIN_FUNCTIONS)) + .collect_vec(), + projections: Some(eval_projections), + }, + split_mutator: { + let filter = item.1.as_ref().map(|expr| expr.as_expr(&BUILTIN_FUNCTIONS)); + SplitByExprMutator::create(filter, func_ctx.clone()) + }, + }); + } + + Ok(Self { + input_port: InputPort::create(), + output_port: OutputPort::create(), + ops, + input_data: None, + output_data: None, + func_ctx, + }) + } + + pub fn into_pipe_item(self) -> PipeItem { + let input = self.input_port.clone(); + let output_port = self.output_port.clone(); + let processor_ptr = ProcessorPtr::create(Box::new(self));
+ PipeItem::create(processor_ptr, vec![input], vec![output_port]) + } +} + +impl Processor for MergeIntoNotMatchedProcessor { + fn name(&self) -> String { + "MergeIntoNotMatched".to_owned() + } + + #[doc = " Reference used for downcast."] + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + let finished = self.input_port.is_finished() && self.output_data.is_none(); + if finished { + self.output_port.finish(); + return Ok(Event::Finished); + } + + let mut pushed_something = false; + + if self.output_port.can_push() { + if let Some(not_matched_data) = self.output_data.take() { + self.output_port.push_data(Ok(not_matched_data)); + pushed_something = true + } + } + + if pushed_something { + return Ok(Event::NeedConsume); + } + + if self.input_port.has_data() { + if self.output_data.is_none() { + self.input_data = Some(self.input_port.pull_data().unwrap()?); + Ok(Event::Sync) + } else { + Ok(Event::NeedConsume) + } + } else { + self.input_port.set_need_data(); + Ok(Event::NeedData) + } + } + + fn process(&mut self) -> Result<()> { + if let Some(data_block) = self.input_data.take() { + if data_block.is_empty() { + return Ok(()); + } + // get an empty data_block but have same schema + let mut output_block = None; + let mut current_block = data_block; + for op in &self.ops { + let (satisfied_block, unsatisfied_block) = + op.split_mutator.split_by_expr(current_block)?; + // in V1, we make sure the output_schema of each insert expr result block is the same + // we will fix it in the future. + if !satisfied_block.is_empty() { + if output_block.is_some() { + output_block = Some(DataBlock::concat(&[ + output_block.unwrap(), + op.op.execute(&self.func_ctx, satisfied_block)?, + ])?); + } else { + output_block = Some(op.op.execute(&self.func_ctx, satisfied_block)?) + } + } + + if unsatisfied_block.is_empty() { + break; + } else { + current_block = unsatisfied_block + } + } + // todo:(JackTan25) fill format data block + if output_block.is_some() { + metrics_inc_merge_into_append_blocks_counter( + output_block.as_ref().unwrap().num_rows() as u32, + ); + self.output_data = output_block + } + } + Ok(()) + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split.rs new file mode 100644 index 000000000000..01f741b8a2a5 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split.rs @@ -0,0 +1,167 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
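A small model of the clause chain in `process` above: each WHEN NOT MATCHED clause consumes the rows its condition accepts and hands the remainder to the next clause, so a source row is inserted by at most one clause and rows rejected by every clause are dropped:

```rust
// Run rows through an ordered list of (condition, action-name) clauses.
fn run_clauses(mut remaining: Vec<i32>, clauses: &[(fn(&i32) -> bool, &str)]) -> Vec<String> {
    let mut actions = Vec::new();
    for (condition, insert_name) in clauses {
        let (hit, rest): (Vec<i32>, Vec<i32>) = remaining.into_iter().partition(condition);
        if !hit.is_empty() {
            actions.push(format!("{insert_name}: {hit:?}")); // this clause claims these rows
        }
        if rest.is_empty() {
            return actions; // nothing left to hand to later clauses
        }
        remaining = rest;
    }
    actions // rows surviving every clause get no action
}

fn main() {
    let out = run_clauses(vec![1, 2, 3], &[(|x| *x > 2, "insert A"), (|_| true, "insert B")]);
    assert_eq!(out, vec!["insert A: [3]", "insert B: [1, 2]"]);
}
```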
+ +use std::any::Any; +use std::sync::Arc; + +use common_exception::Result; +use common_expression::DataBlock; +use common_pipeline_core::pipe::Pipe; +use common_pipeline_core::pipe::PipeItem; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use common_storage::common_metrics::merge_into::metrics_inc_merge_into_matched_rows; +use common_storage::common_metrics::merge_into::metrics_inc_merge_into_unmatched_rows; + +use crate::operations::merge_into::mutator::MergeIntoSplitMutator; + +pub struct MergeIntoSplitProcessor { + input_port: Arc, + output_port_matched: Arc, + output_port_not_matched: Arc, + + input_data: Option, + output_data_matched_data: Option, + output_data_not_matched_data: Option, + // if the target table is empty, we push all data blocks into the + // not-matched branch. + target_table_empty: bool, + merge_into_split_mutator: MergeIntoSplitMutator, +} + +impl MergeIntoSplitProcessor { + pub fn create(row_id_idx: u32, target_table_empty: bool) -> Result { + let merge_into_split_mutator = MergeIntoSplitMutator::try_create(row_id_idx); + let input_port = InputPort::create(); + let output_port_matched = OutputPort::create(); + let output_port_not_matched = OutputPort::create(); + Ok(Self { + input_port, + output_port_matched, + output_port_not_matched, + input_data: None, + output_data_matched_data: None, + output_data_not_matched_data: None, + target_table_empty, + merge_into_split_mutator, + }) + } + + pub fn into_pipe(self) -> Pipe { + let pipe_item = self.into_pipe_item(); + Pipe::create(1, 2, vec![pipe_item]) + } + + pub fn into_pipe_item(self) -> PipeItem { + let input = self.input_port.clone(); + let output_port_matched = self.output_port_matched.clone(); + let output_port_not_matched = self.output_port_not_matched.clone(); + let processor_ptr = ProcessorPtr::create(Box::new(self)); + PipeItem::create(processor_ptr, vec![input], vec![ + output_port_matched, + output_port_not_matched, + ]) + } +} + +impl Processor for MergeIntoSplitProcessor { + fn name(&self) -> String { + "MergeIntoSplit".to_owned() + } + + #[doc = " Reference used for downcast."] + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + // 1. if there is no data and input_port is finished, this processor has finished + // its work + let finished = self.input_port.is_finished() + && self.output_data_matched_data.is_none() + && self.output_data_not_matched_data.is_none(); + if finished { + self.output_port_matched.finish(); + self.output_port_not_matched.finish(); + return Ok(Event::Finished); + } + + let mut pushed_something = false; + + // 2. process data stage here + if self.output_port_matched.can_push() { + if let Some(matched_data) = self.output_data_matched_data.take() { + self.output_port_matched.push_data(Ok(matched_data)); + pushed_something = true + } + } + + if self.output_port_not_matched.can_push() { + if let Some(not_matched_data) = self.output_data_not_matched_data.take() { + self.output_port_not_matched.push_data(Ok(not_matched_data)); + pushed_something = true + } + } + + // 3. trigger the downstream PipeItem to consume if we pushed data + if pushed_something { + Ok(Event::NeedConsume) + } else { + // 4. 
we can't push data, so the downstream is not ready or we have no data at all; + // we only start to split the data block held by input_data once both + // output_data slots are empty + if self.input_port.has_data() { + if self.output_data_matched_data.is_none() + && self.output_data_not_matched_data.is_none() + { + // no pending data (being sent downstream) + self.input_data = Some(self.input_port.pull_data().unwrap()?); + Ok(Event::Sync) + } else { + // data pending + Ok(Event::NeedConsume) + } + } else { + self.input_port.set_need_data(); + Ok(Event::NeedData) + } + } + } + + // Todo:(JackTan25) actually, we should do insert-only optimization in the future. + fn process(&mut self) -> Result<()> { + if let Some(data_block) = self.input_data.take() { + if self.target_table_empty { + self.output_data_not_matched_data = Some(data_block) + } else { + let (matched_block, not_matched_block) = self + .merge_into_split_mutator + .split_data_block(&data_block)?; + if !matched_block.is_empty() { + metrics_inc_merge_into_matched_rows(matched_block.num_rows() as u32); + self.output_data_matched_data = Some(matched_block); + } + + if !not_matched_block.is_empty() { + metrics_inc_merge_into_unmatched_rows(not_matched_block.num_rows() as u32); + self.output_data_not_matched_data = Some(not_matched_block); + } + } + } + Ok(()) + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs new file mode 100644 index 000000000000..7dc70d903f47 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs @@ -0,0 +1,55 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
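Before the impl that follows: the aggregator is driven through `AsyncAccumulatingTransform`, i.e. it buffers everything while the stream runs and emits once at the end. A simplified synchronous model of that contract (the real trait is async and lives in common_pipeline_transforms):

```rust
// Assumed, simplified contract: transform() buffers and yields no partial
// output; on_finish() flushes the aggregate once upstream is exhausted.
trait AccumulatingTransform {
    fn transform(&mut self, input: Vec<u64>) -> Option<Vec<u64>>;
    fn on_finish(&mut self) -> Option<Vec<u64>>;
}

struct BufferAll {
    acc: Vec<u64>,
}

impl AccumulatingTransform for BufferAll {
    fn transform(&mut self, input: Vec<u64>) -> Option<Vec<u64>> {
        self.acc.extend(input); // accumulate only, like accumulate()
        None                    // no partial output
    }
    fn on_finish(&mut self) -> Option<Vec<u64>> {
        Some(std::mem::take(&mut self.acc)) // like apply(): emit the mutation logs
    }
}

fn main() {
    let mut t = BufferAll { acc: Vec::new() };
    assert_eq!(t.transform(vec![1, 2]), None);
    assert_eq!(t.transform(vec![3]), None);
    assert_eq!(t.on_finish(), Some(vec![1, 2, 3]));
}
```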
+ +use common_exception::Result; +use common_expression::DataBlock; +use common_pipeline_core::pipe::PipeItem; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; + +use crate::operations::merge_into::mutator::MatchedAggregator; + +#[async_trait::async_trait] +impl AsyncAccumulatingTransform for MatchedAggregator { + const NAME: &'static str = "MatchedAggregator"; + + #[async_backtrace::framed] + async fn transform(&mut self, data: DataBlock) -> Result> { + self.accumulate(data).await?; + // no partial output + Ok(None) + } + + #[async_backtrace::framed] + async fn on_finish(&mut self, _output: bool) -> Result> { + // apply mutations + let mutation_logs = self.apply().await?; + Ok(mutation_logs.map(|logs| logs.into())) + } +} + +impl MatchedAggregator { + pub fn into_pipe_item(self) -> PipeItem { + let input = InputPort::create(); + let output = OutputPort::create(); + let processor_ptr = + AsyncAccumulatingTransformer::create(input.clone(), output.clone(), self); + PipeItem::create(ProcessorPtr::create(processor_ptr), vec![input], vec![ + output, + ]) + } +} diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index 77a50bda99d5..89da3a70c1b9 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -20,6 +20,8 @@ pub mod common; mod compact; mod delete; mod gc; +pub mod merge; +pub mod merge_into; mod mutation; mod navigate; mod read; @@ -32,7 +34,6 @@ mod revert; mod truncate; mod update; pub mod util; - pub use agg_index_sink::AggIndexSink; pub use common::BlockMetaIndex; pub use common::FillInternalColumnProcessor; @@ -45,4 +46,6 @@ pub use mutation::SegmentCompactMutator; pub use mutation::SegmentCompactionState; pub use mutation::SegmentCompactor; pub use read::build_row_fetcher_pipeline; +pub use util::acquire_task_permit; pub use util::column_parquet_metas; +pub use util::read_block; diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs index 7603243130f0..9fa8d0f691af 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs @@ -18,7 +18,6 @@ use std::time::Instant; use ahash::AHashMap; use common_arrow::arrow::bitmap::MutableBitmap; -use common_base::base::tokio::sync::OwnedSemaphorePermit; use common_base::base::tokio::sync::Semaphore; use common_base::base::ProgressValues; use common_base::runtime::GlobalIORuntime; @@ -70,11 +69,13 @@ use crate::metrics::metrics_inc_replace_row_number_totally_loaded; use crate::metrics::metrics_inc_replace_row_number_write; use crate::metrics::metrics_inc_replace_segment_number_after_pruning; use crate::metrics::metrics_inc_replace_whole_block_deletion; +use crate::operations::acquire_task_permit; use crate::operations::common::BlockMetaIndex; use crate::operations::common::MutationLogEntry; use crate::operations::common::MutationLogs; use crate::operations::mutation::BlockIndex; use crate::operations::mutation::SegmentIndex; +use crate::operations::read_block; use 
crate::operations::replace_into::meta::merge_into_operation_meta::DeletionByColumn; use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoOperation; use crate::operations::replace_into::meta::merge_into_operation_meta::UniqueKeyDigest; @@ -319,7 +320,8 @@ impl MergeIntoOperationAggregator { let segment_info: SegmentInfo = compact_segment_info.try_into()?; for (block_index, keys) in block_deletion { - let permit = aggregation_ctx.acquire_task_permit().await?; + let permit = + acquire_task_permit(aggregation_ctx.io_request_semaphore.clone()).await?; let block_meta = segment_info.blocks[block_index].clone(); let aggregation_ctx = aggregation_ctx.clone(); num_rows_mutated += block_meta.row_count; @@ -397,7 +399,13 @@ impl AggregationContext { return Ok(None); } - let key_columns_data = self.read_block(&self.key_column_reader, block_meta).await?; + let key_columns_data = read_block( + self.write_settings.storage_format, + &self.key_column_reader, + block_meta, + &self.read_settings, + ) + .await?; let num_rows = key_columns_data.num_rows(); @@ -543,20 +551,6 @@ impl AggregationContext { Ok(Some(mutation)) } - #[async_backtrace::framed] - async fn acquire_task_permit(&self) -> Result { - let permit = self - .io_request_semaphore - .clone() - .acquire_owned() - .await - .map_err(|e| { - ErrorCode::Internal("unexpected, io request semaphore is closed. {}") - .add_message_back(e.to_string()) - })?; - Ok(permit) - } - fn overlapped( &self, column_stats: &HashMap, diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs index c3a865ac5414..0b3f0b24116f 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs @@ -24,9 +24,6 @@ use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransfo use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoOperation; pub use crate::operations::replace_into::mutator::merge_into_mutator::MergeIntoOperationAggregator; -/// Takes multiple [MergeIntoOperation]s in, and aggregate them. -/// Applies them to segments(and data blocks belong to this Aggregator) in the `final` stage. -/// Outputs [MutationLogs] logs(to be committed). #[async_trait::async_trait] impl AsyncAccumulatingTransform for MergeIntoOperationAggregator { const NAME: &'static str = "MergeIntoMutationAggregator"; diff --git a/src/query/storages/fuse/src/operations/util.rs b/src/query/storages/fuse/src/operations/util.rs index 1e5fa9c82546..48f73154e3c0 100644 --- a/src/query/storages/fuse/src/operations/util.rs +++ b/src/query/storages/fuse/src/operations/util.rs @@ -13,15 +13,25 @@ // limitations under the License. 
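Sketch of the concurrency pattern that the shared `acquire_task_permit` helper (added to util.rs below) supports: a semaphore caps how many block-rewrite tasks are in flight on the IO runtime, and each task holds its permit until it finishes. Names here are illustrative; only the tokio semaphore API is real:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Spawn one task per block, but never more than the semaphore's capacity at once.
async fn bounded_rewrites(io_sem: Arc<Semaphore>, blocks: Vec<u64>) {
    let mut handles = Vec::new();
    for block in blocks {
        let permit = io_sem
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore closed");
        handles.push(tokio::spawn(async move {
            // ... read, mutate, and re-write one block here ...
            let _ = block;
            drop(permit); // free the slot for the next queued task
        }));
    }
    for h in handles {
        h.await.expect("task panicked");
    }
}

#[tokio::main]
async fn main() {
    bounded_rewrites(Arc::new(Semaphore::new(4)), (0..16).collect()).await;
}
```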
use std::collections::HashMap; +use std::sync::Arc; use common_arrow::parquet::metadata::ThriftFileMetaData; +use common_base::base::tokio::sync::OwnedSemaphorePermit; +use common_base::base::tokio::sync::Semaphore; +use common_base::runtime::GlobalIORuntime; use common_exception::ErrorCode; use common_exception::Result; use common_expression::ColumnId; +use common_expression::DataBlock; use common_expression::TableSchemaRef; +use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::ColumnMeta; use storages_common_table_meta::meta::SingleColumnMeta; +use crate::io::BlockReader; +use crate::io::ReadSettings; +use crate::FuseStorageFormat; + pub fn column_parquet_metas( file_meta: &ThriftFileMetaData, schema: &TableSchemaRef, @@ -73,3 +83,49 @@ } Ok(col_metas) } + +#[async_backtrace::framed] +pub async fn acquire_task_permit( + io_request_semaphore: Arc, +) -> Result { + let permit = io_request_semaphore.acquire_owned().await.map_err(|e| { + ErrorCode::Internal("unexpected, io request semaphore is closed.") + .add_message_back(e.to_string()) + })?; + Ok(permit) +} + +pub async fn read_block( + storage_format: FuseStorageFormat, + reader: &BlockReader, + block_meta: &BlockMeta, + read_settings: &ReadSettings, +) -> Result { + let merged_io_read_result = reader + .read_columns_data_by_merge_io( + read_settings, + &block_meta.location.0, + &block_meta.col_metas, + &None, + ) + .await?; + + // deserialize block data + // cpu intensive work, send it to the dedicated thread pool + + let block_meta_ptr = block_meta.clone(); + let reader = reader.clone(); + GlobalIORuntime::instance() + .spawn_blocking(move || { + let column_chunks = merged_io_read_result.columns_chunks()?; + reader.deserialize_chunks( + block_meta_ptr.location.0.as_str(), + block_meta_ptr.row_count as usize, + &block_meta_ptr.compression, + &block_meta_ptr.col_metas, + column_chunks, + &storage_format, + ) + }) + .await } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into new file mode 100644 index 000000000000..30c5cdb350ab --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into @@ -0,0 +1,163 @@ +statement ok +set enable_experimental_merge_into = 1; + +statement ok +drop table if exists t1; + +statement ok +drop table if exists t2; + +statement ok +create table t1(a int,b string, c string); + +statement ok +create table t2(a int,b string, c string); + +statement ok +insert into t1 values(1,'b1','c1'),(2,'b2','c2'); + +statement ok +insert into t1 values(2,'b3','c3'),(3,'b4','c4'); + +query TTT +select * from t1 order by a; +---- +1 b1 c1 +2 b2 c2 +2 b3 c3 +3 b4 c4 + +statement ok +insert into t2 values(1,'b_5','c_5'),(3,'b_6','c_6'); + +statement ok +insert into t2 values(2,'b_7','c_7'); + +query TTT +select * from t2 order by a; +---- +1 b_5 c_5 +2 b_7 c_7 +3 b_6 c_6 + +# section I: basic test for match and unmatch + +statement error 1006 +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then update set t1.c = t2.c,t1.c = t2.c; + +statement error 1006 +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then update set t1.c = t2.c when not matched then insert (a,b) values(t2.a,t2.b); + +statement ok +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then update set t1.c = t2.c; + +query TTT +select * from t1 order by a; +---- +1 b1 c_5 +2 b2 c_7 +2 b3 c_7 +3 b4 c_6 + +statement 
ok +insert into t2 values(4,'b_8','c_8'); + +query TTT +select * from t2 order by a; +---- +1 b_5 c_5 +2 b_7 c_7 +3 b_6 c_6 +4 b_8 c_8 + +statement ok +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then update set t1.c = t2.c when not matched then insert (a,b,c) values(t2.a,t2.b,t2.c); + +query TTT +select * from t1 order by a; +---- +1 b1 c_5 +2 b2 c_7 +2 b3 c_7 +3 b4 c_6 +4 b_8 c_8 + +statement ok +insert into t2 values(1,'b_9','c_9'); + +statement error 4001 +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then update set t1.c = t2.c when not matched then insert (a,b,c) values(t2.a,t2.b,t2.c); + +query TTT +select * from t1 order by a; +---- +1 b1 c_5 +2 b2 c_7 +2 b3 c_7 +3 b4 c_6 +4 b_8 c_8 + +statement ok +delete from t2 where a = 1; + +query TTT +select * from t2 order by a; +---- +2 b_7 c_7 +3 b_6 c_6 +4 b_8 c_8 + +statement ok +insert into t2 values(5,'b_9','c_9'); + +query TTT +select * from t2 order by a; +---- +2 b_7 c_7 +3 b_6 c_6 +4 b_8 c_8 +5 b_9 c_9 + +statement ok +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then delete; + +query ITT +select * from t1 order by a; +---- +1 b1 c_5 + +# section 2 multi clauses +statement ok +insert into t1 values(2,'b_1','c_1'),(3,'b_2','c_2'); + +query TTT +select * from t1 order by a; +---- +1 b1 c_5 +2 b_1 c_1 +3 b_2 c_2 + +statement error 1065 +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then delete when matched then update set t1.c = t2.c when not matched and t2.c = 'c_8' then insert (a,b,c) values(t2.a,t2.b,t2.c); + +statement ok +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched and t1.b = 'b_1' then delete when matched then update set t1.c = t2.c when not matched and t2.c = 'c_8' then insert (a,b,c) values(t2.a,t2.b,t2.c); + +query TTT +select * from t1 order by a; +---- +1 b1 c_5 +3 b_2 c_6 +4 b_8 c_8 + +statement ok +merge into t1 using (select * from t2 as t2) on t1.a = t2.a when matched then delete when not matched and t2.c = 'c_9' then insert (a,b,c) values(t2.a,t2.b,t2.c); + +query TTT +select * from t1 order by a; +---- +1 b1 c_5 +5 b_9 c_9 + +statement ok +set enable_experimental_merge_into = 0; \ No newline at end of file