Merge branch 'main' into wcy/opendal_batch_delete
mergify[bot] authored Feb 22, 2023
2 parents a9abc84 + 8dff620 commit a10d67b
Showing 29 changed files with 367 additions and 101 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions ci/scripts/e2e-sink-test.sh
@@ -87,6 +87,7 @@ echo "--- starting risingwave cluster with connector node"
cargo make ci-start ci-1cn-1fe

echo "--- testing sinks"
+sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
sleep 1
54 changes: 40 additions & 14 deletions dashboard/proto/gen/stream_plan.ts

(Generated file; diff not rendered.)

4 changes: 2 additions & 2 deletions e2e_test/batch/explain.slt
@@ -1,11 +1,11 @@
statement ok
-create table t(v int);
+create table t (v int) with ( appendonly = 'true' );

statement ok
explain create index i on t(v);

statement ok
-explain create sink sink_t from t with ( connector = 'kafka' )
+explain create sink sink_t from t with ( connector = 'kafka', format = 'append_only' )

statement ok
drop table t;
2 changes: 1 addition & 1 deletion e2e_test/ddl/invalid_operation.slt
@@ -62,7 +62,7 @@ drop view not_exists.not_exists.not_exists;

# 4.1. table
statement ok
-create table t (v int);
+create table t (v int primary key);

statement error Use `DROP TABLE`
drop materialized view t;
4 changes: 2 additions & 2 deletions e2e_test/ddl/table.slt
@@ -35,10 +35,10 @@ statement ok
explain select v2 from ddl_t;

statement ok
-explain create sink sink_t from ddl_t with ( connector = 'kafka' );
+explain create sink sink_t from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );

statement ok
-explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka' );
+explain create sink sink_as as select sum(v2) as sum from ddl_t with ( connector = 'kafka', format = 'append_only', force_append_only = 'true' );

# Create a mview with duplicated name.
statement error
53 changes: 53 additions & 0 deletions e2e_test/sink/append_only_sink.slt
@@ -0,0 +1,53 @@
statement ok
create table t1 (v1 int, v2 int);

statement error No primary key for the upsert sink
create sink s1 from t1 with (connector = 'console');

statement ok
create sink s1 as select v1, v2, _row_id from t1 with (connector = 'console');

statement ok
create table t2 (v1 int, v2 int primary key);

statement ok
create sink s2 from t2 with (connector = 'console');

statement error No primary key for the upsert sink
create sink s3 as select avg(v1) from t2 with (connector = 'console');

statement ok
create sink s3 as select avg(v1) from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');

statement ok
create sink s4 as select avg(v1), v2 from t2 group by v2 with (connector = 'console');

statement error The sink cannot be append-only
create sink s5 from t2 with (connector = 'console', format = 'append_only');

statement ok
create sink s5 from t2 with (connector = 'console', format = 'append_only', force_append_only = 'true');

statement error Cannot force the sink to be append-only
create sink s6 from t2 with (connector = 'console', format = 'upsert', force_append_only = 'true');

statement ok
drop sink s1

statement ok
drop sink s2

statement ok
drop sink s3

statement ok
drop sink s4

statement ok
drop sink s5

statement ok
drop table t1

statement ok
drop table t2
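
The new append_only_sink.slt cases above exercise a small decision matrix: a sink defaults to upsert format and then needs a primary key (or row id) in its output; an append_only sink needs append-only input unless force_append_only = 'true' is given; and forcing append-only is rejected when the format is explicitly 'upsert'. The Rust snippet below is only a hedged sketch of that decision logic, with made-up names and error strings copied from the test expectations; it is not the actual frontend validation code.

// Hypothetical sketch of the sink-format checks exercised by the tests above.
#[derive(PartialEq)]
enum SinkFormat {
    AppendOnly,
    Upsert, // also the default when no `format` option is given
}

fn validate_sink(
    input_append_only: bool,
    output_has_pk: bool,
    format: SinkFormat,
    force_append_only: bool,
) -> Result<(), String> {
    match format {
        SinkFormat::Upsert => {
            if force_append_only {
                return Err("Cannot force the sink to be append-only".into());
            }
            if !output_has_pk {
                return Err("No primary key for the upsert sink".into());
            }
        }
        SinkFormat::AppendOnly => {
            if !input_append_only && !force_append_only {
                return Err("The sink cannot be append-only".into());
            }
        }
    }
    Ok(())
}

fn main() {
    // `create sink s5 from t2 with (format = 'append_only')` fails on a non-append-only table...
    assert!(validate_sink(false, true, SinkFormat::AppendOnly, false).is_err());
    // ...but succeeds once `force_append_only = 'true'` is added, as in the test above.
    assert!(validate_sink(false, true, SinkFormat::AppendOnly, true).is_ok());
}
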
2 changes: 1 addition & 1 deletion e2e_test/sink/blackhole_sink.slt
@@ -1,5 +1,5 @@
statement ok
-CREATE TABLE t5 (v1 int, v2 int);
+CREATE TABLE t5 (v1 int primary key, v2 int);

statement ok
CREATE MATERIALIZED VIEW mv5 AS SELECT * FROM t5;
11 changes: 9 additions & 2 deletions proto/stream_plan.proto
@@ -553,9 +553,12 @@ enum DispatcherType {
NO_SHUFFLE = 4;
}

+// The property of an edge in the fragment graph.
+// This is essentially a "logical" version of `Dispatcher`. See the doc of `Dispatcher` for more details.
message DispatchStrategy {
DispatcherType type = 1;
-repeated uint32 column_indices = 2;
+repeated uint32 dist_key_indices = 2;
+repeated uint32 output_indices = 3;
}

// A dispatcher redistributes messages.
@@ -564,7 +567,11 @@ message Dispatcher {
DispatcherType type = 1;
// Indices of the columns to be used for hashing.
// For dispatcher types other than HASH, this is ignored.
-repeated uint32 column_indices = 2;
+repeated uint32 dist_key_indices = 2;
+// Indices of the columns to output.
+// In most cases, this contains all columns in the input. But for some cases like MV on MV or
+// schema change, we may only output a subset of the columns.
+repeated uint32 output_indices = 6;
// The hash mapping for consistent hash.
// For dispatcher types other than HASH, this is ignored.
ActorMapping hash_mapping = 3;
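
The change above splits the old `column_indices` into two roles: `dist_key_indices` is used only to compute the hash key, while `output_indices` selects which input columns the dispatcher forwards downstream (normally all of them, but possibly a subset for cases like MV on MV or schema change). The short Rust sketch below illustrates the two roles on toy data; it is an assumption-laden illustration, not the actual dispatcher code.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Forward only the columns named by `output_indices`.
fn project_row(row: &[i64], output_indices: &[usize]) -> Vec<i64> {
    output_indices.iter().map(|&i| row[i]).collect()
}

// Hash only the columns named by `dist_key_indices` to pick a downstream bucket.
fn hash_bucket(row: &[i64], dist_key_indices: &[usize], buckets: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    for &i in dist_key_indices {
        row[i].hash(&mut hasher);
    }
    hasher.finish() % buckets
}

fn main() {
    let row: [i64; 3] = [10, 20, 30];
    // Distribute by column 0, but only ship columns 0 and 2 downstream.
    let bucket = hash_bucket(&row, &[0], 4);
    let shipped = project_row(&row, &[0, 2]);
    println!("bucket {bucket}, shipped {shipped:?}");
}
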
1 change: 0 additions & 1 deletion risedev.yml
@@ -176,7 +176,6 @@ profile:
- use: kafka
persist-data: true


3etcd-3meta:
steps:
- use: etcd
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -176,7 +176,7 @@ pub mod tests {
let sql = r#"CREATE SINK snk1 FROM mv1
WITH (connector = 'mysql', mysql.endpoint = '127.0.0.1:3306', mysql.table =
'<table_name>', mysql.database = '<database_name>', mysql.user = '<user_name>',
-mysql.password = '<password>');"#.to_string();
+mysql.password = '<password>', format = 'append_only', force_append_only = 'true');"#.to_string();
frontend.run_sql(sql).await.unwrap();

let session = frontend.session_ref();
2 changes: 1 addition & 1 deletion src/frontend/src/handler/drop_sink.rs
@@ -70,7 +70,7 @@ mod tests {

#[tokio::test]
async fn test_drop_sink_handler() {
-let sql_create_table = "create table t (v1 smallint);";
+let sql_create_table = "create table t (v1 smallint primary key);";
let sql_create_mv = "create materialized view mv as select v1 from t;";
let sql_create_sink = "create sink snk from mv with( connector = 'mysql')";
let sql_drop_sink = "drop sink snk;";
24 changes: 21 additions & 3 deletions src/frontend/src/optimizer/mod.rs
@@ -37,8 +37,8 @@ use risingwave_common::util::iter_util::ZipEqDebug;

use self::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
use self::plan_node::{
-BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamRowIdGen,
-StreamSink,
+BatchProject, Convention, LogicalProject, StreamDml, StreamMaterialize, StreamProject,
+StreamRowIdGen, StreamSink,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
@@ -49,6 +49,7 @@ use self::plan_visitor::{
use self::property::RequiredDist;
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
+use crate::expr::InputRef;
use crate::optimizer::plan_node::{
BatchExchange, ColumnPruningContext, PlanNodeType, PlanTreeNode, PredicatePushdownContext,
RewriteExprsRecursive,
@@ -746,7 +747,24 @@ impl PlanRoot {
definition: String,
properties: WithOptions,
) -> Result<StreamSink> {
-let stream_plan = self.gen_stream_plan()?;
+let mut stream_plan = self.gen_stream_plan()?;
+
+// Add a project node if there are hidden column(s).
+let input_fields = stream_plan.schema().fields();
+if input_fields.len() != self.out_fields.count_ones(..) {
+let exprs = input_fields
+.iter()
+.enumerate()
+.filter_map(|(idx, field)| {
+if self.out_fields.contains(idx) {
+Some(InputRef::new(idx, field.data_type.clone()).into())
+} else {
+None
+}
+})
+.collect_vec();
+stream_plan = StreamProject::new(LogicalProject::new(stream_plan, exprs)).into();
+}

StreamSink::create(
stream_plan,
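
The new block above (inside `impl PlanRoot`) compares the number of input fields with `out_fields.count_ones(..)` and, when they differ, wraps the plan in a `StreamProject` that keeps only the visible columns, so hidden columns such as `_row_id` do not reach the sink. Below is a minimal sketch of that bitset-driven column selection; it assumes the third-party `fixedbitset` crate and returns column names for illustration, whereas the real code builds `InputRef` expressions and a `StreamProject` plan node.

// Illustrative only: keep the columns whose index is set in `out_fields`,
// mirroring the `self.out_fields.contains(idx)` filtering in the diff above.
use fixedbitset::FixedBitSet;

fn visible_columns<'a>(all: &[&'a str], out_fields: &FixedBitSet) -> Vec<&'a str> {
    all.iter()
        .enumerate()
        .filter_map(|(idx, name)| out_fields.contains(idx).then_some(*name))
        .collect()
}

fn main() {
    // Columns v1 and v2 plus a hidden _row_id; only the first two are user-visible.
    let cols = ["v1", "v2", "_row_id"];
    let mut out_fields = FixedBitSet::with_capacity(cols.len());
    out_fields.insert(0);
    out_fields.insert(1);
    // 3 input fields vs. 2 visible ones, so a projection is needed.
    assert_ne!(cols.len(), out_fields.count_ones(..));
    assert_eq!(visible_columns(&cols, &out_fields), ["v1", "v2"]);
}
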
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/derive.rs
@@ -23,7 +23,7 @@ use super::PlanRef;
use crate::optimizer::property::{Direction, FieldOrder, Order};

pub(crate) fn derive_columns(
-schema: &Schema,
+input_schema: &Schema,
out_names: Vec<String>,
user_cols: &FixedBitSet,
) -> Result<Vec<ColumnCatalog>> {
@@ -39,7 +39,7 @@ }
}

let mut out_name_iter = out_names.into_iter();
-let columns = schema
+let columns = input_schema
.fields()
.iter()
.enumerate()
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream.rs
@@ -425,10 +425,11 @@ pub fn to_stream_prost_body(
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
-column_indices: match &base.dist {
+dist_key_indices: match &base.dist {
Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(),
_ => vec![],
},
+output_indices: (0..base.schema().len() as u32).collect(),
}),
}),
Node::DynamicFilter(me) => {
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -83,10 +83,11 @@ impl StreamNode for StreamExchange {
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
-column_indices: match &self.base.dist {
+dist_key_indices: match &self.base.dist {
Distribution::HashShard(keys) => keys.iter().map(|num| *num as u32).collect(),
_ => vec![],
},
+output_indices: (0..self.schema().len() as u32).collect(),
}),
})
}
(Remaining file diffs not loaded.)
