feat: support distributed merge_into (#13151)

* add settings

* right join for merge into first

* add distribution optimization for merge into join

* split merge into plan

* fix update identify error

* finish basic distributed code

* fix typo

* unify row_kind and mutation_log

* fix MixRowKindAndLog serialize and deserialize

* add tests

* fix check

* fix check

* fix check

* fix test

* fix test

* fix

* remove memory size limit

* optimize merge source and add row_number processor

* fix delete bug

* add row number plan

* fix row number

* refactor merge into pipeline

* split row_number and log, try to get hash table source data

* finish distributed codes, need to get data from hashtable

* finish not-matched append data

* fix filter

* fix filter

* fix many distributed bugs; need to support insert

* fix bugs

* fix check and clean codes

* fix check

* add more tests

* fix flaky

* fix test result

* fix order

* clean codes

* remove local builder branch

* refactor logic

* clean codes

JackTan25 authored Oct 31, 2023
1 parent 1bd8a7d commit 3c22f3f
Showing 60 changed files with 2,042 additions and 144 deletions.
5 changes: 0 additions & 5 deletions src/query/ast/src/ast/statements/merge_into.rs
@@ -31,18 +31,13 @@ use crate::ast::TableReference;
 
 #[derive(Debug, Clone, PartialEq)]
 pub struct MergeUpdateExpr {
-    pub catalog: Option<Identifier>,
     pub table: Option<Identifier>,
     pub name: Identifier,
     pub expr: Expr,
 }
 
 impl Display for MergeUpdateExpr {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        if self.catalog.is_some() {
-            write!(f, "{}.", self.catalog.clone().unwrap())?;
-        }
-
         if self.table.is_some() {
             write!(f, "{}.", self.table.clone().unwrap())?;
         }
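Dropping `catalog` means an update target in a MERGE clause now carries at most a table qualifier. A minimal standalone sketch of the trimmed-down Display logic, using hypothetical stand-in types rather than databend's AST:

    use std::fmt::{self, Display, Formatter};

    // Stand-in types, not databend's AST: the point is the trimmed Display
    // logic, which now prints at most a `table.` qualifier before the column.
    struct UpdateExpr {
        table: Option<String>,
        name: String,
        expr: String,
    }

    impl Display for UpdateExpr {
        fn fmt(&self, f: &mut Formatter) -> fmt::Result {
            if let Some(table) = &self.table {
                write!(f, "{}.", table)?;
            }
            write!(f, "{} = {}", self.name, self.expr)
        }
    }

    fn main() {
        let e = UpdateExpr {
            table: Some("t1".into()),
            name: "c".into(),
            expr: "t2.c + 1".into(),
        };
        assert_eq!(e.to_string(), "t1.c = t2.c + 1");
    }
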
9 changes: 2 additions & 7 deletions src/query/ast/src/parser/statement.rs
@@ -2850,12 +2850,7 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
 
 pub fn merge_update_expr(i: Input) -> IResult<MergeUpdateExpr> {
     map(
-        rule! { ( #dot_separated_idents_1_to_3 ~ "=" ~ ^#expr ) },
-        |((catalog, table, name), _, expr)| MergeUpdateExpr {
-            catalog,
-            table,
-            name,
-            expr,
-        },
+        rule! { ( #dot_separated_idents_1_to_2 ~ "=" ~ ^#expr ) },
+        |((table, name), _, expr)| MergeUpdateExpr { table, name, expr },
     )(i)
 }
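The switch from `dot_separated_idents_1_to_3` to `dot_separated_idents_1_to_2` mirrors the struct change above: `t.c = expr` still parses, `catalog.t.c = expr` no longer does. A toy sketch of the accepted shapes (an illustration, not the actual nom rule):

    // Toy sketch of the 1-to-2 dotted-identifier rule (an illustration, not
    // the actual nom parser): "c" and "t.c" are accepted, "cat.t.c" is not.
    fn parse_update_target(s: &str) -> Option<(Option<&str>, &str)> {
        let parts: Vec<&str> = s.split('.').collect();
        match parts.as_slice() {
            [name] => Some((None, *name)),
            [table, name] => Some((Some(*table), *name)),
            _ => None, // catalog-qualified targets no longer parse
        }
    }

    fn main() {
        assert_eq!(parse_update_target("c"), Some((None, "c")));
        assert_eq!(parse_update_target("t.c"), Some((Some("t"), "c")));
        assert_eq!(parse_update_target("cat.t.c"), None);
    }
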
1 change: 1 addition & 0 deletions src/query/expression/src/schema.rs
@@ -51,6 +51,7 @@ pub const SEGMENT_NAME_COLUMN_ID: u32 = u32::MAX - 2;
 pub const SNAPSHOT_NAME_COLUMN_ID: u32 = u32::MAX - 3;
 
 pub const ROW_ID_COL_NAME: &str = "_row_id";
+pub const ROW_NUMBER_COL_NAME: &str = "_row_number";
 pub const SNAPSHOT_NAME_COL_NAME: &str = "_snapshot_name";
 pub const SEGMENT_NAME_COL_NAME: &str = "_segment_name";
 pub const BLOCK_NAME_COL_NAME: &str = "_block_name";
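`_row_number` joins the family of internal column names: the distributed merge-into path tags each source row with a row number so unmatched rows can be reconciled after the shuffle. The sketch below shows one plausible numbering scheme and is an illustration of the concept, not necessarily what databend's row-number processor does:

    // Illustration only: one plausible way to make "_row_number" values
    // globally unique across nodes is to pack a node id into the high bits.
    const ROW_NUMBER_COL_NAME: &str = "_row_number";

    fn assign_row_numbers(node_id: u64, num_rows: u64) -> Vec<u64> {
        (0..num_rows).map(|i| (node_id << 48) | i).collect()
    }

    fn main() {
        println!("{} = {:?}", ROW_NUMBER_COL_NAME, assign_row_numbers(3, 4));
    }
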
6 changes: 6 additions & 0 deletions src/query/pipeline/core/src/pipe.rs
@@ -144,4 +144,10 @@ impl TransformPipeBuilder {
         }
         self.items = items
     }
+
+    pub fn add_items(&mut self, items: Vec<PipeItem>) {
+        for item in items {
+            self.items.push(item)
+        }
+    }
 }
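A small usage sketch of the new bulk append, with a hypothetical `PipeItem` stand-in; behaviorally `add_items` is equivalent to `self.items.extend(items)`:

    #[derive(Debug)]
    struct PipeItem(&'static str); // hypothetical stand-in for the real type

    #[derive(Default)]
    struct TransformPipeBuilder {
        items: Vec<PipeItem>,
    }

    impl TransformPipeBuilder {
        // Same shape as the new method above: push a batch of pipe items.
        fn add_items(&mut self, items: Vec<PipeItem>) {
            for item in items {
                self.items.push(item)
            }
        }
    }

    fn main() {
        let mut builder = TransformPipeBuilder::default();
        builder.add_items(vec![PipeItem("split-matched"), PipeItem("split-row-number")]);
        println!("{:?}", builder.items);
    }
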
6 changes: 5 additions & 1 deletion src/query/service/src/api/rpc/exchange/exchange_manager.rs
@@ -821,14 +821,18 @@ impl FragmentCoordinator {
             self.initialized = true;
 
             let pipeline_ctx = QueryContext::create_from(ctx);
+
             let pipeline_builder = PipelineBuilder::create(
                 pipeline_ctx.get_function_context()?,
                 pipeline_ctx.get_settings(),
                 pipeline_ctx,
                 enable_profiling,
                 SharedProcessorProfiles::default(),
             );
-            self.pipeline_build_res = Some(pipeline_builder.finalize(&self.physical_plan)?);
+
+            let res = pipeline_builder.finalize(&self.physical_plan)?;
+
+            self.pipeline_build_res = Some(res);
         }
 
         Ok(())
131 changes: 101 additions & 30 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -23,14 +23,19 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::ConstantFolder;
 use common_expression::DataBlock;
+use common_expression::DataSchema;
 use common_expression::DataSchemaRef;
 use common_expression::FieldIndex;
 use common_expression::RemoteExpr;
 use common_expression::SendableDataBlockStream;
+use common_expression::ROW_NUMBER_COL_NAME;
 use common_functions::BUILTIN_FUNCTIONS;
 use common_meta_app::schema::TableInfo;
 use common_sql::executor::CommitSink;
+use common_sql::executor::Exchange;
+use common_sql::executor::FragmentKind::Merge;
 use common_sql::executor::MergeInto;
+use common_sql::executor::MergeIntoAppendNotMatched;
 use common_sql::executor::MergeIntoSource;
 use common_sql::executor::MutationKind;
 use common_sql::executor::PhysicalPlan;
@@ -163,11 +168,19 @@ impl MergeIntoInterpreter {
         } = &self.plan;
 
         // check mutability
-        let table = self.ctx.get_table(catalog, database, table_name).await?;
-        table.check_mutable()?;
+        let check_table = self.ctx.get_table(catalog, database, table_name).await?;
+        check_table.check_mutable()?;
 
+        let table_name = table_name.clone();
+        let input = input.clone();
+        let (exchange, input) = if let RelOperator::Exchange(exchange) = input.plan() {
+            (Some(exchange), Box::new(input.child(0)?.clone()))
+        } else {
+            (None, input)
+        };
+
         let optimized_input = self
-            .build_static_filter(input, meta_data, self.ctx.clone())
+            .build_static_filter(&input, meta_data, self.ctx.clone())
             .await?;
         let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);
 
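The hunk above peels a top-level Exchange off the optimized plan and records whether the merge runs distributed. A toy sketch of the same shape (stand-in enum, not databend's `RelOperator`):

    // Toy sketch of the exchange-peeling step: if the optimizer wrapped the
    // join in an Exchange, remember that fact and keep planning the child.
    #[derive(Clone, Debug)]
    enum Plan {
        Exchange(Box<Plan>),
        Join,
    }

    fn peel_exchange(input: Plan) -> (bool, Plan) {
        match input {
            Plan::Exchange(child) => (true, *child),
            other => (false, other),
        }
    }

    fn main() {
        let (distributed, plan) = peel_exchange(Plan::Exchange(Box::new(Plan::Join)));
        println!("distributed = {distributed}, plan = {plan:?}");
    }
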
@@ -192,6 +205,7 @@
         };
 
         let mut found_row_id = false;
+        let mut row_number_idx = None;
         for (idx, data_field) in join_output_schema.fields().iter().enumerate() {
             if *data_field.name() == row_id_idx.to_string() {
                 row_id_idx = idx;
@@ -200,13 +214,24 @@
             }
         }
 
+        if exchange.is_some() {
+            row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
+        }
+
         // we can't get row_id_idx, throw an exception
         if !found_row_id {
             return Err(ErrorCode::InvalidRowIdIndex(
                 "can't get internal row_id_idx when running merge into",
             ));
         }
 
+        if exchange.is_some() && row_number_idx.is_none() {
+            return Err(ErrorCode::InvalidRowIdIndex(
+                "can't get internal row_number_idx when running merge into",
+            ));
+        }
+
+        let table = self.ctx.get_table(catalog, database, &table_name).await?;
         let fuse_table =
             table
                 .as_any()
@@ -340,27 +365,62 @@
                 .insert(*field_index, join_output_schema.index_of(value).unwrap());
         }
 
-        // recv datablocks from matched upstream and unmatched upstream
-        // transform and append dat
-        let merge_into = PhysicalPlan::MergeInto(Box::new(MergeInto {
-            input: Box::new(merge_into_source),
-            table_info: table_info.clone(),
-            catalog_info: catalog_.info(),
-            unmatched,
-            matched,
-            field_index_of_input_schema,
-            row_id_idx,
-            segments: base_snapshot
-                .segments
-                .clone()
-                .into_iter()
-                .enumerate()
-                .collect(),
-        }));
+        let segments: Vec<_> = base_snapshot
+            .segments
+            .clone()
+            .into_iter()
+            .enumerate()
+            .collect();
+
+        let commit_input = if exchange.is_none() {
+            // recv datablocks from matched upstream and unmatched upstream,
+            // transform and append data
+            PhysicalPlan::MergeInto(Box::new(MergeInto {
+                input: Box::new(merge_into_source),
+                table_info: table_info.clone(),
+                catalog_info: catalog_.info(),
+                unmatched,
+                matched,
+                field_index_of_input_schema,
+                row_id_idx,
+                segments,
+                distributed: false,
+                output_schema: DataSchemaRef::default(),
+            }))
+        } else {
+            let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
+                input: Box::new(merge_into_source.clone()),
+                table_info: table_info.clone(),
+                catalog_info: catalog_.info(),
+                unmatched: unmatched.clone(),
+                matched,
+                field_index_of_input_schema,
+                row_id_idx,
+                segments,
+                distributed: true,
+                output_schema: DataSchemaRef::new(DataSchema::new(vec![
+                    join_output_schema.fields[row_number_idx.unwrap()].clone(),
+                ])),
+            }));
+
+            PhysicalPlan::MergeIntoAppendNotMatched(Box::new(MergeIntoAppendNotMatched {
+                input: Box::new(PhysicalPlan::Exchange(Exchange {
+                    plan_id: 0,
+                    input: Box::new(merge_append),
+                    kind: Merge,
+                    keys: vec![],
+                    ignore_exchange: false,
+                })),
+                table_info: table_info.clone(),
+                catalog_info: catalog_.info(),
+                unmatched: unmatched.clone(),
+                input_schema: merge_into_source.output_schema()?,
+            }))
+        };
 
         // build mutation_aggregate
         let physical_plan = PhysicalPlan::CommitSink(Box::new(CommitSink {
-            input: Box::new(merge_into),
+            input: Box::new(commit_input),
             snapshot: base_snapshot,
             table_info: table_info.clone(),
             catalog_info: catalog_.info(),
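Read bottom-up, the two plans assembled above have roughly these shapes (a sketch derived from the code, not real EXPLAIN output):

    local:        CommitSink <- MergeInto(distributed: false) <- MergeIntoSource

    distributed:  CommitSink <- MergeIntoAppendNotMatched
                             <- Exchange(kind: Merge)
                             <- MergeInto(distributed: true, output: _row_number)
                             <- MergeIntoSource
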
@@ -388,16 +448,17 @@ impl MergeIntoInterpreter {
         //      EvalScalar(source_join_side_expr)
         //          \
         //           SourcePlan
-        let source_plan = join.child(0)?;
+
+        let source_plan = join.child(1)?;
         let join_op = match join.plan() {
             RelOperator::Join(j) => j,
             _ => unreachable!(),
         };
         if join_op.left_conditions.len() != 1 || join_op.right_conditions.len() != 1 {
             return Ok(Box::new(join.clone()));
         }
-        let source_side_expr = &join_op.left_conditions[0];
-        let target_side_expr = &join_op.right_conditions[0];
+        let source_side_expr = &join_op.right_conditions[0];
+        let target_side_expr = &join_op.left_conditions[0];
 
         // eval source side join expr
         let source_side_join_expr_index = metadata.write().add_derived_column(
@@ -421,10 +482,19 @@
                 index: source_side_join_expr_index,
             }],
         };
-        let eval_target_side_condition_sexpr = SExpr::create_unary(
-            Arc::new(eval_source_side_join_expr_op.into()),
-            Arc::new(source_plan.clone()),
-        );
+        let eval_target_side_condition_sexpr = if let RelOperator::Exchange(_) = source_plan.plan()
+        {
+            // there is another row_number operator here
+            SExpr::create_unary(
+                Arc::new(eval_source_side_join_expr_op.into()),
+                Arc::new(source_plan.child(0)?.child(0)?.clone()),
+            )
+        } else {
+            SExpr::create_unary(
+                Arc::new(eval_source_side_join_expr_op.into()),
+                Arc::new(source_plan.clone()),
+            )
+        };
 
         // eval min/max of source side join expr
         let min_display_name = format!("min({:?})", source_side_expr);
@@ -545,10 +615,10 @@ impl MergeIntoInterpreter {
         });
 
         let filters = vec![gte_min, lte_max];
-        let mut target_plan = join.child(1)?.clone();
+        let mut target_plan = join.child(0)?.clone();
         Self::push_down_filters(&mut target_plan, &filters)?;
         let new_sexpr =
-            join.replace_children(vec![Arc::new(source_plan.clone()), Arc::new(target_plan)]);
+            join.replace_children(vec![Arc::new(target_plan), Arc::new(source_plan.clone())]);
         Ok(Box::new(new_sexpr))
     }
 
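The `gte_min`/`lte_max` filters built here implement min/max pruning: only target rows whose join key lies within the source keys' [min, max] range can possibly match, so that range is pushed down to the target scan. A hedged standalone sketch of the idea:

    // Standalone sketch of the static-filter idea: compute the source join
    // key's min/max, then keep only target rows whose key falls in that range.
    fn static_filter(source_keys: &[i64], target_keys: &[i64]) -> Vec<i64> {
        let (min, max) = match (source_keys.iter().min(), source_keys.iter().max()) {
            (Some(min), Some(max)) => (*min, *max),
            _ => return vec![], // empty source side: nothing can match
        };
        target_keys
            .iter()
            .copied()
            .filter(|k| (min..=max).contains(k)) // gte_min AND lte_max
            .collect()
    }

    fn main() {
        assert_eq!(static_filter(&[7, 3, 5], &[1, 4, 6, 9]), vec![4, 6]);
    }
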
@@ -585,6 +655,7 @@ impl MergeIntoInterpreter {
             RelOperator::Lambda(_) => {}
             RelOperator::ConstantTableScan(_) => {}
             RelOperator::Pattern(_) => {}
+            RelOperator::AddRowNumber(_) => {}
         }
         Ok(())
     }
1 change: 1 addition & 0 deletions src/query/service/src/pipelines/mod.rs
@@ -27,5 +27,6 @@ pub use pipe::SourcePipeBuilder;
 pub use pipe::TransformPipeBuilder;
 pub use pipeline::Pipeline;
 pub use pipeline_build_res::PipelineBuildResult;
+pub use pipeline_build_res::PipelineBuilderData;
 pub use pipeline_builder::PipelineBuilder;
 pub use pipeline_builder::ValueSource;
18 changes: 18 additions & 0 deletions src/query/service/src/pipelines/pipeline_build_res.rs
@@ -16,15 +16,23 @@ use std::sync::Arc;
 
 use common_exception::Result;
 use common_expression::DataBlock;
+use common_expression::DataField;
 use common_pipeline_core::processors::port::OutputPort;
 use common_pipeline_core::Pipeline;
 use common_pipeline_core::SourcePipeBuilder;
 use common_pipeline_sources::OneBlockSource;
 use common_profile::SharedProcessorProfiles;
 
+use super::processors::transforms::hash_join::HashJoinBuildState;
 use crate::api::DefaultExchangeInjector;
 use crate::api::ExchangeInjector;
 
+#[derive(Clone)]
+pub struct PipelineBuilderData {
+    pub input_join_state: Option<Arc<HashJoinBuildState>>,
+    pub input_probe_schema: Option<Vec<DataField>>,
+}
+
 pub struct PipelineBuildResult {
     pub main_pipeline: Pipeline,
     // Containing some sub queries pipelines, must be complete pipeline
@@ -35,6 +43,8 @@ pub struct PipelineBuildResult {
     pub prof_span_set: SharedProcessorProfiles,
 
     pub exchange_injector: Arc<dyn ExchangeInjector>,
+    /// for local fragment data sharing
+    pub builder_data: PipelineBuilderData,
 }
 
 impl PipelineBuildResult {
@@ -44,6 +54,10 @@ impl PipelineBuildResult {
             sources_pipelines: vec![],
             prof_span_set: SharedProcessorProfiles::default(),
             exchange_injector: DefaultExchangeInjector::create(),
+            builder_data: PipelineBuilderData {
+                input_join_state: None,
+                input_probe_schema: None,
+            },
         }
     }
 
@@ -63,6 +77,10 @@ impl PipelineBuildResult {
             sources_pipelines: vec![],
            prof_span_set: SharedProcessorProfiles::default(),
             exchange_injector: DefaultExchangeInjector::create(),
+            builder_data: PipelineBuilderData {
+                input_join_state: None,
+                input_probe_schema: None,
+            },
        })
    }
 
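A side note on the two identical `builder_data` literals above: the all-`None` value is a natural candidate for a `Default` impl. A sketch with hypothetical stand-in field types:

    // Sketch with stand-in field types: deriving Default captures the
    // repeated all-None literal in one place.
    #[derive(Clone, Default)]
    struct PipelineBuilderData {
        input_join_state: Option<u32>, // stands in for Arc<HashJoinBuildState>
        input_probe_schema: Option<Vec<String>>, // stands in for Vec<DataField>
    }

    fn main() {
        let data = PipelineBuilderData::default();
        assert!(data.input_join_state.is_none());
        assert!(data.input_probe_schema.is_none());
    }
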