feat(optimizer): change create mv field name #2662

Merged · 9 commits · May 19, 2022
2 changes: 1 addition & 1 deletion e2e_test/streaming/extreme_null.slt
@@ -14,7 +14,7 @@ statement ok
create materialized view mv2 as select round(avg(v1), 1) as avg_v1, sum(v2) as sum_v2, count(v3) as count_v3 from t1;

statement ok
-create materialized view mv3 as select sum(v1) as sum_v1, min(v1) as min_v1, max(v1) as max_v1 from t4 group by v3;
+create materialized view mv3 as select sum(v1) as sum_v1, min(v1) as min_v1, max(v1) as max_v1, v3 from t4 group by v3;

query RII
select avg_v1, sum_v2, count_v3 from mv2;
6 changes: 3 additions & 3 deletions src/frontend/src/catalog/mod.rs
@@ -47,8 +47,8 @@ pub fn check_valid_column_name(column_name: &str) -> Result<()> {

const ROWID_PREFIX: &str = "_row_id";

-pub fn gen_row_id_column_name(idx: usize) -> String {
-    ROWID_PREFIX.to_string() + "#" + &idx.to_string()
+pub fn row_id_column_name() -> String {
+    ROWID_PREFIX.to_string()
}

pub fn is_row_id_column_name(name: &str) -> bool {
@@ -62,7 +62,7 @@ pub fn row_id_column_desc() -> ColumnDesc {
ColumnDesc {
data_type: DataType::Int64,
column_id: ColumnId::new(0),
-        name: gen_row_id_column_name(0),
+        name: row_id_column_name(),
field_descs: vec![],
type_name: "".to_string(),
}
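For reference, the rename boils down to dropping the index suffix from generated row-id column names. A minimal standalone sketch of the before/after behavior (hypothetical code mirroring the two helpers, not the crate itself):

```rust
const ROWID_PREFIX: &str = "_row_id";

// Old helper: every generated row-id column carried an index suffix.
fn gen_row_id_column_name(idx: usize) -> String {
    format!("{}#{}", ROWID_PREFIX, idx)
}

// New helper: the bare name; duplicates are now resolved later,
// in StreamMaterialize::create (see stream_materialize.rs below).
fn row_id_column_name() -> String {
    ROWID_PREFIX.to_string()
}

fn main() {
    assert_eq!(gen_row_id_column_name(0), "_row_id#0");
    assert_eq!(row_id_column_name(), "_row_id");
}
```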
12 changes: 7 additions & 5 deletions src/frontend/src/handler/create_index.rs
@@ -94,6 +94,13 @@ pub(crate) fn gen_create_index_plan(

// Manually assemble the materialization plan for the index MV.
let materialize = {
+        let mut required_cols = FixedBitSet::with_capacity(table_desc.columns.len());
+        required_cols.toggle_range(..);
+        required_cols.toggle(0);
+        let mut out_names: Vec<String> =
+            table_desc.columns.iter().map(|c| c.name.clone()).collect();
+        out_names.remove(0);

let scan_node = StreamTableScan::new(LogicalScan::new(
table_name,
(0..table_desc.columns.len()).into_iter().collect(),
@@ -102,11 +109,6 @@
vec![],
context,
));
-        let mut required_cols = FixedBitSet::with_capacity(scan_node.schema().len());
-        required_cols.toggle_range(..);
-        required_cols.toggle(0);
-        let mut out_names = scan_node.schema().names();
-        out_names.remove(0);

PlanRoot::new(
scan_node.into(),
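The relocated block derives the required-column bitset and output names directly from `table_desc`, before the scan node is built. A standalone sketch of the bitset idiom (assuming the `fixedbitset` crate; column 0 stands for the hidden row id, which the index MV hides from its output):

```rust
use fixedbitset::FixedBitSet;

fn main() {
    let num_columns = 4; // e.g. [_row_id, v1, v2, v3]

    // Start all-zero, flip every bit on, then flip bit 0 back off:
    // the result marks all columns except the hidden row id as required.
    let mut required_cols = FixedBitSet::with_capacity(num_columns);
    required_cols.toggle_range(..);
    required_cols.toggle(0);

    assert_eq!(required_cols.ones().collect::<Vec<_>>(), vec![1, 2, 3]);
}
```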
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_mv.rs
@@ -99,7 +99,7 @@ pub mod tests {
use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
use risingwave_common::types::DataType;

-use crate::catalog::gen_row_id_column_name;
+use crate::catalog::row_id_column_name;
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};

#[tokio::test]
@@ -151,7 +151,7 @@ pub mod tests {
let city_type = DataType::Struct {
fields: vec![DataType::Varchar, DataType::Varchar].into(),
};
-        let row_id_col_name = gen_row_id_column_name(0);
+        let row_id_col_name = row_id_column_name();
let expected_columns = maplit::hashmap! {
row_id_col_name.as_str() => DataType::Int64,
"country.zipcode" => DataType::Varchar,
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_source.rs
@@ -136,7 +136,7 @@ pub mod tests {
use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
use risingwave_common::types::DataType;

-use crate::catalog::gen_row_id_column_name;
+use crate::catalog::row_id_column_name;
use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA};

#[tokio::test]
@@ -178,7 +178,7 @@ pub mod tests {
let city_type = DataType::Struct {
fields: vec![DataType::Varchar, DataType::Varchar].into(),
};
-        let row_id_col_name = gen_row_id_column_name(0);
+        let row_id_col_name = row_id_column_name();
let expected_columns = maplit::hashmap! {
row_id_col_name.as_str() => DataType::Int64,
"id" => DataType::Int32,
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_table.rs
@@ -161,7 +161,7 @@ mod tests {
use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
use risingwave_common::types::DataType;

-use crate::catalog::gen_row_id_column_name;
+use crate::catalog::row_id_column_name;
use crate::test_utils::LocalFrontend;

#[tokio::test]
@@ -200,7 +200,7 @@ mod tests {
.map(|col| (col.name.as_str(), col.data_type.clone()))
.collect::<HashMap<&str, DataType>>();

-        let row_id_col_name = gen_row_id_column_name(0);
+        let row_id_col_name = row_id_column_name();
let expected_columns = maplit::hashmap! {
row_id_col_name.as_str() => DataType::Int64,
"v1" => DataType::Int16,
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -102,10 +102,9 @@ impl LogicalScan {
}

pub(super) fn column_names(&self) -> Vec<String> {
-        self.schema()
-            .fields()
+        self.required_col_idx
            .iter()
-            .map(|f| f.name.clone())
+            .map(|i| self.table_desc.columns[*i].name.clone())
            .collect()
}

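In effect, `column_names` now resolves the required column indexes against the table descriptor instead of reading the (possibly renamed) schema fields. A simplified illustration, using hypothetical stand-in types for `TableDesc`/`ColumnDesc`:

```rust
// Hypothetical stand-ins for the real catalog types.
struct Column {
    name: String,
}

struct Scan {
    required_col_idx: Vec<usize>,
    columns: Vec<Column>, // plays the role of table_desc.columns
}

impl Scan {
    fn column_names(&self) -> Vec<String> {
        // Look each required index up in the table descriptor.
        self.required_col_idx
            .iter()
            .map(|i| self.columns[*i].name.clone())
            .collect()
    }
}

fn main() {
    let scan = Scan {
        required_col_idx: vec![1, 2],
        columns: ["_row_id", "v1", "v2"]
            .iter()
            .map(|n| Column { name: n.to_string() })
            .collect(),
    };
    assert_eq!(scan.column_names(), vec!["v1", "v2"]);
}
```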
50 changes: 18 additions & 32 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::collections::HashSet;
+use std::collections::HashMap;
use std::fmt;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
-use risingwave_common::catalog::{ColumnDesc, Field, OrderedColumnDesc, Schema, TableId};
+use risingwave_common::catalog::{ColumnDesc, OrderedColumnDesc, TableId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;
use risingwave_common::util::sort_util::OrderType;
@@ -28,7 +28,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use super::{PlanRef, PlanTreeNodeUnary, ToStreamProst};
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::table_catalog::TableCatalog;
-use crate::catalog::{gen_row_id_column_name, is_row_id_column_name, ColumnId};
+use crate::catalog::ColumnId;
use crate::optimizer::plan_node::{PlanBase, PlanNode};
use crate::optimizer::property::{Distribution, Order};

@@ -45,7 +45,7 @@ impl StreamMaterialize {
fn derive_plan_base(input: &PlanRef) -> Result<PlanBase> {
let ctx = input.ctx();

-        let schema = Self::derive_schema(input.schema())?;
+        let schema = input.schema().clone();
let pk_indices = input.pk_indices();

// Materialize executor won't change the append-only behavior of the stream, so it depends
@@ -59,28 +59,6 @@ impl StreamMaterialize {
))
}

-    fn derive_schema(schema: &Schema) -> Result<Schema> {
-        let mut row_id_count = 0;
-        let fields = schema
-            .fields()
-            .iter()
-            .map(|field| match is_row_id_column_name(&field.name) {
-                true => {
-                    let field = Field::with_struct(
-                        field.data_type.clone(),
-                        gen_row_id_column_name(row_id_count),
-                        field.sub_fields.clone(),
-                        field.type_name.clone(),
-                    );
-                    row_id_count += 1;
-                    field
-                }
-                false => field.clone(),
-            })
-            .collect();
-        Ok(Schema { fields })
-    }

#[must_use]
pub fn new(input: PlanRef, table: TableCatalog) -> Self {
let base = Self::derive_plan_base(&input).unwrap();
@@ -114,9 +92,9 @@ impl StreamMaterialize {
let schema = &base.schema;
let pk_indices = &base.pk_indices;

-        let mut col_names = HashSet::new();
+        let mut col_names = HashMap::new();
for name in &out_names {
-            if !col_names.insert(name) {
+            if col_names.try_insert(name.clone(), 0).is_err() {
return Err(
InternalError(format!("column {} specified more than once", name)).into(),
);
@@ -132,10 +110,18 @@
column_desc: ColumnDesc::from_field_without_column_id(field),
is_hidden: !user_cols.contains(i),
};
-                if !c.is_hidden {
-                    let name = out_name_iter.next().unwrap();
-                    c.column_desc.name = name;
-                }
+                c.column_desc.name = if !c.is_hidden {
+                    out_name_iter.next().unwrap()
+                } else {
+                    match col_names.try_insert(field.name.clone(), 0) {
+                        Ok(_) => field.name.clone(),
+                        Err(mut err) => {
+                            let cnt = err.entry.get_mut();
+                            *cnt += 1;
+                            field.name.clone() + "#" + &cnt.to_string()
+                        }
+                    }
+                };
Comment on lines -135 to +124 (Contributor Author): the core change is here.

c
})
.collect_vec();
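The naming rule introduced above: a visible column takes the next user-given output name, while a hidden column keeps its field name and only gets a `#<count>` suffix on collision. The diff uses `HashMap::try_insert`, which is a nightly-only API (`map_try_insert` feature); here is a stable-Rust sketch of the same counting logic, with a hypothetical `dedup_name` helper and made-up inputs:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// First occurrence keeps its original name; each later duplicate
/// gets a "#<n>" suffix, where n counts the collisions seen so far.
fn dedup_name(col_names: &mut HashMap<String, u32>, name: &str) -> String {
    match col_names.entry(name.to_string()) {
        Entry::Vacant(v) => {
            v.insert(0);
            name.to_string()
        }
        Entry::Occupied(mut o) => {
            let cnt = o.get_mut();
            *cnt += 1;
            format!("{}#{}", name, cnt)
        }
    }
}

fn main() {
    let mut col_names = HashMap::new();
    // Three hidden columns that all want to be called "_row_id":
    assert_eq!(dedup_name(&mut col_names, "_row_id"), "_row_id");
    assert_eq!(dedup_name(&mut col_names, "_row_id"), "_row_id#1");
    assert_eq!(dedup_name(&mut col_names, "_row_id"), "_row_id#2");
}
```

This is also why the plan tests below switch from `_row_id#0` to `_row_id`: with the suffix applied only on collision, a lone row-id column keeps the bare name.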
24 changes: 12 additions & 12 deletions src/frontend/test_runner/tests/testdata/agg.yaml
@@ -28,7 +28,7 @@
StreamProject { exprs: [$0, ($2 + ($3 * $4))] }
StreamHashAgg { group_keys: [$0], aggs: [count, min($1), max($2), count($0)] }
StreamExchange { dist: HashShard([0]) }
-StreamTableScan { table: t, columns: [v1, v2, v3, _row_id#0], pk_indices: [3] }
+StreamTableScan { table: t, columns: [v1, v2, v3, _row_id], pk_indices: [3] }
- sql: |
create table t(v1 int, v2 int, v3 int);
select min(v1) + max(v2) * count(v3) as agg from t;
@@ -42,7 +42,7 @@
StreamProject { exprs: [($1 + ($2 * $3)), $0, $1, $2, $3] }
StreamSimpleAgg { aggs: [count, min($0), max($1), count($2)] }
StreamExchange { dist: Single }
-StreamTableScan { table: t, columns: [v1, v2, v3, _row_id#0], pk_indices: [3] }
+StreamTableScan { table: t, columns: [v1, v2, v3, _row_id], pk_indices: [3] }
- sql: |
create table t(v1 int, v2 int);
select v1 from t group by v2;
@@ -67,7 +67,7 @@
StreamHashAgg { group_keys: [$0], aggs: [count, min($1), sum($2), count($2)] }
StreamProject { exprs: [$2, $0, ($0 + $1), $3] }
StreamExchange { dist: HashShard([2]) }
-StreamTableScan { table: t, columns: [v1, v2, v3, _row_id#0], pk_indices: [3] }
+StreamTableScan { table: t, columns: [v1, v2, v3, _row_id], pk_indices: [3] }
- sql: |
/* test logical_agg with complex group expression */
create table t(v1 int, v2 int);
@@ -76,7 +76,7 @@
LogicalProject { exprs: [$1, $2] }
LogicalAgg { group_keys: [0], agg_calls: [min($1), sum($0)] }
LogicalProject { exprs: [($1 + $2), $1] }
-LogicalScan { table: t, columns: [_row_id#0, v1, v2] }
+LogicalScan { table: t, columns: [_row_id, v1, v2] }
- sql: |
/* test logical_agg with complex group expression */
create table t(v1 int, v2 int, v3 int);
@@ -85,7 +85,7 @@
LogicalProject { exprs: [$1, $2] }
LogicalAgg { group_keys: [0, 1], agg_calls: [sum($2)] }
LogicalProject { exprs: [(($1 + $2) / $3), $1, ($1 * $2)] }
-LogicalScan { table: t, columns: [_row_id#0, v1, v2, v3] }
+LogicalScan { table: t, columns: [_row_id, v1, v2, v3] }
- sql: |
/* test logical_agg with complex group expression */
create table t(v1 int, v2 int);
@@ -94,7 +94,7 @@
LogicalProject { exprs: [$0] }
LogicalAgg { group_keys: [0], agg_calls: [] }
LogicalProject { exprs: [($1 + $2)] }
-LogicalScan { table: t, columns: [_row_id#0, v1, v2] }
+LogicalScan { table: t, columns: [_row_id, v1, v2] }
- sql: |
/* test logical_agg with complex group expression */
/* should complain about nested agg call */
@@ -109,7 +109,7 @@
LogicalProject { exprs: [($0 + $1)] }
LogicalAgg { group_keys: [0, 1], agg_calls: [] }
LogicalProject { exprs: [$1, $2] }
-LogicalScan { table: t, columns: [_row_id#0, v1, v2] }
+LogicalScan { table: t, columns: [_row_id, v1, v2] }
- sql: |
create table t(v1 int, v2 int);
select v1 from t group by v1 + v2;
@@ -127,7 +127,7 @@
StreamSimpleAgg { aggs: [count, count($0), sum($0)] }
StreamExchange { dist: Single }
StreamProject { exprs: [($0 + $1), $2] }
-StreamTableScan { table: t, columns: [v1, v2, _row_id#0], pk_indices: [2] }
+StreamTableScan { table: t, columns: [v1, v2, _row_id], pk_indices: [2] }
- sql: |
create table t(v1 int, v2 int, v3 int);
select v1, sum(v2 + v3) / count(v2 + v3) + max(v1) as agg from t group by v1;
@@ -144,7 +144,7 @@
StreamHashAgg { group_keys: [$0], aggs: [count, sum($1), count($1), max($0)] }
StreamProject { exprs: [$0, ($1 + $2), $3] }
StreamExchange { dist: HashShard([0]) }
-StreamTableScan { table: t, columns: [v1, v2, v3, _row_id#0], pk_indices: [3] }
+StreamTableScan { table: t, columns: [v1, v2, v3, _row_id], pk_indices: [3] }
- sql: |
create table t (v1 real not null);
select v1, count(*) from t group by v1;
@@ -179,7 +179,7 @@
LogicalFilter { predicate: ($0 > 5:Int32) }
LogicalAgg { group_keys: [0], agg_calls: [] }
LogicalProject { exprs: [$1] }
-LogicalScan { table: t, columns: [_row_id#0, v1] }
+LogicalScan { table: t, columns: [_row_id, v1] }
- sql: |
/* having with non-group column */
create table t (v1 real not null, v2 int);
@@ -192,7 +192,7 @@
logical_plan: |
LogicalAgg { group_keys: [0], agg_calls: [] }
LogicalProject { exprs: [$1] }
-LogicalScan { table: t, columns: [_row_id#0, v1, v2] }
+LogicalScan { table: t, columns: [_row_id, v1, v2] }
- sql: |
/* distinct with agg */
create table t (v1 int, v2 int);
@@ -202,4 +202,4 @@
LogicalProject { exprs: [$1] }
LogicalAgg { group_keys: [0], agg_calls: [sum($1)] }
LogicalProject { exprs: [$2, $1] }
-LogicalScan { table: t, columns: [_row_id#0, v1, v2] }
+LogicalScan { table: t, columns: [_row_id, v1, v2] }