feat: support Merge-Into V1 #12350

Merged
merged 124 commits into from
Aug 29, 2023
Changes from 110 commits
bc8115e
try to add merge grammer
JackTan25 Aug 3, 2023
916604b
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 4, 2023
f7ccc95
finish parser stage
JackTan25 Aug 4, 2023
9a30ace
finish match_clause and unmatch_clause
JackTan25 Aug 4, 2023
a0dc670
finish display for merge_into
JackTan25 Aug 4, 2023
a1fa3b8
finish grammer parser, start bind stage
JackTan25 Aug 4, 2023
291e813
remove useless codes
JackTan25 Aug 4, 2023
5d30a2d
add MergeIntoPlan
JackTan25 Aug 4, 2023
c0074b9
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 6, 2023
2ac1859
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 7, 2023
9c71100
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 7, 2023
021724b
fix distributed http error
JackTan25 Aug 7, 2023
353f828
support insert values for match
JackTan25 Aug 7, 2023
85131aa
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 7, 2023
ebe2fca
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 9, 2023
53900d5
remove useless codes
JackTan25 Aug 9, 2023
12e5b65
Merge branch 'main' into merge_into_feat
JackTan25 Aug 9, 2023
33f8818
revert match_insert
JackTan25 Aug 9, 2023
e3c1654
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 10, 2023
c6e1de6
refactor merge_into_stmt, build table_reference
JackTan25 Aug 10, 2023
615a568
Merge branch 'main' into merge_into_feat
JackTan25 Aug 10, 2023
f4703b2
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 10, 2023
2e40470
cover match pattern
JackTan25 Aug 10, 2023
5abccad
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 10, 2023
bfb4ca6
cover match pattern
JackTan25 Aug 10, 2023
fe6420e
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 12, 2023
c297af9
stash
JackTan25 Aug 11, 2023
b46c630
try to add merge_into_source_scan
JackTan25 Aug 13, 2023
cceaaf0
add merge_source_scan
JackTan25 Aug 13, 2023
2af4387
Merge branch 'main' into merge_into_feat
JackTan25 Aug 13, 2023
d3783f6
bind join
JackTan25 Aug 13, 2023
27768ae
stash
JackTan25 Aug 13, 2023
ed8836b
refactor merge_source
JackTan25 Aug 14, 2023
4de522b
Merge branch 'main' into merge_into_feat
JackTan25 Aug 14, 2023
8f9ad05
bind clauses
JackTan25 Aug 14, 2023
8941cef
Merge branch 'main' into merge_into_feat
JackTan25 Aug 14, 2023
7b1bbe5
Merge branch 'main' into merge_into_feat
JackTan25 Aug 14, 2023
404827a
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 14, 2023
855a267
Merge branch 'main' into merge_into_feat
JackTan25 Aug 14, 2023
cb244a8
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 14, 2023
0679de7
add new plan node
JackTan25 Aug 14, 2023
719531e
Merge branch 'main' into merge_into_feat
JackTan25 Aug 15, 2023
f8b0a23
add interpreter and refactor bind
JackTan25 Aug 15, 2023
6b3e1de
Merge branch 'main' into merge_into_feat
JackTan25 Aug 15, 2023
b281c88
try to add processor
JackTan25 Aug 15, 2023
3bdd191
add physical plan
JackTan25 Aug 15, 2023
48997fa
fix columns_set
JackTan25 Aug 15, 2023
ba6f793
Merge branch 'main' into merge_into_feat
JackTan25 Aug 16, 2023
c12f1ef
add update/insert expression
JackTan25 Aug 16, 2023
19c4ffa
Merge branch 'main' into merge_into_feat
JackTan25 Aug 16, 2023
be52080
Merge branch 'main' into merge_into_feat
JackTan25 Aug 17, 2023
a5bcfb0
remove unused codes and start to build processor and pipeline
JackTan25 Aug 18, 2023
087d7f3
Merge branch 'main' into merge_into_feat
JackTan25 Aug 18, 2023
3cea83e
Merge branch 'main' into merge_into_feat
JackTan25 Aug 18, 2023
4f8a6ba
Merge branch 'main' into merge_into_feat
JackTan25 Aug 18, 2023
655304d
Merge branch 'main' into merge_into_feat
JackTan25 Aug 19, 2023
73ef4d0
add split operator
JackTan25 Aug 19, 2023
807c3fe
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 19, 2023
947af32
Merge branch 'main' into merge_into_feat
JackTan25 Aug 19, 2023
052d5c0
build source pipeline
JackTan25 Aug 19, 2023
c046a95
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 19, 2023
99f0bb6
finish pipeline build, continue to work on not-matched and matched pr…
JackTan25 Aug 20, 2023
2b4cdfe
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 20, 2023
cae8641
forbidden different schema for now
JackTan25 Aug 20, 2023
bc88dd8
Merge branch 'main' into merge_into_feat
JackTan25 Aug 20, 2023
899c107
refactor expr and finish event schedule
JackTan25 Aug 21, 2023
2c00683
Merge branch 'main' into merge_into_feat
JackTan25 Aug 21, 2023
cd8da12
Merge branch 'main' into merge_into_feat
JackTan25 Aug 21, 2023
8475148
add util split_by_expr
JackTan25 Aug 21, 2023
cfa5bdf
Merge branch 'main' into merge_into_feat
JackTan25 Aug 22, 2023
a660f2b
finish not match insert
JackTan25 Aug 22, 2023
977478b
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 22, 2023
d5f8d61
Merge branch 'main' into merge_into_feat
JackTan25 Aug 23, 2023
e725c77
fix
JackTan25 Aug 23, 2023
8fd9b2b
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 23, 2023
7f1764f
Merge branch 'main' into merge_into_feat
JackTan25 Aug 23, 2023
0ded1bd
Merge branch 'main' into merge_into_feat
JackTan25 Aug 23, 2023
a0117bc
refactor merge into pipeline
JackTan25 Aug 23, 2023
f131ec1
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 23, 2023
241649c
Merge branch 'main' into merge_into_feat
JackTan25 Aug 24, 2023
19d5883
add mutation logentries
JackTan25 Aug 24, 2023
28f21ac
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Aug 24, 2023
bfe91ad
Merge branch 'main' into merge_into_feat
JackTan25 Aug 24, 2023
af8fdb0
add matched mutation
JackTan25 Aug 24, 2023
219f127
Merge branch 'main' into merge_into_feat
JackTan25 Aug 24, 2023
5ad7b99
add setting
JackTan25 Aug 24, 2023
74ddd8e
set not support computed expr
JackTan25 Aug 25, 2023
17faabb
Merge branch 'main' into merge_into_feat
JackTan25 Aug 25, 2023
e50c927
fix bug
JackTan25 Aug 25, 2023
c023968
Merge branch 'main' into merge_into_feat
JackTan25 Aug 25, 2023
6b66183
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 25, 2023
2880d28
fix col_index bug
JackTan25 Aug 25, 2023
44fefe5
fix pipeline bug and add basic tests
JackTan25 Aug 26, 2023
9338278
fix test
JackTan25 Aug 26, 2023
3a05acc
fix typos
JackTan25 Aug 26, 2023
99e308e
fix typos
JackTan25 Aug 26, 2023
8944344
fix clippy
JackTan25 Aug 26, 2023
1f35a1b
add more tests
JackTan25 Aug 26, 2023
9115410
add tests
JackTan25 Aug 26, 2023
72a503b
Merge branch 'main' into merge_into_feat
JackTan25 Aug 27, 2023
0e6b1f6
fix bugs
JackTan25 Aug 27, 2023
19d14f1
use enable_experimental_merge_into adviced by BohuTang instead
JackTan25 Aug 27, 2023
394c16b
add info
JackTan25 Aug 27, 2023
1e59385
fix typo
JackTan25 Aug 27, 2023
cee8d4a
fix ut
JackTan25 Aug 27, 2023
3fd0051
fix native failure
JackTan25 Aug 27, 2023
1ac3801
Merge branch 'main' into merge_into_feat
JackTan25 Aug 28, 2023
3de1489
remove streamingV2Source, need to support streaming in next pr
JackTan25 Aug 28, 2023
e61532e
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 28, 2023
2098cf6
rename vars adviced by b41sh
JackTan25 Aug 28, 2023
7a173aa
Update src/common/exception/src/exception_code.rs
JackTan25 Aug 28, 2023
0ef3ede
remove useless comments
JackTan25 Aug 28, 2023
1201913
Merge branch 'merge_into_feat' of https://github.com/JackTan25/databe…
JackTan25 Aug 28, 2023
17eb8f0
fix
JackTan25 Aug 28, 2023
6b5c939
unify codes, use bitmap to filter
JackTan25 Aug 28, 2023
6a87a93
fix check
JackTan25 Aug 28, 2023
15d171f
unify codes
JackTan25 Aug 28, 2023
91dce14
fix
JackTan25 Aug 29, 2023
b8cae8a
check duplicate
JackTan25 Aug 29, 2023
1dfb53b
check duplicate
JackTan25 Aug 29, 2023
1ade15a
Merge branch 'main' into merge_into_feat
dantengsky Aug 29, 2023
3dc9eba
Merge branch 'main' into merge_into_feat
JackTan25 Aug 29, 2023
c77837c
Merge branch 'main' into merge_into_feat
JackTan25 Aug 29, 2023
a1980d9
Merge branch 'main' into merge_into_feat
JackTan25 Aug 29, 2023
1 change: 1 addition & 0 deletions src/common/exception/src/exception_code.rs
@@ -185,6 +185,7 @@ build_exceptions! {
    BackgroundJobAlreadyExists(1501),
    UnknownBackgroundJob(1502),

    InValidRowIdIndex(1503),
    // Index related errors.
    UnsupportedIndex(1601),
    RefreshIndexError(1602),
37 changes: 37 additions & 0 deletions src/common/storage/src/common_metrics/merge_into.rs
@@ -0,0 +1,37 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use metrics::increment_gauge;

macro_rules! key {
    ($key: literal) => {
        concat!("query_", $key)
    };
}

pub fn metrics_inc_merge_into_replace_blocks_counter(c: u32) {
    increment_gauge!(key!("merge_into_replace_blocks_counter"), c as f64);
}

pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) {
    increment_gauge!(key!("merge_into_append_blocks_counter"), c as f64);
}

pub fn metrics_inc_merge_into_matched_rows(c: u32) {
    increment_gauge!(key!("merge_into_matched_rows"), c as f64);
}

pub fn metrics_inc_merge_into_unmatched_rows(c: u32) {
    increment_gauge!(key!("merge_into_unmatched_rows"), c as f64);
}
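As a reading aid for the metrics file above, here is a minimal sketch of how the `key!` macro builds metric names at compile time. The macro body is copied from the diff; the helper `merge_into_metric_names` is hypothetical and not part of the PR, and the sketch deliberately leaves out the `metrics` crate so it stands alone.

```rust
// Same shape as the `key!` macro in the diff: prefix a literal metric name
// with "query_" at compile time via `concat!`.
macro_rules! key {
    ($key: literal) => {
        concat!("query_", $key)
    };
}

/// Hypothetical helper (not in the PR): lists the four metric names
/// that the functions in merge_into.rs would register.
pub fn merge_into_metric_names() -> [&'static str; 4] {
    [
        key!("merge_into_replace_blocks_counter"),
        key!("merge_into_append_blocks_counter"),
        key!("merge_into_matched_rows"),
        key!("merge_into_unmatched_rows"),
    ]
}
```

Because `concat!` only accepts literals, the prefixed name is a `&'static str` with no runtime allocation.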
1 change: 1 addition & 0 deletions src/common/storage/src/common_metrics/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

pub mod copy;
pub mod merge_into;
mod storage_metrics;

pub use storage_metrics::StorageMetrics;
262 changes: 262 additions & 0 deletions src/query/ast/src/ast/statements/merge_into.rs
@@ -0,0 +1,262 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

use common_exception::ErrorCode;
use common_exception::Result;

use super::Hint;
use crate::ast::write_comma_separated_list;
use crate::ast::write_period_separated_list;
use crate::ast::Expr;
use crate::ast::Identifier;
use crate::ast::Query;
use crate::ast::TableAlias;
use crate::ast::TableReference;

#[derive(Debug, Clone, PartialEq)]
pub struct MergeUpdateExpr {
    pub catalog: Option<Identifier>,
    pub table: Option<Identifier>,
    pub name: Identifier,
    pub expr: Expr,
}

impl Display for MergeUpdateExpr {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        if self.catalog.is_some() {
            write!(f, "{}.", self.catalog.clone().unwrap())?;
        }

        if self.table.is_some() {
            write!(f, "{}.", self.table.clone().unwrap())?;
        }

        write!(f, "{} = {}", self.name, self.expr)
    }
}

#[derive(Debug, Clone, PartialEq)]
pub enum MatchOperation {
    Update { update_list: Vec<MergeUpdateExpr> },
    Delete,
}

#[derive(Debug, Clone, PartialEq)]
pub struct MatchedClause {
    pub selection: Option<Expr>,
    pub operation: MatchOperation,
}

#[derive(Debug, Clone, PartialEq)]
pub struct InsertOperation {
    pub columns: Option<Vec<Identifier>>,
    pub values: Vec<Expr>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct UnmatchedClause {
    pub selection: Option<Expr>,
    pub insert_operation: InsertOperation,
}

#[derive(Debug, Clone, PartialEq)]
pub enum MergeOption {
    Match(MatchedClause),
    Unmatch(UnmatchedClause),
}

#[derive(Debug, Clone, PartialEq)]
pub struct MergeIntoStmt {
    pub hints: Option<Hint>,
    pub catalog: Option<Identifier>,
    pub database: Option<Identifier>,
    pub table_ident: Identifier,
    pub source: MergeSource,
    // alias_target belongs to the target table
    pub alias_target: Option<TableAlias>,
    pub join_expr: Expr,
    pub merge_options: Vec<MergeOption>,
}

impl Display for MergeIntoStmt {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "MERGE INTO ")?;
        write_period_separated_list(
            f,
            self.catalog
                .iter()
                .chain(&self.database)
                .chain(Some(&self.table_ident)),
        )?;

        write!(f, " USING {} ON {}", self.source, self.join_expr)?;

        for clause in &self.merge_options {
            match clause {
                MergeOption::Match(match_clause) => {
                    write!(f, " WHEN MATCHED ")?;
                    if let Some(e) = &match_clause.selection {
                        write!(f, " AND {} ", e)?;
                    }
                    write!(f, " THEN ")?;

                    match &match_clause.operation {
                        MatchOperation::Update { update_list } => {
                            write!(f, " UPDATE SET ")?;
                            write_comma_separated_list(f, update_list)?;
                        }
                        MatchOperation::Delete => {
                            write!(f, " DELETE ")?;
                        }
                    }
                }
                MergeOption::Unmatch(unmatch_clause) => {
                    write!(f, " WHEN NOT MATCHED ")?;
                    if let Some(e) = &unmatch_clause.selection {
                        write!(f, " AND {} ", e)?;
                    }
                    write!(f, " THEN INSERT ")?;
                    if let Some(columns) = &unmatch_clause.insert_operation.columns {
                        if !columns.is_empty() {
                            write!(f, " (")?;
                            write_comma_separated_list(f, columns)?;
                            write!(f, ")")?;
                        }
                    }
                    write!(f, "VALUES")?;
                    for (i, value) in unmatch_clause.insert_operation.values.iter().enumerate() {
                        if i > 0 {
                            write!(f, ", ")?;
                        }
                        write!(f, "(")?;
                        write_comma_separated_list(f, vec![value])?;
                        write!(f, ")")?;
                    }
                }
            }
        }
        Ok(())
    }
}

impl MergeIntoStmt {
    pub fn split_clauses(&self) -> (Vec<MatchedClause>, Vec<UnmatchedClause>) {
        let mut match_clauses = Vec::with_capacity(self.merge_options.len());
        let mut unmatch_clauses = Vec::with_capacity(self.merge_options.len());
        for merge_operation in &self.merge_options {
            match merge_operation {
                MergeOption::Match(match_clause) => match_clauses.push(match_clause.clone()),
                MergeOption::Unmatch(unmatch_clause) => {
                    unmatch_clauses.push(unmatch_clause.clone())
                }
            }
        }
        (match_clauses, unmatch_clauses)
    }

    pub fn check_multi_match_clauses_semantic(clauses: &Vec<MatchedClause>) -> Result<()> {
        // check match_clauses
        if clauses.len() > 1 {
            for (idx, clause) in clauses.iter().enumerate() {
                if clause.selection.is_none() && idx < clauses.len() - 1 {
                    return Err(ErrorCode::SemanticError(
                        "when there are multi matched clauses, we must have a condition for every one except the last one".to_string(),
                    ));
                }
            }
        }
        Ok(())
    }

    pub fn check_multi_unmatch_clauses_semantic(clauses: &Vec<UnmatchedClause>) -> Result<()> {
        // check unmatch_clauses
        if clauses.len() > 1 {
            for (idx, clause) in clauses.iter().enumerate() {
                if clause.selection.is_none() && idx < clauses.len() - 1 {
                    return Err(ErrorCode::SemanticError(
                        "when there are multi unmatched clauses, we must have a condition for every one except the last one".to_string(),
                    ));
                }
            }
        }
        Ok(())
    }
}

#[derive(Debug, Clone, PartialEq)]
pub enum MergeSource {
    StreamingV2 {
        settings: BTreeMap<String, String>,
        on_error_mode: Option<String>,
        start: usize,
    },

    Select {
        query: Box<Query>,
    },
}

#[derive(Debug, Clone, PartialEq)]
pub struct StreamingSource {
    settings: BTreeMap<String, String>,
    on_error_mode: Option<String>,
    start: usize,
}

impl MergeSource {
    pub fn transform_table_reference(&self) -> TableReference {
        match self {
            Self::StreamingV2 {
                settings: _,
                on_error_mode: _,
                start: _,
            } => unimplemented!(),

            Self::Select { query } => TableReference::Subquery {
                span: None,
                subquery: query.clone(),
                alias: None,
            },
        }
    }
}

impl Display for MergeSource {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            MergeSource::StreamingV2 {
                settings,
                on_error_mode,
                start: _,
            } => {
                write!(f, " FILE_FORMAT = (")?;
                for (k, v) in settings.iter() {
                    write!(f, " {} = '{}'", k, v)?;
                }
                write!(f, " )")?;
                write!(
                    f,
                    " ON_ERROR = '{}'",
                    on_error_mode.as_ref().unwrap_or(&"Abort".to_string())
                )
            }

            MergeSource::Select { query } => write!(f, "{query}"),
        }
    }
}
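The two `check_multi_*_clauses_semantic` functions in the file above enforce the same ordering rule: when several clauses of one kind are present, every clause except the last must carry a condition. The following is a minimal standalone sketch of that rule. `Clause` and `check_multi_clauses` are hypothetical stand-ins (the condition is modeled as a plain `String` rather than an `Expr`, and the error as a `String` rather than `ErrorCode`), so this illustrates the check and is not the PR's code.

```rust
/// Hypothetical stand-in for `MatchedClause` / `UnmatchedClause`:
/// only the optional condition matters for this check.
#[derive(Debug, Clone, PartialEq)]
pub struct Clause {
    pub selection: Option<String>,
}

/// Returns an error if any clause other than the last one lacks a condition.
/// With zero or one clause there is nothing to check.
pub fn check_multi_clauses(clauses: &[Clause]) -> Result<(), String> {
    // Index of the last clause; `saturating_sub` keeps this at 0 for an empty slice.
    let last = clauses.len().saturating_sub(1);
    for (idx, clause) in clauses.iter().enumerate() {
        if idx < last && clause.selection.is_none() {
            return Err(format!(
                "clause {idx} needs a condition because it is not the last one"
            ));
        }
    }
    Ok(())
}
```

An unconditional clause that is not last would make every clause after it unreachable, which is presumably why the check rejects it. Taking a slice and a single generic helper is one way the two near-identical functions could share the rule.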
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/mod.rs
@@ -23,6 +23,7 @@ mod hint;
mod index;
mod insert;
mod kill;
mod merge_into;
mod network_policy;
mod presign;
mod replace;
@@ -48,6 +49,7 @@ pub use hint::*;
pub use index::*;
pub use insert::*;
pub use kill::*;
pub use merge_into::*;
pub use network_policy::*;
pub use presign::*;
pub use replace::*;
4 changes: 3 additions & 1 deletion src/query/ast/src/ast/statements/statement.rs
@@ -19,6 +19,7 @@ use common_meta_app::principal::FileFormatOptionsAst;
use common_meta_app::principal::PrincipalIdentity;
use common_meta_app::principal::UserIdentity;

use super::merge_into::MergeIntoStmt;
use super::*;
use crate::ast::write_comma_separated_list;
use crate::ast::Expr;
@@ -76,7 +77,7 @@ pub enum Statement {

    Insert(InsertStmt),
    Replace(ReplaceStmt),

    MergeInto(MergeIntoStmt),
    Delete {
        hints: Option<Hint>,
        table_reference: TableReference,
@@ -298,6 +299,7 @@ impl Display for Statement {
            Statement::Query(query) => write!(f, "{query}")?,
            Statement::Insert(insert) => write!(f, "{insert}")?,
            Statement::Replace(replace) => write!(f, "{replace}")?,
            Statement::MergeInto(merge_into) => write!(f, "{merge_into}")?,
            Statement::Delete {
                table_reference,
                selection,