Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): properly handled insert given misaligned source #9208

Merged
merged 17 commits into from
Apr 19, 2023
97 changes: 97 additions & 0 deletions e2e_test/batch/basic/insert_unaligned.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# e2e test: INSERT with a "misaligned" source — the source provides fewer
# columns than the target table, or an explicit target column list in a
# different order. Unspecified target columns must be padded with NULL.

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v1 int);

statement ok
insert into t values (10), (20), (30);

statement ok
create table t1 (v1 int, v2 int);

# Insert into only v1; v2 should be filled with NULL.
statement ok
insert into t1 (v1) values (1), (2);

query I rowsort
select * from t1;
----
1 NULL
2 NULL

# Insert into only v2; v1 should be filled with NULL.
statement ok
insert into t1 (v2) values (1), (2);

query I rowsort
select * from t1;
----
1 NULL
2 NULL
NULL 1
NULL 2

# INSERT ... SELECT with fewer source columns than the target table:
# t's single column maps to v1, and v2 is padded with NULL.
statement ok
insert into t1 select * from t;

query I rowsort
select * from t1;
----
1 NULL
10 NULL
2 NULL
20 NULL
30 NULL
NULL 1
NULL 2

# INSERT ... SELECT targeting only v2; v1 is padded with NULL.
statement ok
insert into t1 (v2) select * from t;

query I rowsort
select * from t1;
----
1 NULL
10 NULL
2 NULL
20 NULL
30 NULL
NULL 1
NULL 10
NULL 2
NULL 20
NULL 30

# Error cases: the number of source expressions does not match the
# explicit target column list.
statement error
insert into t1 (v1, v2) select * from t;

statement error
insert into t1 (v1, v2) values (1);

statement error
insert into t1 (v1, v2) values (1, 2, 3);

statement ok
delete from t1;

# Target column list in a different order than the table definition:
# values must be reordered to match (v2=2, v1=1 → row (1, 2)).
statement ok
insert into t1 (v2, v1) values (2, 1);

query I rowsort
select * from t1;
----
1 2

# Column reordering also applies to INSERT ... SELECT sources.
statement ok
insert into t1 (v2, v1) select * from t1;

query I rowsort
select * from t1;
----
1 2
2 1

statement ok
drop table t1;

statement ok
drop table t;
8 changes: 8 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ message InsertNode {
// An optional field and will be `None` for tables without user-defined pk.
// The `BatchInsertExecutor` should add a column with NULL value which will
// be filled in streaming.
y-wei marked this conversation as resolved.
Show resolved Hide resolved
message DefaultColumns {
y-wei marked this conversation as resolved.
Show resolved Hide resolved
message IndexAndExpr {
uint32 index = 1;
expr.ExprNode expr = 2;
}
repeated IndexAndExpr default_column = 1;
}
optional DefaultColumns default_columns = 6;
y-wei marked this conversation as resolved.
Show resolved Hide resolved
optional uint32 row_id_index = 3;
bool returning = 4;
}
Expand Down
34 changes: 34 additions & 0 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ use std::iter::repeat;
use anyhow::Context;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::column::Column;
use risingwave_common::array::serial_array::SerialArray;
use risingwave_common::array::{ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk};
use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::insert_node::default_columns::IndexAndExpr;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::dml_manager::DmlManagerRef;

Expand All @@ -43,6 +47,7 @@ pub struct InsertExecutor {
schema: Schema,
identity: String,
column_indices: Vec<usize>,
sorted_default_columns: Vec<(usize, BoxedExpression)>,

row_id_index: Option<usize>,
returning: bool,
Expand All @@ -58,6 +63,7 @@ impl InsertExecutor {
chunk_size: usize,
identity: String,
column_indices: Vec<usize>,
sorted_default_columns: Vec<(usize, BoxedExpression)>,
row_id_index: Option<usize>,
returning: bool,
) -> Self {
Expand All @@ -77,6 +83,7 @@ impl InsertExecutor {
},
identity,
column_indices,
sorted_default_columns,
row_id_index,
returning,
}
Expand Down Expand Up @@ -119,6 +126,13 @@ impl InsertExecutor {
columns = ordered_cols
Copy link
Contributor

@broccoliSpicy broccoliSpicy Apr 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a segmentation fault here
ordered_cols[*idx] = columns[i].clone()
if the NULL padding is still not executed here, then the ordered_cols might not have enough capacity to hold the values at the correct index

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out! I believe it's resolved now :)

}

let one_row_chunk = DataChunk::new_dummy(cap);

for (idx, expr) in &self.sorted_default_columns {
let column = Column::new(expr.eval(&one_row_chunk).await?);
columns.insert(*idx, column);
}

// If the user does not specify the primary key, then we need to add a column as the
// primary key.
if let Some(row_id_index) = self.row_id_index {
Expand Down Expand Up @@ -188,6 +202,24 @@ impl BoxedExecutorBuilder for InsertExecutor {
.iter()
.map(|&i| i as usize)
.collect();
let sorted_default_columns = if let Some(default_columns) = &insert_node.default_columns {
let mut default_columns = default_columns
.get_default_column()
.iter()
.cloned()
.map(|IndexAndExpr { index: i, expr: e }| {
(
i as usize,
build_from_prost(&(e.expect("expr should be Some")))
.expect("expr corrputed"),
y-wei marked this conversation as resolved.
Show resolved Hide resolved
)
})
.collect_vec();
default_columns.sort_unstable_by_key(|(i, _)| *i);
default_columns
} else {
vec![]
};

Ok(Box::new(Self::new(
table_id,
Expand All @@ -197,6 +229,7 @@ impl BoxedExecutorBuilder for InsertExecutor {
source.context.get_config().developer.chunk_size,
source.plan_node().get_identity().clone(),
column_indices,
sorted_default_columns,
insert_node.row_id_index.as_ref().map(|index| *index as _),
insert_node.returning,
)))
Expand Down Expand Up @@ -290,6 +323,7 @@ mod tests {
1024,
"InsertExecutor".to_string(),
vec![], // Ignoring insertion order
vec![],
row_id_index,
false,
));
Expand Down
1 change: 1 addition & 0 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ async fn test_table_materialize() -> StreamResult<()> {
1024,
"InsertExecutor".to_string(),
vec![], // ignore insertion order
vec![],
Some(row_id_index),
false,
));
Expand Down
97 changes: 90 additions & 7 deletions src/frontend/planner_test/tests/testdata/insert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,55 +27,55 @@
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[1:Float32, 2:Int32, null:Varchar]] }
└─BatchValues { rows: [[1:Float32, 2:Int32]] }
- name: insert values with implicit null (multiple rows)
sql: |
create table t (v1 real, v2 int, v3 varchar);
insert into t values (1, 2), (3, 4);
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[1:Float32, 2:Int32, null:Varchar], [3:Float32, 4:Int32, null:Varchar]] }
└─BatchValues { rows: [[1:Float32, 2:Int32], [3:Float32, 4:Int32]] }
- name: implicit null user defined columns 1
sql: |
create table t (v1 int, v2 int);
insert into t (v1) values (5);
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[5:Int32, null:Int32]] }
└─BatchValues { rows: [[5:Int32]] }
- name: implicit null user defined columns 2
sql: |
create table t (v1 int, v2 int);
insert into t (v2) values (6);
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[6:Int32, null:Int32]] }
└─BatchValues { rows: [[6:Int32]] }
- name: implicit null user defined columns 3
sql: |
create table t (v1 int, v2 int, v3 int);
insert into t (v3) values (6);
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[6:Int32, null:Int32, null:Int32]] }
└─BatchValues { rows: [[6:Int32]] }
- name: implicit null user defined columns 4
sql: |
create table t (v1 int, v2 int, v3 int);
insert into t (v1) values (6);
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[6:Int32, null:Int32, null:Int32]] }
└─BatchValues { rows: [[6:Int32]] }
- name: implicit null user defined columns 5
sql: |
create table t (v1 int, v2 int, v3 int);
insert into t (v2) values (6);
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[6:Int32, null:Int32, null:Int32]] }
└─BatchValues { rows: [[6:Int32]] }
- name: insert values on non-assign-castable types
sql: |
create table t (v1 real, v2 int);
Expand Down Expand Up @@ -234,3 +234,86 @@
create table t (a int, b int);
insert into t values (0,1), (1,2) returning sum(a);
binder_error: 'Bind error: should not have agg/window in the `RETURNING` list'
- name: insert and specify all columns with values
sql: |
create table t (a int, b int);
insert into t (b, a) values (1, 2);
logical_plan: |
LogicalInsert { table: t }
y-wei marked this conversation as resolved.
Show resolved Hide resolved
└─LogicalValues { rows: [[1:Int32, 2:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32, *VALUES*_0.column_1:Int32] } }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[1:Int32, 2:Int32]] }
- name: insert and specify all columns with query
sql: |
create table t (a int, b int);
insert into t (b, a) select * from t;
logical_plan: |
LogicalInsert { table: t }
└─LogicalProject { exprs: [t.a, t.b] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard }
- name: insert and specify some columns with values
sql: |
create table t (a int, b int);
insert into t (a) values (1), (2);
logical_plan: |
LogicalInsert { table: t }
└─LogicalValues { rows: [[1:Int32], [2:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32] } }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[1:Int32], [2:Int32]] }
- name: insert and specify some columns with query
sql: |
create table t (a int, b int);
insert into t (a) select b from t;
logical_plan: |
LogicalInsert { table: t }
└─LogicalProject { exprs: [t.b] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.b], distribution: SomeShard }
- name: insert with less source columns from values
sql: |
create table t (a int, b int);
insert into t values (1);
logical_plan: |
LogicalInsert { table: t }
└─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [*VALUES*_0.column_0:Int32] } }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchValues { rows: [[1:Int32]] }
- name: insert with less source columns from query
sql: |
create table t (a int, b int);
insert into t select a from t;
logical_plan: |
LogicalInsert { table: t }
└─LogicalProject { exprs: [t.a] }
└─LogicalProject { exprs: [t.a] }
└─LogicalScan { table: t, columns: [t.a, t.b, t._row_id] }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchInsert { table: t }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.a], distribution: SomeShard }
- name: insert with more source columns, reject
sql: |
create table t (a int, b int);
insert into t values (1, 2, 3);
binder_error: 'Bind error: INSERT has more expressions than target columns'
- name: insert with more source columns than target, reject
sql: |
create table t (a int, b int);
insert into t (a) select * from t;
binder_error: 'Bind error: INSERT has more expressions than target columns'
Loading