Skip to content

Commit

Permalink
Merge branch 'main' into fix-8082-list-from-proto
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Feb 21, 2023
2 parents 616b8c3 + 03a00ae commit fa6b9f7
Show file tree
Hide file tree
Showing 44 changed files with 319 additions and 427 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 1 addition & 49 deletions dashboard/proto/gen/stream_service.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions e2e_test/extended_query/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ create materialized view mv1 as select sum(v) as sum_v1 from t;
statement ok
insert into t (v) values (1);

query TT
show create table t;
----
public.t CREATE TABLE t (v INT)

query I
select * from t;
----
Expand Down
9 changes: 9 additions & 0 deletions e2e_test/source/basic/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ create source s with (
properties.bootstrap.server = '127.0.0.1:29092'
) row format json;

statement ok
create materialized view mv_1 as select * from s

statement error other relation\(s\) depend on it
drop source s

statement ok
drop materialized view mv_1

statement ok
drop source s

Expand Down
2 changes: 1 addition & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ message StreamFragmentGraph {
// edges between fragments.
repeated StreamFragmentEdge edges = 2;

repeated uint32 dependent_table_ids = 3;
repeated uint32 dependent_relation_ids = 3;
uint32 table_ids_cnt = 4;
StreamEnvironment env = 5;
// If none, default parallelism will be applied.
Expand Down
6 changes: 0 additions & 6 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,10 @@ import "stream_plan.proto";
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message HangingChannel {
common.ActorInfo upstream = 1;
common.ActorInfo downstream = 2;
}

// Describe the fragments which will be running on this node
message UpdateActorsRequest {
string request_id = 1;
repeated stream_plan.StreamActor actors = 2;
repeated HangingChannel hanging_channels = 3;
}

message UpdateActorsResponse {
Expand Down
4 changes: 1 addition & 3 deletions scripts/source/test_data/kafka_1_partition_mv_topic.1
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,4 @@
{"v1":7,"v2":"name0"}
{"v1":0,"v2":"name9"}
{"v1":3,"v2":"name2"}
{"v1":7,"v2":"name5"}
{"v1":1,"v2":"name7"}
{"v1":3,"v2":"name9"}
[{"v1":7,"v2":"name5"},{"v1":1,"v2":"name7"},{"v1":3,"v2":"name9"}]
3 changes: 1 addition & 2 deletions scripts/source/test_data/kafka_1_partition_topic.1
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}]
3 changes: 1 addition & 2 deletions scripts/source/test_data/kafka_3_partition_topic.3
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}]
3 changes: 1 addition & 2 deletions scripts/source/test_data/kafka_4_partition_topic.4
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{"v1": 1, "v2": "1"}
{"v1": 2, "v2": "22"}
{"v1": 3, "v2": "333"}
{"v1": 4, "v2": "4444"}
[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}]
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,4 @@
{"v1": 93, "v2": "QE53BJ", "v3": [93, 93, 93], "v4": {"v5": 93, "v6": 94}}
{"v1": 94, "v2": "9Q7W89", "v3": [94, 94, 94], "v4": {"v5": 94, "v6": 95}}
{"v1": 95, "v2": "VGDBS1", "v3": [95, 95, 95], "v4": {"v5": 95, "v6": 96}}
{"v1": 96, "v2": "KK6WEX", "v3": [96, 96, 96], "v4": {"v5": 96, "v6": 97}}
{"v1": 97, "v2": "XRTK3Y", "v3": [97, 97, 97], "v4": {"v5": 97, "v6": 98}}
{"v1": 98, "v2": "ZQ2TCL", "v3": [98, 98, 98], "v4": {"v5": 98, "v6": 99}}
{"v1": 99, "v2": "15UCX7", "v3": [99, 99, 99], "v4": {"v5": 99, "v6": 100}}
[{"v1": 96, "v2": "KK6WEX", "v3": [96, 96, 96], "v4": {"v5": 96, "v6": 97}},{"v1": 97, "v2": "XRTK3Y", "v3": [97, 97, 97], "v4": {"v5": 97, "v6": 98}},{"v1": 98, "v2": "ZQ2TCL", "v3": [98, 98, 98], "v4": {"v5": 98, "v6": 99}},{"v1": 99, "v2": "15UCX7", "v3": [99, 99, 99], "v4": {"v5": 99, "v6": 100}}]
5 changes: 1 addition & 4 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ impl StreamService for StreamServiceImpl {
request: Request<UpdateActorsRequest>,
) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
let req = request.into_inner();
let res = self
.mgr
.update_actors(&req.actors, &req.hanging_channels)
.await;
let res = self.mgr.update_actors(&req.actors).await;
match res {
Err(e) => {
error!("failed to update stream actor {}", e);
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/parser/canal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,5 @@
mod simd_json_parser;

mod operators;
mod util;

pub use simd_json_parser::*;
2 changes: 1 addition & 1 deletion src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use risingwave_expr::vector_op::cast::{
};
use simd_json::{BorrowedValue, StaticNode, ValueAccess};

use super::util::at_least_one_ok;
use crate::parser::canal::operators::*;
use crate::parser::util::at_least_one_ok;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::source::SourceColumnDesc;
use crate::{ensure_rust_type, ensure_str, impl_common_parser_logic};
Expand Down
49 changes: 0 additions & 49 deletions src/connector/src/parser/canal/util.rs

This file was deleted.

Loading

0 comments on commit fa6b9f7

Please sign in to comment.