fix(frontend): create mview on singleton upstream mview #4153

Merged · 13 commits · Jul 26, 2022
43 changes: 43 additions & 0 deletions e2e_test/streaming/bug_fixes/mv_on_singleton_mv_4153.slt
@@ -0,0 +1,43 @@
statement ok
create table t (v int);

statement ok
insert into t values (666), (233), (233);

# Create singleton mview
statement ok
create materialized view mv as select v from t order by v limit 10;

statement ok
create materialized view mom as select * from mv;

query I
select * from mv order by v;
----
233
233
666

statement ok
drop materialized view mom;

statement ok
create materialized view mvjoin as select mv1.v as vvvv from mv as mv1, mv as mv2 where mv1.v = mv2.v;

query I
select * from mvjoin order by vvvv;
----
233
233
233
233
666

statement ok
drop materialized view mvjoin;

statement ok
drop materialized view mv;

statement ok
drop table t;
10 changes: 7 additions & 3 deletions proto/stream_plan.proto
@@ -264,6 +264,10 @@ message ChainNode {
bool disable_rearrange = 4;
// Whether to place this chain on the same worker node as upstream actors.
bool same_worker_node = 5;
// Whether the upstream materialize executor is a singleton, and thus this chain should also be a singleton.
// FIXME: This is a workaround for fragmenter since the distribution info will be lost if there's only one
// fragment in the downstream mview. Remove this when we refactor the fragmenter.
bool is_singleton = 6;
Comment on lines +269 to +270 (Contributor): What is the desired way?

Reply (@BugenZhao, Member Author, Jul 26, 2022): We may directly operate on the `Fragment` struct in the frontend instead of the protobuf struct, so that we can directly find whether a fragment is a singleton, instead of checking the dispatcher type.

Currently, we cannot check `is_singleton` for the most upstream fragment since there's no exchange before it. That's why we need this hack for TopN and Chain. 😢

cc @st1page

}
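A minimal sketch of the direction described in the reply above, under the assumption that the fragmenter works on a frontend-side struct; `Fragment` and `FragmentDistribution` here are illustrative names, not the actual frontend types:

```rust
// Illustrative only: if the fragmenter operated on a frontend-side struct
// instead of the protobuf graph, singleton-ness could be read directly from
// the fragment's distribution rather than threaded through ChainNode.
#[derive(Debug, Clone, PartialEq)]
enum FragmentDistribution {
    Single,
    Hash(Vec<usize>),
}

struct Fragment {
    distribution: FragmentDistribution,
}

impl Fragment {
    fn is_singleton(&self) -> bool {
        self.distribution == FragmentDistribution::Single
    }
}

fn main() {
    let frag = Fragment {
        distribution: FragmentDistribution::Single,
    };
    assert!(frag.is_singleton());
}
```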

// BatchPlanNode is used for mv on mv snapshot read.
@@ -433,9 +437,9 @@ message StreamActor {
}

enum FragmentType {
SOURCE = 0;
SINK = 1;
OTHERS = 2;
OTHERS = 0;
SOURCE = 1;
SINK = 2;
}
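The reorder makes `OTHERS` the zero value. In proto3, the first enum variant must be zero and acts as the implicit default, so an unset `fragment_type` now decodes as `OTHERS` instead of `SOURCE`; that rationale is inferred from proto3 semantics, not stated in the diff. A sketch of the effect with prost-style generated code (simplified; prost stores proto3 enums as `i32` fields that default to 0):

```rust
// Simplified stand-in for the prost-generated types.
#[derive(Debug, PartialEq)]
enum FragmentType {
    Others = 0, // was SOURCE = 0 before this change
    Source = 1,
    Sink = 2,
}

#[derive(Default)]
struct StreamFragment {
    fragment_type: i32,
}

fn main() {
    // An unset field decodes as 0, which now maps to OTHERS.
    let frag = StreamFragment::default();
    assert_eq!(frag.fragment_type, FragmentType::Others as i32);
}
```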

message StreamFragmentGraph {
48 changes: 25 additions & 23 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -68,6 +68,7 @@ impl BatchSeqScan {
}

pub fn new(logical: LogicalScan, scan_ranges: Vec<ScanRange>) -> Self {
// Use `Single` by default, will be updated later with `clone_with_dist`.
Self::new_inner(logical, Distribution::Single, scan_ranges)
}

@@ -78,6 +79,11 @@ impl BatchSeqScan {
Distribution::Single
} else {
match self.logical.distribution_key() {
// FIXME: Should be `Single` if no distribution key.
Review comment (@BugenZhao, Member Author): Opened #4164 for this.

// Currently the task will be scheduled to frontend under local mode, which is
// unimplemented yet. Enable this when it's done.
//
// Some(dist_key) if dist_key.is_empty() => Distribution::Single,
Some(dist_key) => Distribution::HashShard(dist_key),
None => Distribution::SomeShard,
}
@@ -130,18 +136,18 @@ impl fmt::Display for BatchSeqScan {

let verbose = self.base.ctx.is_explain_verbose();

if self.scan_ranges.is_empty() {
write!(
f,
"BatchScan {{ table: {}, columns: [{}] }}",
self.logical.table_name(),
match verbose {
true => self.logical.column_names_with_table_prefix(),
false => self.logical.column_names(),
}
.join(", ")
)
} else {
write!(
f,
"BatchScan {{ table: {}, columns: [{}]",
self.logical.table_name(),
match verbose {
true => self.logical.column_names_with_table_prefix(),
false => self.logical.column_names(),
}
.join(", "),
)?;

if !self.scan_ranges.is_empty() {
let order_names = match verbose {
true => self.logical.order_names_with_table_prefix(),
false => self.logical.order_names(),
@@ -161,18 +167,14 @@
}
range_strs.push(range_str.join(", "));
}
write!(
f,
"BatchScan {{ table: {}, columns: [{}], scan_ranges: [{}] }}",
self.logical.table_name(),
match verbose {
true => self.logical.column_names_with_table_prefix(),
false => self.logical.column_names(),
}
.join(", "),
range_strs.join(" OR ")
)
write!(f, ", scan_ranges: [{}]", range_strs.join(" OR "))?;
}

if verbose {
write!(f, ", distribution: {}", self.distribution())?;
}

write!(f, " }}")
}
}

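With this refactor, the optional parts of the explain line are appended incrementally: the table and columns always, `scan_ranges` only when non-empty, and `distribution` only in verbose mode, followed by the closing brace. A verbose line would therefore compose roughly as follows (illustrative output, not copied from a real run; the scan-range format is assumed):

```
BatchScan { table: mv, columns: [v], scan_ranges: [v = 233], distribution: Single }
```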
7 changes: 4 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -209,9 +209,10 @@ impl LogicalScan {

/// The mapped distribution key of the scan operator.
///
/// The column indices in it is the position in the `required_col_idx`,
/// instead of the position in all the columns of the table
/// (which is the table's distribution key).
/// The column indices in it are the positions in `required_col_idx`, instead of the
/// positions among all the columns of the table (which is the table's distribution key).
///
/// Returns `None` if the table's distribution key is not fully contained in `required_col_idx`.
pub fn distribution_key(&self) -> Option<Vec<usize>> {
let tb_idx_to_op_idx = self
.required_col_idx
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
@@ -77,7 +77,7 @@ impl fmt::Display for StreamIndexScan {
self.logical.column_names()
}
.join(", "),
self.base.pk_indices
self.base.pk_indices,
)
}
}
@@ -149,6 +149,7 @@ impl StreamIndexScan {
.iter()
.map(|x| x.column_id.get_id())
.collect(),
is_singleton: false,
})),
pk_indices,
operator_id: if auto_fields {
26 changes: 22 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -39,12 +39,23 @@ impl StreamTableScan {
let ctx = logical.base.ctx.clone();

let batch_plan_id = ctx.next_plan_node_id();

let distribution = {
let distribution_key = logical
.distribution_key()
.expect("distribution key of stream chain must exist in output columns");
if distribution_key.is_empty() {
Distribution::Single
} else {
// Follows upstream distribution from TableCatalog
Distribution::HashShard(distribution_key)
}
};
let base = PlanBase::new_stream(
ctx,
logical.schema().clone(),
logical.base.pk_indices.clone(),
// follows upstream distribution from TableCatalog
Distribution::HashShard(logical.distribution_key().unwrap()),
distribution,
logical.table_desc().appendonly,
);
Self {
@@ -73,6 +84,7 @@ impl fmt::Display for StreamTableScan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let verbose = self.base.ctx.is_explain_verbose();
let mut builder = f.debug_struct("StreamTableScan");

builder
.field("table", &format_args!("{}", self.logical.table_name()))
.field(
@@ -86,8 +98,13 @@
.join(", ")
),
)
.field("pk_indices", &format_args!("{:?}", self.base.pk_indices))
.finish()
.field("pk_indices", &format_args!("{:?}", self.base.pk_indices));

if verbose {
builder.field("distribution", &self.distribution());
}

builder.finish()
}
}

@@ -158,6 +175,7 @@ impl StreamTableScan {
.iter()
.map(|x| x.column_id.get_id())
.collect(),
is_singleton: *self.distribution() == Distribution::Single,
})),
pk_indices,
operator_id: if auto_fields {
14 changes: 10 additions & 4 deletions src/frontend/src/optimizer/property/distribution.rs
@@ -101,10 +101,16 @@ impl Distribution {
} as i32,
distribution: match self {
Distribution::Single => None,
Distribution::HashShard(key) => Some(DistributionProst::HashInfo(HashInfo {
output_count,
key: key.iter().map(|num| *num as u32).collect(),
})),
Distribution::HashShard(key) => {
assert!(
!key.is_empty(),
"hash key should not be empty, use `Single` instead"
);
Comment on lines +105 to +108 (@BugenZhao, Member Author): Opened #4165 for this.

Some(DistributionProst::HashInfo(HashInfo {
output_count,
key: key.iter().map(|num| *num as u32).collect(),
}))
}
// TODO: add round robin distribution
Distribution::SomeShard => None,
Distribution::Broadcast => None,
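The assert documents the invariant that an empty hash key really means `Single`. A sketch of enforcing it at construction time instead; the `hash_or_single` helper is hypothetical and not part of this PR:

```rust
// Minimal sketch of the invariant: an empty hash key is really `Single`.
#[derive(Debug, PartialEq)]
enum Distribution {
    Single,
    HashShard(Vec<usize>),
}

impl Distribution {
    // Hypothetical helper: normalize at construction so `HashShard`
    // can never carry an empty key.
    fn hash_or_single(key: Vec<usize>) -> Distribution {
        if key.is_empty() {
            Distribution::Single
        } else {
            Distribution::HashShard(key)
        }
    }
}

fn main() {
    assert_eq!(Distribution::hash_or_single(vec![]), Distribution::Single);
    assert_eq!(
        Distribution::hash_or_single(vec![0]),
        Distribution::HashShard(vec![0])
    );
}
```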
7 changes: 4 additions & 3 deletions src/frontend/src/scheduler/local.rs
@@ -284,9 +284,10 @@ impl LocalQueryExecution {
let mut node_body = execution_plan_node.node.clone();
match &mut node_body {
NodeBody::RowSeqScan(ref mut scan_node) => {
let partition = partition.unwrap();
scan_node.vnode_bitmap = Some(partition.vnode_bitmap);
scan_node.scan_ranges = partition.scan_ranges;
if let Some(partition) = partition {
scan_node.vnode_bitmap = Some(partition.vnode_bitmap);
scan_node.scan_ranges = partition.scan_ranges;
}
}
NodeBody::SysRowSeqScan(_) => {}
_ => unreachable!(),
1 change: 1 addition & 0 deletions src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
@@ -62,6 +62,7 @@ impl StreamFragment {
Self {
fragment_id,
fragment_type: FragmentType::Others,
// FIXME: is it okay to use `false` as default value?
is_singleton: false,
Review note: BowenXiao1999 marked this conversation as resolved.
node: None,
table_ids_cnt: 0,
11 changes: 6 additions & 5 deletions src/frontend/src/stream_fragmenter/mod.rs
@@ -196,11 +196,13 @@ impl StreamFragmenter {
// TODO: Force singleton for TopN as a workaround. We should implement two phase TopN.
NodeBody::TopN(_) => current_fragment.is_singleton = true,

NodeBody::Chain(ref node) => {
// FIXME: workaround for single-fragment mview on singleton upstream mview.
NodeBody::Chain(node) => {
// memorize table id for later use
state
.dependent_table_ids
.insert(TableId::new(node.table_id));
current_fragment.is_singleton = node.is_singleton;
Comment (Contributor): IIUC, the flag `node.is_singleton` for the chain node is always true? Or does it depend on whether the upstream mview is a singleton or not?

Reply (@BugenZhao, Member Author): It depends on whether the upstream mview is a singleton or not. ✔️
}
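Putting the pieces together, the flag flows from the frontend plan node, through the `ChainNode` protobuf message, into the fragmenter. A simplified, self-contained model of that propagation; the real types live in `stream_table_scan.rs`, the `stream_plan` proto, and this fragmenter:

```rust
// Simplified model: the chain follows the upstream mview's distribution,
// the flag rides on the ChainNode message, and the fragmenter copies it
// onto the fragment that contains the chain.
#[derive(Clone, Copy, PartialEq)]
enum Distribution {
    Single,
    HashShard,
}

struct ChainNode {
    is_singleton: bool, // field 6, added by this PR
}

struct StreamFragment {
    is_singleton: bool,
}

fn plan_chain(upstream: Distribution) -> ChainNode {
    // Frontend: the chain is a singleton iff the upstream mview is.
    ChainNode {
        is_singleton: upstream == Distribution::Single,
    }
}

fn build_fragment(chain: &ChainNode) -> StreamFragment {
    // Fragmenter: copy the flag onto the fragment containing the chain.
    StreamFragment {
        is_singleton: chain.is_singleton,
    }
}

fn main() {
    let fragment = build_fragment(&plan_chain(Distribution::Single));
    assert!(fragment.is_singleton);
}
```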

_ => {}
@@ -220,8 +222,8 @@
return self.build_delta_join(state, current_fragment, stream_node);
} else {
panic!(
"only inner join without non-equal condition is supported for delta joins"
);
"only inner join without non-equal condition is supported for delta joins"
);
}
}
}
@@ -253,8 +255,7 @@
match child_node.get_node_body()? {
NodeBody::Exchange(_) if child_node.input.is_empty() => {
// When exchange node is generated when doing rewrites, it could be having
// zero input. In this case, we won't recursively
// visit its children.
// zero input. In this case, we won't recursively visit its children.
Ok(child_node)
}
// Exchange node indicates a new child fragment.
33 changes: 33 additions & 0 deletions src/frontend/test_runner/tests/testdata/mv_on_mv.yaml
@@ -18,3 +18,36 @@
StreamTableScan { table: m1, columns: [v1, v2, t1._row_id], pk_indices: [2] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: m2, columns: [v1, v2, t1._row_id], pk_indices: [2] }
- id: create_singleton_mv
sql: |
create table t (v int);
create materialized view mv as select v from t order by v limit 10;
- id: select_from_singleton_mv
before:
- create_singleton_mv
sql: |
select v from mv; -- FIXME: there should not be a `Single` exchange here
batch_plan: |
BatchExchange { order: [], dist: Single }
BatchScan { table: mv, columns: [v] }
- id: single_fragment_mv_on_singleton_mv
before:
- create_singleton_mv
sql: |
select v from mv;
stream_plan: |
StreamMaterialize { columns: [v, mv.t._row_id(hidden)], pk_columns: [mv.t._row_id] }
StreamTableScan { table: mv, columns: [v, t._row_id], pk_indices: [1] }
- id: mv_on_singleton_mv
before:
- create_singleton_mv
sql: |
select mv1.v as v from mv as mv1, mv as mv2 where mv1.v = mv2.v;
stream_plan: |
StreamMaterialize { columns: [v, mv.t._row_id(hidden), mv.t._row_id#1(hidden)], pk_columns: [mv.t._row_id, mv.t._row_id#1] }
StreamExchange { dist: HashShard([1, 2]) }
StreamHashJoin { type: Inner, predicate: $0 = $2, output_indices: [0, 1, 3] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: mv, columns: [v, t._row_id], pk_indices: [1] }
StreamExchange { dist: HashShard([0]) }
StreamTableScan { table: mv, columns: [v, t._row_id], pk_indices: [1] }