From ff480ff8029fa2851c4fe43fa1ac6fa4a6baa722 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Tue, 6 Aug 2024 00:53:05 +0800
Subject: [PATCH] add sim test

---
 .typos.toml                                   |   1 +
 Cargo.lock                                    |   1 +
 .../source/source_backfill_executor.rs        |  27 +-
 src/tests/simulation/Cargo.toml               |   1 +
 src/tests/simulation/src/cluster.rs           |  29 +-
 src/tests/simulation/src/ctl_ext.rs           |  32 ++-
 .../tests/integration_tests/scale/mod.rs      |   1 +
 .../integration_tests/scale/shared_source.rs  | 262 ++++++++++++++++++
 8 files changed, 323 insertions(+), 31 deletions(-)
 create mode 100644 src/tests/simulation/tests/integration_tests/scale/shared_source.rs

diff --git a/.typos.toml b/.typos.toml
index 4d4bbfca1c08..498d954a55d8 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -36,4 +36,5 @@ extend-exclude = [
   # We don't want to fix "fals" here, but may want in other places.
   # Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
   "src/common/src/cast/mod.rs",
+  "src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
 ]
diff --git a/Cargo.lock b/Cargo.lock
index 4ade97747bcf..3ca84e75df10 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11355,6 +11355,7 @@ dependencies = [
  "madsim-etcd-client",
  "madsim-rdkafka",
  "madsim-tokio",
+ "maplit",
  "paste",
  "pin-project",
  "pretty_assertions",
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 4a34eabe97e1..992747e63ac1 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -584,7 +584,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
         }
 
         let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();
-
+        tracing::info!(
+            actor_id = self.actor_ctx.id,
+            "source backfill finished. Enter forward stage"
+        );
         // All splits finished backfilling. Now we only forward the source data.
         #[for_await]
         for msg in input {
@@ -641,13 +644,21 @@
     /// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
     /// this actor, we still need to backfill it.
     async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
+        let persisted_states = self.backfill_state_store.scan().await?;
+        if persisted_states.is_empty() {
+            return Ok(false);
+        }
+        let actor_id = self.actor_ctx.id;
+        tracing::debug!(
+            actor_id,
+            "checking whether source backfill is finished, persisted_states: {:?}, states: {:?}",
+            persisted_states,
+            states
+        );
         Ok(states
             .values()
             .all(|state| matches!(state, BackfillState::Finished))
-            && self
-                .backfill_state_store
-                .scan()
-                .await?
+            && persisted_states
                 .into_iter()
                 .all(|state| matches!(state, BackfillState::Finished)))
     }
@@ -801,8 +812,10 @@
                 BackfillState::Finished => {}
                 _ => {
                     return Err(anyhow::anyhow!(
-                        "Unexpected backfill state: {:?}",
-                        backfill_state
+                        "Unexpected backfill state in update_state_if_changed_forward_stage: {:?}, target_splits: {:?}, current_splits: {:?}",
+                        backfill_state,
+                        target_splits,
+                        current_splits
                     )
                     .into());
                 }
diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml
index 6d881f203f67..53dd5e493d3c 100644
--- a/src/tests/simulation/Cargo.toml
+++ b/src/tests/simulation/Cargo.toml
@@ -25,6 +25,7 @@ glob = "0.3"
 itertools = { workspace = true }
 lru = { workspace = true }
 madsim = "0.2.27"
+maplit = "1"
 paste = "1"
 pin-project = "1.1"
 pretty_assertions = "1"
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index 6c9db8c48170..9355628a406d 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -153,27 +153,16 @@ impl Configuration {
     /// Provides a configuration for scale test which ensures that the arrangement backfill is disabled,
     /// so table scan will use `no_shuffle`.
     pub fn for_scale_no_shuffle() -> Self {
-        // Embed the config file and create a temporary file at runtime. The file will be deleted
-        // automatically when it's dropped.
-        let config_path = {
-            let mut file =
-                tempfile::NamedTempFile::new().expect("failed to create temp config file");
-            file.write_all(include_bytes!("risingwave-scale.toml"))
-                .expect("failed to write config file");
-            file.into_temp_path()
-        };
+        let mut conf = Self::for_scale();
+        conf.per_session_queries =
+            vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
+        conf
+    }
 
-        Configuration {
-            config_path: ConfigPath::Temp(config_path.into()),
-            frontend_nodes: 2,
-            compute_nodes: 3,
-            meta_nodes: 3,
-            compactor_nodes: 2,
-            compute_node_cores: 2,
-            per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()]
-                .into(),
-            ..Default::default()
-        }
+    pub fn for_scale_shared_source() -> Self {
+        let mut conf = Self::for_scale();
+        conf.per_session_queries = vec!["SET RW_ENABLE_SHARED_SOURCE = true;".into()].into();
+        conf
     }
 
     pub fn for_auto_parallelism(
diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index 9b57673e49c1..a3c492146543 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -14,7 +14,7 @@
 
 #![cfg_attr(not(madsim), expect(unused_imports))]
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::ffi::OsString;
 use std::fmt::Write;
 use std::sync::Arc;
@@ -23,17 +23,17 @@ use anyhow::{anyhow, Result};
 use cfg_or_panic::cfg_or_panic;
 use clap::Parser;
 use itertools::Itertools;
-use rand::seq::{IteratorRandom, SliceRandom};
+use rand::seq::IteratorRandom;
 use rand::{thread_rng, Rng};
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::WorkerSlotId;
+use risingwave_connector::source::{SplitImpl, SplitMetaData};
 use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::meta::table_fragments::PbFragment;
 use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
 use risingwave_pb::meta::GetClusterInfoResponse;
 use risingwave_pb::stream_plan::StreamNode;
-use serde::de::IntoDeserializer;
 
 use self::predicate::BoxedPredicate;
 use crate::cluster::Cluster;
@@ -76,7 +76,7 @@
         Box::new(p)
     }
 
-    /// There exists operators whose identity contains `s` in the fragment.
+    /// There exist operators whose identity contains `s` in the fragment (case-insensitive).
     pub fn identity_contains(s: impl Into<String>) -> BoxedPredicate {
         let s: String = s.into();
         let p = move |f: &PbFragment| {
@@ -363,6 +363,30 @@ impl Cluster {
         Ok(response)
     }
 
+    #[cfg_or_panic(madsim)]
+    pub async fn list_source_splits(&self) -> Result<String> {
+        let info = self.get_cluster_info().await?;
+        let mut res = BTreeMap::new();
+
+        for table in info.table_fragments {
+            let mut table_actor_splits = BTreeMap::new();
+
+            for (actor_id, splits) in table.actor_splits {
+                let splits = splits
+                    .splits
+                    .iter()
+                    .map(|split| SplitImpl::try_from(split).unwrap())
+                    .map(|split| split.id())
+                    .collect_vec()
+                    .join(",");
+                table_actor_splits.insert(actor_id, splits);
+            }
+            res.insert(table.table_id, table_actor_splits);
+        }
+
+        Ok(format!("{res:#?}"))
+    }
+
     // update node schedulability
     #[cfg_or_panic(madsim)]
     async fn update_worker_node_schedulability(
diff --git a/src/tests/simulation/tests/integration_tests/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs
index f6940f072409..3c7a702dc629 100644
--- a/src/tests/simulation/tests/integration_tests/scale/mod.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/mod.rs
@@ -20,6 +20,7 @@ mod nexmark_q4;
 mod nexmark_source;
 mod no_shuffle;
 mod schedulability;
+mod shared_source;
 mod singleton_migration;
 mod sink;
 mod streaming_parallelism;
diff --git a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs
new file mode 100644
index 000000000000..ad7e6392dc38
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs
@@ -0,0 +1,262 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use anyhow::Result;
+use itertools::Itertools;
+use maplit::{convert_args, hashmap};
+use risingwave_common::hash::WorkerSlotId;
+use risingwave_pb::meta::table_fragments::Fragment;
+use risingwave_simulation::cluster::{Cluster, Configuration};
+use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};
+
+const CREATE_SOURCE: &str = r#"
+CREATE SOURCE s(v1 int, v2 varchar) WITH (
+    connector='kafka',
+    properties.bootstrap.server='192.168.11.1:29092',
+    topic='shared_source'
+) FORMAT PLAIN ENCODE JSON;"#;
+
+fn print_actor_upstream(fragment: &Fragment) -> String {
+    format!(
+        "{}",
+        fragment.actors.iter().format_with("\n", |actor, f| {
+            f(&format_args!(
+                "{} <- {:?}",
+                actor.actor_id, actor.upstream_actor_id
+            ))
+        })
+    )
+}
+
+#[tokio::test]
+async fn test_shared_source() -> Result<()> {
+    tracing_subscriber::fmt::Subscriber::builder()
+        .with_max_level(tracing::Level::ERROR)
+        .with_env_filter("risingwave_stream::executor::source::source_backfill_executor=DEBUG")
+        .init();
+
+    let mut cluster = Cluster::start(Configuration::for_scale_shared_source()).await?;
+    cluster.create_kafka_topics(convert_args!(hashmap!(
+        "shared_source" => 4,
+    )));
+    let mut session = cluster.start_session();
+
+    session.run("set rw_implicit_flush = true;").await?;
+
+    session.run(CREATE_SOURCE).await?;
+    session
+        .run("create materialized view mv as select count(*) from s group by v1;")
+        .await?;
+    let source_fragment = cluster
+        .locate_one_fragment([
+            identity_contains("Source"),
+            no_identity_contains("StreamSourceScan"),
+        ])
+        .await?;
+    let source_workers = source_fragment.all_worker_count().into_keys().collect_vec();
+    let source_backfill_fragment = cluster
+        .locate_one_fragment([identity_contains("StreamSourceScan")])
+        .await?;
+    let source_backfill_workers = source_backfill_fragment
+        .all_worker_count()
+        .into_keys()
+        .collect_vec();
+    expect_test::expect![[r#"
+        13 <- [1]
+        14 <- [2]
+        15 <- [3]
+        16 <- [4]
+        17 <- [5]
+        18 <- [6]"#]]
+    .assert_eq(&print_actor_upstream(&source_backfill_fragment.inner));
+    let hash_agg_fragment = cluster
+        .locate_one_fragment([identity_contains("hashagg")])
+        .await?;
+    let hash_agg_workers = hash_agg_fragment
+        .all_worker_count()
+        .into_keys()
+        .collect_vec();
+    expect_test::expect![[r#"
+        {
+            1: {
+                1: "1",
+                2: "2",
+                3: "",
+                4: "",
+                5: "0",
+                6: "3",
+            },
+            3: {
+                13: "1",
+                14: "2",
+                15: "",
+                16: "",
+                17: "0",
+                18: "3",
+            },
+        }"#]]
+    .assert_eq(&cluster.list_source_splits().await?);
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 6
+        2 3 HASH {4,3} {3} {MVIEW} 6
+        3 3 HASH {5} {1} {SOURCE_SCAN} 6"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        1 CREATED ADAPTIVE
+        3 CREATED ADAPTIVE"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+
+    // SourceBackfill cannot be scaled because of NoShuffle.
+    assert!(
+        &cluster
+            .reschedule(
+                source_backfill_fragment
+                    .reschedule([WorkerSlotId::new(source_backfill_workers[0], 0)], []),
+            )
+            .await
+            .unwrap_err()
+            .to_string()
+            .contains("rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"),
+    );
+
+    // hash agg can be scaled independently
+    expect_test::expect![[r#"
+        Ok(
+            (),
+        )
+    "#]]
+    .assert_debug_eq(
+        &cluster
+            .reschedule(
+                hash_agg_fragment.reschedule([WorkerSlotId::new(hash_agg_workers[0], 0)], []),
+            )
+            .await,
+    );
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 6
+        2 3 HASH {4,3} {3} {MVIEW} 5
+        3 3 HASH {5} {1} {SOURCE_SCAN} 6"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+
+    // source is the NoShuffle upstream. It can be scaled, and the downstream SourceBackfill will be scaled together.
+    expect_test::expect![[r#"
+        Ok(
+            (),
+        )
+    "#]]
+    .assert_debug_eq(
+        &cluster
+            .reschedule(source_fragment.reschedule(
+                [
+                    WorkerSlotId::new(source_workers[0], 0),
+                    WorkerSlotId::new(source_workers[0], 1),
+                    WorkerSlotId::new(source_workers[2], 0),
+                ],
+                [],
+            ))
+            .await,
+    );
+    let source_backfill_fragment = cluster
+        .locate_one_fragment([identity_contains("StreamSourceScan")])
+        .await?;
+    expect_test::expect![[r#"
+        13 <- [1]
+        15 <- [3]
+        16 <- [4]"#]]
+    .assert_eq(&print_actor_upstream(&source_backfill_fragment.inner));
+    expect_test::expect![[r#"
+        {
+            1: {
+                1: "2,3",
+                3: "0",
+                4: "1",
+            },
+            3: {
+                13: "2,3",
+                15: "0",
+                16: "1",
+            },
+        }"#]]
+    .assert_eq(&cluster.list_source_splits().await?);
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 3
+        2 3 HASH {4,3} {3} {MVIEW} 5
+        3 3 HASH {5} {1} {SOURCE_SCAN} 3"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        1 CREATED CUSTOM
+        3 CREATED CUSTOM"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+
+    // resolve_no_shuffle is OK
+    expect_test::expect![[r#"
+        Ok(
+            (),
+        )
+    "#]]
+    .assert_debug_eq(
+        &cluster
+            .reschedule_resolve_no_shuffle(source_backfill_fragment.reschedule(
+                [],
+                [
+                    WorkerSlotId::new(source_workers[0], 0),
+                    WorkerSlotId::new(source_workers[0], 1),
+                    WorkerSlotId::new(source_workers[2], 0),
+                    WorkerSlotId::new(source_workers[2], 1),
+                ],
+            ))
+            .await,
+    );
+    let source_backfill_fragment = cluster
+        .locate_one_fragment([identity_contains("StreamSourceScan")])
+        .await?;
+    expect_test::expect![[r#"
+        13 <- [1]
+        15 <- [3]
+        16 <- [4]
+        21 <- [25]
+        19 <- [23]
+        20 <- [24]
+        22 <- [26]"#]]
+    .assert_eq(&print_actor_upstream(&source_backfill_fragment.inner));
+    expect_test::expect![[r#"
+        {
+            1: {
+                1: "",
+                3: "3",
+                4: "1",
+                23: "",
+                24: "2",
+                25: "0",
+                26: "",
+            },
+            3: {
+                13: "",
+                15: "3",
+                16: "1",
+                19: "",
+                20: "2",
+                21: "0",
+                22: "",
+            },
+        }"#]]
+    .assert_eq(&cluster.list_source_splits().await?);
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 7
+        2 3 HASH {4,3} {3} {MVIEW} 5
+        3 3 HASH {5} {1} {SOURCE_SCAN} 7"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        1 CREATED CUSTOM
+        3 CREATED CUSTOM"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+    Ok(())
+}
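
Note (reviewer sketch, not part of the patch): the `backfill_finished` hunk above changes the completion check so that backfill only counts as done when the in-memory per-split states *and* the states already persisted to the state table are all `Finished`, and an empty persisted set counts as "not finished" (the first checkpoint has not committed yet). The standalone sketch below illustrates that invariant in isolation; the `BackfillState` enum and the function signature here are simplified stand-ins for illustration, not RisingWave's actual types.

    use std::collections::HashMap;

    /// Simplified stand-in for the executor's per-split state.
    #[derive(Debug, Clone, PartialEq)]
    enum BackfillState {
        Backfilling(Option<String>), // current offset, if any
        Finished,
    }

    /// Finished only if BOTH in-memory and persisted states are all `Finished`.
    /// An empty persisted set means the first barrier has not been committed yet,
    /// so we must keep backfilling: after a reschedule, an unfinished split may
    /// still be migrated to this actor.
    fn backfill_finished(
        states: &HashMap<String, BackfillState>,
        persisted_states: &[BackfillState],
    ) -> bool {
        if persisted_states.is_empty() {
            return false;
        }
        states.values().all(|s| *s == BackfillState::Finished)
            && persisted_states.iter().all(|s| *s == BackfillState::Finished)
    }

    fn main() {
        let finished: HashMap<_, _> =
            [("split-0".to_string(), BackfillState::Finished)].into_iter().collect();
        // Nothing persisted yet: not finished, even though memory says so.
        assert!(!backfill_finished(&finished, &[]));
        // Both in-memory and persisted states are finished: done.
        assert!(backfill_finished(&finished, &[BackfillState::Finished]));
        // Any split still backfilling keeps the executor in the backfill stage.
        let in_progress: HashMap<_, _> = [(
            "split-1".to_string(),
            BackfillState::Backfilling(Some("offset-42".to_string())),
        )]
        .into_iter()
        .collect();
        assert!(!backfill_finished(&in_progress, &[BackfillState::Finished]));
    }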