feat(sink): prune out hidden columns on sink (#8099)
- Prune out hidden columns on sink
- Reject upsert sink without pk after pruning
- Refine `StreamSink` explain format

Approved-By: tabVersion
Approved-By: st1page
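A minimal SQL sketch of the user-facing behavior this commit enforces, modeled on the e2e tests added below (the `console` connector and the table/sink names here are only illustrative):

-- t1 has no explicit primary key, so its only key is the hidden _row_id column.
create table t1 (v1 int, v2 int);

-- Hidden columns are now pruned from the sink schema, so this upsert sink is
-- rejected: "No primary key for the upsert sink ...".
create sink s1 from t1 with (connector = 'console');

-- Selecting _row_id explicitly keeps it as the sink's primary key ...
create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console');

-- ... or force the sink to be append-only, which should drop the primary key requirement.
create sink s2 from t1 with (connector = 'console', format = 'append_only', force_append_only = 'true');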
xx01cyx authored Feb 22, 2023
1 parent 229a3c7 commit 6c5f68f
Showing 11 changed files with 122 additions and 14 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-sink-test.sh
@@ -87,6 +87,7 @@ echo "--- starting risingwave cluster with connector node"
 cargo make ci-start ci-1cn-1fe

 echo "--- testing sinks"
+sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
 sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
 sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
 sleep 1
4 changes: 2 additions & 2 deletions e2e_test/batch/explain.slt
@@ -1,11 +1,11 @@
 statement ok
-create table t(v int);
+create table t (v int) with ( appendonly = 'true' );

 statement ok
 explain create index i on t(v);

 statement ok
-explain create sink sink_t from t with ( connector = 'kafka' )
+explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' )

 statement ok
 drop table t;
2 changes: 1 addition & 1 deletion e2e_test/ddl/invalid_operation.slt
@@ -62,7 +62,7 @@ drop view not_exists.not_exists.not_exists;

 # 4.1. table
 statement ok
-create table t (v int);
+create table t (v int primary key);

 statement error Use `DROP TABLE`
 drop materialized view t;
4 changes: 2 additions & 2 deletions e2e_test/ddl/table.slt
@@ -35,10 +35,10 @@ statement ok
 explain select v2 from ddl_t;

 statement ok
-explain create sink sink_t from ddl_t with ( connector = 'kafka' );
+explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );

 statement ok
-explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka' );
+explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );

 # Create a mview with duplicated name.
 statement error
53 changes: 53 additions & 0 deletions e2e_test/sink/append_only_sink.slt
@@ -0,0 +1,53 @@
+statement ok
+create table t1 (v1 int, v2 int);
+
+statement error No primary key for the upsert sink
+create sink s1 from t1 with (connector = 'console');
+
+statement ok
+create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console');
+
+statement ok
+create table t2 (v1 int, v2 int primary key);
+
+statement ok
+create sink s2 from t2 with (connector = 'console');
+
+statement error No primary key for the upsert sink
+create sink s3 as select avg(v1) from t2 with (connector = 'console');
+
+statement ok
+create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
+
+statement ok
+create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console');
+
+statement error The sink cannot be append-only
+create sink s5 from t2 with (connector = 'console', format = 'append_only');
+
+statement ok
+create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');
+
+statement error Cannot force the sink to be append-only
+create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true');
+
+statement ok
+drop sink s1
+
+statement ok
+drop sink s2
+
+statement ok
+drop sink s3
+
+statement ok
+drop sink s4
+
+statement ok
+drop sink s5
+
+statement ok
+drop table t1
+
+statement ok
+drop table t2
2 changes: 1 addition & 1 deletion e2e_test/sink/blackhole_sink.slt
@@ -1,5 +1,5 @@
 statement ok
-CREATE TABLE t5 (v1 int, v2 int);
+CREATE TABLE t5 (v1 int primary key, v2 int);

 statement ok
 CREATE MATERIALIZED VIEW mv5 AS SELECT * FROM t5;
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -176,7 +176,7 @@ pub mod tests {
         let sql = r#"CREATE SINK snk1 FROM mv1
                     WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table =
                     '<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
-                    mysql.password = '<password>');"#.to_string();
+                    mysql.password = '<password>', format = 'append_only', force_append_only = 'true');"#.to_string();
         frontend.run_sql(sql).await.unwrap();

         let session = frontend.session_ref();
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_sink.rs
@@ -70,7 +70,7 @@ mod tests {

     #[tokio::test]
     async fn test_drop_sink_handler() {
-        let sql_create_table = "create table t (v1 smallint);";
+        let sql_create_table = "create table t (v1 smallint primary key);";
         let sql_create_mv = "create materialized view mv as select v1 from t;";
         let sql_create_sink = "create sink snk from mv with( connector = 'mysql')";
         let sql_drop_sink = "drop sink snk;";
24 changes: 21 additions & 3 deletions src/frontend/src/optimizer/mod.rs
@@ -37,8 +37,8 @@ use risingwave_common::util::iter_util::ZipEqDebug;

 use self::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
 use self::plan_node::{
-    BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamRowIdGen,
-    StreamSink,
+    BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject,
+    StreamRowIdGen, StreamSink,
 };
 #[cfg(debug_assertions)]
 use self::plan_visitor::InputRefValidator;
@@ -49,6 +49,7 @@ use self::plan_visitor::{
 use self::property::RequiredDist;
 use self::rule::*;
 use crate::catalog::table_catalog::{TableType, TableVersion};
+use crate::expr::InputRef;
 use crate::optimizer::plan_node::{
     BatchExchange, ColumnPruningContext, PlanNodeType, PlanTreeNode, PredicatePushdownContext,
     RewriteExprsRecursive,
@@ -746,7 +747,24 @@ impl PlanRoot {
         definition: String,
         properties: WithOptions,
     ) -> Result<StreamSink> {
-        let stream_plan = self.gen_stream_plan()?;
+        let mut stream_plan = self.gen_stream_plan()?;
+
+        // Add a project node if there is hidden column(s).
+        let input_fields = stream_plan.schema().fields();
+        if input_fields.len() != self.out_fields.count_ones(..) {
+            let exprs = input_fields
+                .iter()
+                .enumerate()
+                .filter_map(|(idx, field)| {
+                    if self.out_fields.contains(idx) {
+                        Some(InputRef::new(idx, field.data_type.clone()).into())
+                    } else {
+                        None
+                    }
+                })
+                .collect_vec();
+            stream_plan = StreamProject::new(LogicalProject::new(stream_plan, exprs)).into();
+        }

         StreamSink::create(
             stream_plan,
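The hunk above inserts a StreamProject below the sink whenever the stream plan exposes more fields than `out_fields`, so hidden columns such as `_row_id` no longer reach the sink. A hedged SQL sketch of the visible effect (illustrative names; the exact plan rendering may differ):

create table t (v1 int, v2 int);
explain create sink s from t with (connector = 'console', format = 'append_only', force_append_only = 'true');
-- The StreamSink's column list is expected to contain only v1 and v2;
-- the hidden _row_id column is projected away before the sink.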
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/derive.rs
@@ -23,7 +23,7 @@ use super::PlanRef;
 use crate::optimizer::property::{Direction, FieldOrder, Order};

 pub(crate) fn derive_columns(
-    schema: &Schema,
+    input_schema: &Schema,
     out_names: Vec<String>,
     user_cols: &FixedBitSet,
 ) -> Result<Vec<ColumnCatalog>> {
@@ -39,7 +39,7 @@ pub(crate) fn derive_columns(
     }

     let mut out_name_iter = out_names.into_iter();
-    let columns = schema
+    let columns = input_schema
         .fields()
         .iter()
         .enumerate()
38 changes: 37 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -28,6 +28,7 @@ use risingwave_connector::sink::{
 use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;

 use super::derive::{derive_columns, derive_pk};
+use super::utils::IndicesDisplay;
 use super::{ExprRewritable, PlanBase, PlanRef, StreamNode};
 use crate::optimizer::plan_node::PlanTreeNodeUnary;
 use crate::optimizer::property::{Distribution, Order, RequiredDist};
@@ -102,6 +103,14 @@ impl StreamSink {
         let sink_type = Self::derive_sink_type(input.append_only(), &properties)?;
         let (pk, stream_key) = derive_pk(input, user_order_by, &columns);

+        if sink_type == SinkType::Upsert && pk.is_empty() {
+            return Err(ErrorCode::SinkError(Box::new(Error::new(
+                ErrorKind::InvalidInput,
+                "No primary key for the upsert sink. Please include the primary key explicitly in sink definition or make the sink append-only.",
+            )))
+            .into());
+        }
+
         Ok(SinkDesc {
             id: SinkId::placeholder(),
             name,
@@ -140,7 +149,7 @@
             (_, false, true) => {
                 Err(ErrorCode::SinkError(Box::new(Error::new(
                     ErrorKind::InvalidInput,
-                    "Cannot force the sink to be append-only without \"format='append_only'\"in WITH options",
+                    "Cannot force the sink to be append-only without \"format='append_only'\"in WITH options.",
                 )))
                 .into())
             }
@@ -164,6 +173,33 @@ impl_plan_tree_node_for_unary! { StreamSink }
 impl fmt::Display for StreamSink {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let mut builder = f.debug_struct("StreamSink");
+
+        let sink_type = if self.sink_desc.sink_type.is_append_only() {
+            "append-only"
+        } else {
+            "upsert"
+        };
+        let column_names = self
+            .sink_desc
+            .columns
+            .iter()
+            .map(|col| col.column_desc.name.clone())
+            .collect_vec()
+            .join(", ");
+        builder
+            .field("type", &format_args!("{}", sink_type))
+            .field("columns", &format_args!("[{}]", column_names));
+
+        if self.sink_desc.sink_type.is_upsert() {
+            builder.field(
+                "pk",
+                &IndicesDisplay {
+                    indices: &self.sink_desc.pk.iter().map(|k| k.column_idx).collect_vec(),
+                    input_schema: &self.base.schema,
+                },
+            );
+        }
+
         builder.finish()
     }
 }
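With the refined `Display` implementation above, the sink node in `EXPLAIN` output now reports its type, its column list, and, for upsert sinks, the primary key. A rough sketch of the expected shape (illustrative names; assumed rendering, the exact format may differ):

create table t2 (v1 int, v2 int primary key);
explain create sink s2 from t2 with (connector = 'console');
-- Expected to include a line roughly like:
--   StreamSink { type: upsert, columns: [v1, v2], pk: [v2] }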
