Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into xxchan/scale-source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Aug 5, 2024
1 parent 17e712a commit 1c9c6e5
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 45 deletions.
3 changes: 3 additions & 0 deletions e2e_test/commands/risectl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

# Thin wrapper for invoking risectl from the e2e tests.
# Runs the risectl binary at .risingwave/bin/risingwave/risectl (path is
# relative to the current working directory) and forwards every command-line
# argument unchanged via "$@".
# RUST_LOG is set to "error" for this invocation only — presumably so that
# lower-severity log lines do not end up in the output the tests compare against.
RUST_LOG="error" .risingwave/bin/risingwave/risectl "$@"
109 changes: 90 additions & 19 deletions e2e_test/source_inline/kafka/shared_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ create source s0 (v1 int, v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

query I
query ?
select count(*) from rw_internal_tables where name like '%s0%';
----
1

sleep 1s
# sleep 1s

# SourceExecutor's ingestion does not start (state table is empty), even after sleep
system ok
Expand All @@ -39,7 +39,7 @@ statement ok
create materialized view mv_1 as select * from s0;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 2s
# sleep 2s

# SourceExecutor's ingestion started, but it only starts from latest.
system ok
Expand All @@ -52,10 +52,7 @@ internal_table.mjs --name s0 --type source
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""Backfilling"": ""0""}"
1,"{""Backfilling"": ""0""}"
2,"{""Backfilling"": ""0""}"
3,"{""Backfilling"": ""0""}"
(empty)


# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
Expand All @@ -67,23 +64,23 @@ create materialized view mv_2 as select * from s0;

sleep 2s

query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_2;
----
1 a
Expand Down Expand Up @@ -111,7 +108,7 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
Expand All @@ -123,7 +120,7 @@ select v1, v2 from s0;
4 d
4 dd

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
Expand Down Expand Up @@ -171,17 +168,17 @@ cat <<EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
EOF
done

sleep 3s
# sleep 3s

query IT rowsort
query ?? rowsort
select v1, count(*) from s0 group by v1;
----
1 12
2 12
3 12
4 12

query IT rowsort
query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
Expand Down Expand Up @@ -210,12 +207,86 @@ internal_table.mjs --name mv_1 --type sourcebackfill
3,"""Finished"""


# scale down
statement ok
ALTER MATERIALIZED VIEW mv_1 SET PARALLELISM TO 2;
# # Note: the parallelism depends on the risedev profile.
# # So scale tests below are commented out.

# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5


# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # scale down
# statement ok
# ALTER MATERIALIZED VIEW mv_1 SET PARALLELISM TO 2;

# # should have no effect, because of NoShuffle
# # TODO: support ALTER SOURCE SET PARALLELISM, then we can scale the source (and its NoShuffle downstream) in this test.
# query ???
# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
# ----
# mv_1 {MVIEW,SOURCE_SCAN} 5
# mv_2 {MVIEW,SOURCE_SCAN} 5
# s0 {SOURCE} 5

# system ok
# risectl meta source-split-info --ignore-id
# ----
# Table
# Fragment (Source)
# Actor (1 splits): [0]
# Actor (1 splits): [2]
# Actor (1 splits): [3]
# Actor (1 splits): [1]
# Actor (0 splits): []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []
# Table
# Fragment (SourceScan)
# Actor (1 splits): [0] <- Upstream Actor #1055: [0]
# Actor (1 splits): [2] <- Upstream Actor #1056: [2]
# Actor (1 splits): [3] <- Upstream Actor #1057: [3]
# Actor (1 splits): [1] <- Upstream Actor #1058: [1]
# Actor (0 splits): [] <- Upstream Actor #1059: []


# # Manual test: change the parallelism of the compute node, kill and restart, and check
# # risedev ctl meta source-split-info --ignore-id
# # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;"

halt

statement ok
drop source s0 cascade;
37 changes: 26 additions & 11 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetCluster
Ok(response)
}

pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
let GetClusterInfoResponse {
worker_nodes: _,
source_infos: _,
Expand Down Expand Up @@ -80,9 +80,11 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
if table_fragment.actor_splits.is_empty() {
continue;
}

println!("Table #{}", table_fragment.table_id);

if ignore_id {
println!("Table");
} else {
println!("Table #{}", table_fragment.table_id);
}
for fragment in table_fragment.fragments.values() {
let fragment_type_mask = fragment.fragment_type_mask;
if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
Expand All @@ -97,8 +99,12 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
}

println!(
"\tFragment #{} ({})",
fragment.fragment_id,
"\tFragment{} ({})",
if ignore_id {
"".to_string()
} else {
format!(" #{}", fragment.fragment_id)
},
if fragment_type_mask == FragmentTypeFlag::Source as u32 {
"Source"
} else {
Expand All @@ -108,8 +114,12 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
for actor in &fragment.actors {
if let Some(splits) = actor_splits_map.get(&actor.actor_id) {
println!(
"\t\tActor #{:<3} ({} splits): [{}]{}",
actor.actor_id,
"\t\tActor{} ({} splits): [{}]{}",
if ignore_id {
"".to_string()
} else {
format!(" #{:<3}", actor.actor_id,)
},
splits.len(),
splits,
if !actor.upstream_actor_id.is_empty() {
Expand All @@ -120,16 +130,21 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
let upstream_splits =
actor_splits_map.get(&actor.upstream_actor_id[0]).unwrap();
format!(
" <- Upstream Actor #{}: [{}]",
actor.upstream_actor_id[0], upstream_splits
" <- Upstream Actor{}: [{}]",
if ignore_id {
"".to_string()
} else {
format!(" #{}", actor.upstream_actor_id[0])
},
upstream_splits
)
} else {
"".to_string()
}
);
} else {
println!(
"\t\tActor #{:<3} (not found in actor_splits)",
"\t\tError: Actor #{:<3} (not found in actor_splits)",
actor.actor_id,
)
}
Expand Down
9 changes: 6 additions & 3 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,10 @@ enum MetaCommands {
/// get cluster info
ClusterInfo,
/// get source split info
SourceSplitInfo,
SourceSplitInfo {
#[clap(long)]
ignore_id: bool,
},
/// Reschedule the actors in the stream graph
///
/// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
Expand Down Expand Up @@ -799,8 +802,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
Commands::Meta(MetaCommands::SourceSplitInfo) => {
cmd_impl::meta::source_split_info(context).await?
Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
cmd_impl::meta::source_split_info(context, ignore_id).await?
}
Commands::Meta(MetaCommands::Reschedule {
from,
Expand Down
Loading

0 comments on commit 1c9c6e5

Please sign in to comment.