Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support distributed merge_into #13151

Merged
merged 72 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
1f13597
add settings
JackTan25 Oct 9, 2023
879ee6e
right join for merge into first
JackTan25 Oct 10, 2023
92538e1
add distribution optimization for merge into join
JackTan25 Oct 13, 2023
19a1999
split merge into plan
JackTan25 Oct 16, 2023
60f79f7
fix update identify error
JackTan25 Oct 16, 2023
662af10
finish distibuted baisc codes
JackTan25 Oct 16, 2023
76e1352
fix typo
JackTan25 Oct 16, 2023
e4f7450
uniform row_kind and mutation_log
JackTan25 Oct 17, 2023
cd92873
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 17, 2023
93a1cdf
fix MixRowKindAndLog serialize and deserialize
JackTan25 Oct 18, 2023
0a070b1
add tests
JackTan25 Oct 18, 2023
35ff1f5
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 18, 2023
f258ac0
fix check
JackTan25 Oct 18, 2023
fe15639
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 18, 2023
f292341
fix check
JackTan25 Oct 18, 2023
137b52f
fix check
JackTan25 Oct 18, 2023
f39bfd1
fix test
JackTan25 Oct 18, 2023
9b114f4
fix test
JackTan25 Oct 18, 2023
d177d15
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 18, 2023
7bcf760
fix
JackTan25 Oct 18, 2023
636825f
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 18, 2023
6bb03e6
remove memory size limit
JackTan25 Oct 19, 2023
df8acbf
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 19, 2023
b262af2
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 19, 2023
c08cf13
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 21, 2023
1c64166
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 23, 2023
ceae57f
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 23, 2023
0d0eba9
optmizie merge source and add row_number processor
JackTan25 Oct 23, 2023
74c1ec1
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 23, 2023
ab2469d
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 23, 2023
db00e4e
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 24, 2023
e95a280
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 24, 2023
f1f2f94
fix delete bug
JackTan25 Oct 24, 2023
bd3be35
add row number plan
JackTan25 Oct 24, 2023
a2fec57
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 24, 2023
b60479e
fix row number
JackTan25 Oct 25, 2023
c79a04b
refactor merge into pipeline
JackTan25 Oct 25, 2023
551d854
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 25, 2023
126da66
split row_number and log, try to get hash table source data
JackTan25 Oct 26, 2023
4112417
finish distributed codes, need to get data from hashtable
JackTan25 Oct 26, 2023
2450df7
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 26, 2023
622a13a
finish not macthed append data
JackTan25 Oct 27, 2023
8a56035
fix conflict
JackTan25 Oct 27, 2023
a15df12
fix filter
JackTan25 Oct 27, 2023
18d25d0
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 27, 2023
672218e
fix filter
JackTan25 Oct 27, 2023
03ac2a8
Merge branch 'main' of https://github.com/datafuselabs/databend into …
JackTan25 Oct 27, 2023
2ba7b7d
fix distributed bugs,many bugs, need to support insert
JackTan25 Oct 28, 2023
9eebe16
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 28, 2023
4e168db
fix bugs
JackTan25 Oct 29, 2023
73e6210
fix check and clean codes
JackTan25 Oct 29, 2023
6ffdee9
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 29, 2023
1930589
fix check
JackTan25 Oct 29, 2023
f9200b4
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 29, 2023
2236178
add more tests
JackTan25 Oct 29, 2023
8f1739d
fix flaky
JackTan25 Oct 29, 2023
68aa9be
fix test result
JackTan25 Oct 29, 2023
2da2a21
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 30, 2023
d7fc27b
fix order
JackTan25 Oct 30, 2023
4afa8a2
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 30, 2023
d1150e6
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 30, 2023
edffda8
clean codes
JackTan25 Oct 30, 2023
7796bdf
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 30, 2023
deb4d1f
remove local builder branch
JackTan25 Oct 30, 2023
c646f3b
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 30, 2023
9fcf4b4
refactor logic
JackTan25 Oct 30, 2023
30f5b3b
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 30, 2023
fa3e09e
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 31, 2023
cb08da0
clean codes
JackTan25 Oct 31, 2023
269c0de
Merge branch 'distributed_merge_into' of https://github.com/JackTan25…
JackTan25 Oct 31, 2023
60fcaee
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 31, 2023
4defaec
Merge branch 'main' into distributed_merge_into
JackTan25 Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/query/ast/src/ast/statements/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,13 @@ use crate::ast::TableReference;

#[derive(Debug, Clone, PartialEq)]
pub struct MergeUpdateExpr {
pub catalog: Option<Identifier>,
pub table: Option<Identifier>,
pub name: Identifier,
pub expr: Expr,
}

impl Display for MergeUpdateExpr {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
if self.catalog.is_some() {
write!(f, "{}.", self.catalog.clone().unwrap())?;
}

if self.table.is_some() {
write!(f, "{}.", self.table.clone().unwrap())?;
}
Expand Down
9 changes: 2 additions & 7 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2850,12 +2850,7 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {

pub fn merge_update_expr(i: Input) -> IResult<MergeUpdateExpr> {
map(
rule! { ( #dot_separated_idents_1_to_3 ~ "=" ~ ^#expr ) },
|((catalog, table, name), _, expr)| MergeUpdateExpr {
catalog,
table,
name,
expr,
},
rule! { ( #dot_separated_idents_1_to_2 ~ "=" ~ ^#expr ) },
|((table, name), _, expr)| MergeUpdateExpr { table, name, expr },
)(i)
}
1 change: 1 addition & 0 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub const SEGMENT_NAME_COLUMN_ID: u32 = u32::MAX - 2;
pub const SNAPSHOT_NAME_COLUMN_ID: u32 = u32::MAX - 3;

pub const ROW_ID_COL_NAME: &str = "_row_id";
pub const ROW_NUMBER_COL_NAME: &str = "_row_number";
pub const SNAPSHOT_NAME_COL_NAME: &str = "_snapshot_name";
pub const SEGMENT_NAME_COL_NAME: &str = "_segment_name";
pub const BLOCK_NAME_COL_NAME: &str = "_block_name";
Expand Down
6 changes: 6 additions & 0 deletions src/query/pipeline/core/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,10 @@ impl TransformPipeBuilder {
}
self.items = items
}

pub fn add_items(&mut self, items: Vec<PipeItem>) {
for item in items {
self.items.push(item)
}
}
}
6 changes: 5 additions & 1 deletion src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,14 +821,18 @@ impl FragmentCoordinator {
self.initialized = true;

let pipeline_ctx = QueryContext::create_from(ctx);

let pipeline_builder = PipelineBuilder::create(
pipeline_ctx.get_function_context()?,
pipeline_ctx.get_settings(),
pipeline_ctx,
enable_profiling,
SharedProcessorProfiles::default(),
);
self.pipeline_build_res = Some(pipeline_builder.finalize(&self.physical_plan)?);

let res = pipeline_builder.finalize(&self.physical_plan)?;

self.pipeline_build_res = Some(res);
}

Ok(())
Expand Down
131 changes: 101 additions & 30 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ConstantFolder;
use common_expression::DataBlock;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_expression::FieldIndex;
use common_expression::RemoteExpr;
use common_expression::SendableDataBlockStream;
use common_expression::ROW_NUMBER_COL_NAME;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::TableInfo;
use common_sql::executor::CommitSink;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind::Merge;
use common_sql::executor::MergeInto;
use common_sql::executor::MergeIntoAppendNotMatched;
use common_sql::executor::MergeIntoSource;
use common_sql::executor::MutationKind;
use common_sql::executor::PhysicalPlan;
Expand Down Expand Up @@ -163,11 +168,19 @@ impl MergeIntoInterpreter {
} = &self.plan;

// check mutability
let table = self.ctx.get_table(catalog, database, table_name).await?;
table.check_mutable()?;
let check_table = self.ctx.get_table(catalog, database, table_name).await?;
check_table.check_mutable()?;

let table_name = table_name.clone();
let input = input.clone();
let (exchange, input) = if let RelOperator::Exchange(exchange) = input.plan() {
(Some(exchange), Box::new(input.child(0)?.clone()))
} else {
(None, input)
};

let optimized_input = self
.build_static_filter(input, meta_data, self.ctx.clone())
.build_static_filter(&input, meta_data, self.ctx.clone())
.await?;
let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);

Expand All @@ -192,6 +205,7 @@ impl MergeIntoInterpreter {
};

let mut found_row_id = false;
let mut row_number_idx = None;
for (idx, data_field) in join_output_schema.fields().iter().enumerate() {
if *data_field.name() == row_id_idx.to_string() {
row_id_idx = idx;
Expand All @@ -200,13 +214,24 @@ impl MergeIntoInterpreter {
}
}

if exchange.is_some() {
row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
}

// we can't get row_id_idx, throw an exception
if !found_row_id {
return Err(ErrorCode::InvalidRowIdIndex(
"can't get internal row_id_idx when running merge into",
));
}

if exchange.is_some() && row_number_idx.is_none() {
return Err(ErrorCode::InvalidRowIdIndex(
"can't get internal row_number_idx when running merge into",
));
}

let table = self.ctx.get_table(catalog, database, &table_name).await?;
let fuse_table =
table
.as_any()
Expand Down Expand Up @@ -340,27 +365,62 @@ impl MergeIntoInterpreter {
.insert(*field_index, join_output_schema.index_of(value).unwrap());
}

// recv datablocks from matched upstream and unmatched upstream
// transform and append dat
let merge_into = PhysicalPlan::MergeInto(Box::new(MergeInto {
input: Box::new(merge_into_source),
table_info: table_info.clone(),
catalog_info: catalog_.info(),
unmatched,
matched,
field_index_of_input_schema,
row_id_idx,
segments: base_snapshot
.segments
.clone()
.into_iter()
.enumerate()
.collect(),
}));
let segments: Vec<_> = base_snapshot
.segments
.clone()
.into_iter()
.enumerate()
.collect();

let commit_input = if exchange.is_none() {
// recv datablocks from matched upstream and unmatched upstream
// transform and append dat
PhysicalPlan::MergeInto(Box::new(MergeInto {
input: Box::new(merge_into_source),
table_info: table_info.clone(),
catalog_info: catalog_.info(),
unmatched,
matched,
field_index_of_input_schema,
row_id_idx,
segments,
distributed: false,
output_schema: DataSchemaRef::default(),
}))
} else {
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
input: Box::new(merge_into_source.clone()),
table_info: table_info.clone(),
catalog_info: catalog_.info(),
unmatched: unmatched.clone(),
matched,
field_index_of_input_schema,
row_id_idx,
segments,
distributed: true,
output_schema: DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_number_idx.unwrap()].clone(),
])),
}));

PhysicalPlan::MergeIntoAppendNotMatched(Box::new(MergeIntoAppendNotMatched {
input: Box::new(PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(merge_append),
kind: Merge,
keys: vec![],
ignore_exchange: false,
})),
table_info: table_info.clone(),
catalog_info: catalog_.info(),
unmatched: unmatched.clone(),
input_schema: merge_into_source.output_schema()?,
}))
};

// build mutation_aggregate
let physical_plan = PhysicalPlan::CommitSink(Box::new(CommitSink {
input: Box::new(merge_into),
input: Box::new(commit_input),
snapshot: base_snapshot,
table_info: table_info.clone(),
catalog_info: catalog_.info(),
Expand Down Expand Up @@ -388,16 +448,17 @@ impl MergeIntoInterpreter {
// EvalScalar(source_join_side_expr)
// \
// SourcePlan
let source_plan = join.child(0)?;

let source_plan = join.child(1)?;
let join_op = match join.plan() {
RelOperator::Join(j) => j,
_ => unreachable!(),
};
if join_op.left_conditions.len() != 1 || join_op.right_conditions.len() != 1 {
return Ok(Box::new(join.clone()));
}
let source_side_expr = &join_op.left_conditions[0];
let target_side_expr = &join_op.right_conditions[0];
let source_side_expr = &join_op.right_conditions[0];
let target_side_expr = &join_op.left_conditions[0];

// eval source side join expr
let source_side_join_expr_index = metadata.write().add_derived_column(
Expand All @@ -421,10 +482,19 @@ impl MergeIntoInterpreter {
index: source_side_join_expr_index,
}],
};
let eval_target_side_condition_sexpr = SExpr::create_unary(
Arc::new(eval_source_side_join_expr_op.into()),
Arc::new(source_plan.clone()),
);
let eval_target_side_condition_sexpr = if let RelOperator::Exchange(_) = source_plan.plan()
{
// there is another row_number operator here
SExpr::create_unary(
Arc::new(eval_source_side_join_expr_op.into()),
Arc::new(source_plan.child(0)?.child(0)?.clone()),
)
} else {
SExpr::create_unary(
Arc::new(eval_source_side_join_expr_op.into()),
Arc::new(source_plan.clone()),
)
};

// eval min/max of source side join expr
let min_display_name = format!("min({:?})", source_side_expr);
Expand Down Expand Up @@ -545,10 +615,10 @@ impl MergeIntoInterpreter {
});

let filters = vec![gte_min, lte_max];
let mut target_plan = join.child(1)?.clone();
let mut target_plan = join.child(0)?.clone();
Self::push_down_filters(&mut target_plan, &filters)?;
let new_sexpr =
join.replace_children(vec![Arc::new(source_plan.clone()), Arc::new(target_plan)]);
join.replace_children(vec![Arc::new(target_plan), Arc::new(source_plan.clone())]);
Ok(Box::new(new_sexpr))
}

Expand Down Expand Up @@ -585,6 +655,7 @@ impl MergeIntoInterpreter {
RelOperator::Lambda(_) => {}
RelOperator::ConstantTableScan(_) => {}
RelOperator::Pattern(_) => {}
RelOperator::AddRowNumber(_) => {}
}
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/pipelines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ pub use pipe::SourcePipeBuilder;
pub use pipe::TransformPipeBuilder;
pub use pipeline::Pipeline;
pub use pipeline_build_res::PipelineBuildResult;
pub use pipeline_build_res::PipelineBuilderData;
pub use pipeline_builder::PipelineBuilder;
pub use pipeline_builder::ValueSource;
18 changes: 18 additions & 0 deletions src/query/service/src/pipelines/pipeline_build_res.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@ use std::sync::Arc;

use common_exception::Result;
use common_expression::DataBlock;
use common_expression::DataField;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::Pipeline;
use common_pipeline_core::SourcePipeBuilder;
use common_pipeline_sources::OneBlockSource;
use common_profile::SharedProcessorProfiles;

use super::processors::transforms::hash_join::HashJoinBuildState;
use crate::api::DefaultExchangeInjector;
use crate::api::ExchangeInjector;

#[derive(Clone)]
pub struct PipelineBuilderData {
pub input_join_state: Option<Arc<HashJoinBuildState>>,
pub input_probe_schema: Option<Vec<DataField>>,
}

pub struct PipelineBuildResult {
pub main_pipeline: Pipeline,
// Containing some sub queries pipelines, must be complete pipeline
Expand All @@ -35,6 +43,8 @@ pub struct PipelineBuildResult {
pub prof_span_set: SharedProcessorProfiles,

pub exchange_injector: Arc<dyn ExchangeInjector>,
/// for local fragment data sharing
pub builder_data: PipelineBuilderData,
}

impl PipelineBuildResult {
Expand All @@ -44,6 +54,10 @@ impl PipelineBuildResult {
sources_pipelines: vec![],
prof_span_set: SharedProcessorProfiles::default(),
exchange_injector: DefaultExchangeInjector::create(),
builder_data: PipelineBuilderData {
input_join_state: None,
input_probe_schema: None,
},
}
}

Expand All @@ -63,6 +77,10 @@ impl PipelineBuildResult {
sources_pipelines: vec![],
prof_span_set: SharedProcessorProfiles::default(),
exchange_injector: DefaultExchangeInjector::create(),
builder_data: PipelineBuilderData {
input_join_state: None,
input_probe_schema: None,
},
})
}

Expand Down
Loading
Loading