diff --git a/Cargo.lock b/Cargo.lock index ac0d23280ddc6..f66e42c71d301 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6188,8 +6188,10 @@ dependencies = [ "async-trait", "bytes", "criterion", + "expect-test", "fail", "futures", + "futures-async-stream", "itertools", "madsim-tokio", "parking_lot 0.12.1", diff --git a/src/storage/hummock_test/Cargo.toml b/src/storage/hummock_test/Cargo.toml index bc7ff418946a0..7e0d82d80b8a3 100644 --- a/src/storage/hummock_test/Cargo.toml +++ b/src/storage/hummock_test/Cargo.toml @@ -39,10 +39,13 @@ workspace-hack = { path = "../../workspace-hack" } [dev-dependencies] criterion = { version = "0.4", features = ["async_futures"] } +expect-test = "1" futures = { version = "0.3", default-features = false, features = [ "alloc", "executor", ] } + +futures-async-stream = "0.2" risingwave_test_runner = { path = "../../test_runner" } serial_test = "0.9" sync-point = { path = "../../utils/sync-point" } diff --git a/src/storage/hummock_test/src/lib.rs b/src/storage/hummock_test/src/lib.rs index 12bbe52ee8743..8826c58c21fe4 100644 --- a/src/storage/hummock_test/src/lib.rs +++ b/src/storage/hummock_test/src/lib.rs @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
- +#![feature(proc_macro_hygiene, stmt_expr_attributes)] #![feature(custom_test_frameworks)] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] #![feature(bound_map)] diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index ca1a541b6b09b..2333a8e932f00 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -17,9 +17,11 @@ use std::ops::Bound::Unbounded; use std::sync::Arc; use bytes::Bytes; +use expect_test::expect; use futures::{pin_mut, TryStreamExt}; +use futures_async_stream::for_await; use risingwave_common::cache::CachePriority; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::{ @@ -1484,3 +1486,195 @@ async fn test_gc_watermark_and_clear_shared_buffer() { HummockSstableObjectId::MAX ); } + +/// Test the following behaviours: +/// 1. LocalStateStore can read replicated ReadVersion. +/// 2. GlobalStateStore cannot read replicated ReadVersion. 
+#[tokio::test] +async fn test_replicated_local_hummock_storage() { + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + + let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; + + let read_options = ReadOptions { + prefix_hint: None, + ignore_range_tombstone: false, + retention_seconds: None, + table_id: TableId { + table_id: TEST_TABLE_ID.table_id, + }, + read_version_from_backup: false, + prefetch_options: Default::default(), + cache_policy: CachePolicy::Fill(CachePriority::High), + }; + + let mut local_hummock_storage = hummock_storage + .new_local(NewLocalOptions::new_replicated( + TEST_TABLE_ID, + false, + TableOption { + retention_seconds: None, + }, + )) + .await; + + let epoch0 = local_hummock_storage + .read_version() + .read() + .committed() + .max_committed_epoch(); + + let epoch1 = epoch0 + 1; + + local_hummock_storage.init(epoch1); + // ingest 16B batch + let mut batch1 = vec![ + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaaa"].concat()), + StorageValue::new_put("1111"), + ), + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"bbbb"].concat()), + StorageValue::new_put("2222"), + ), + ]; + + batch1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + local_hummock_storage + .ingest_batch( + batch1, + vec![], + WriteOptions { + epoch: epoch1, + table_id: TEST_TABLE_ID, + }, + ) + .await + .unwrap(); + + // Test local state store read for replicated data. 
+ { + assert!(local_hummock_storage.read_version().read().is_replicated()); + + let s = risingwave_storage::store::LocalStateStore::iter( + &local_hummock_storage, + (Unbounded, Unbounded), + read_options.clone(), + ) + .await + .unwrap(); + + let mut actual = vec![]; + + #[for_await] + for v in s { + actual.push(v) + } + + let expected = expect![[r#" + [ + Ok( + ( + FullKey { UserKey { 233, TableKey { 000061616161 } }, 1 }, + b"1111", + ), + ), + Ok( + ( + FullKey { UserKey { 233, TableKey { 000062626262 } }, 1 }, + b"2222", + ), + ), + ] + "#]]; + expected.assert_debug_eq(&actual); + } + + let epoch2 = epoch1 + 1; + + let mut local_hummock_storage_2 = hummock_storage + .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) + .await; + + local_hummock_storage_2.init(epoch2); + + // ingest 16B batch + let mut batch2 = vec![ + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"cccc"].concat()), + StorageValue::new_put("3333"), + ), + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"dddd"].concat()), + StorageValue::new_put("4444"), + ), + ]; + batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + + local_hummock_storage_2 + .ingest_batch( + batch2, + vec![], + WriteOptions { + epoch: epoch2, + table_id: TEST_TABLE_ID, + }, + ) + .await + .unwrap(); + + // Test Global State Store iter, epoch2 + { + let iter = hummock_storage + .iter((Unbounded, Unbounded), epoch2, read_options.clone()) + .await + .unwrap(); + pin_mut!(iter); + + let mut actual = vec![]; + + #[for_await] + for v in iter { + actual.push(v); + } + + let expected = expect![[r#" + [ + Ok( + ( + FullKey { UserKey { 233, TableKey { 000063636363 } }, 2 }, + b"3333", + ), + ), + Ok( + ( + FullKey { UserKey { 233, TableKey { 000064646464 } }, 2 }, + b"4444", + ), + ), + ] + "#]]; + expected.assert_debug_eq(&actual); + } + + // Test Global State Store iter, epoch1 + { + let iter = hummock_storage + .iter((Unbounded, Unbounded), epoch1, read_options) + .await + .unwrap(); + pin_mut!(iter); + + 
let mut actual = vec![]; + + #[for_await] + for v in iter { + actual.push(v); + } + + let expected = expect![[r#" + [] + "#]]; + expected.assert_debug_eq(&actual); + } +} diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index d33f16f40ae7e..99531d85957a9 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -473,6 +473,7 @@ mod tests { table_option: TracedTableOption { retention_seconds: None, }, + is_replicated: false, }, ), ); diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 5297251cdfb02..623ddc775e968 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -134,6 +134,7 @@ pub struct TracedNewLocalOptions { pub table_id: TracedTableId, pub is_consistent_op: bool, pub table_option: TracedTableOption, + pub is_replicated: bool, } pub type TracedHummockEpoch = u64; diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 33c0b0bfb7d61..7d8fd69411248 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -564,10 +564,14 @@ impl HummockEventHandler { HummockEvent::RegisterReadVersion { table_id, new_read_version_sender, + is_replicated, } => { let pinned_version = self.pinned_version.load(); let basic_read_version = Arc::new(RwLock::new( - HummockReadVersion::new((**pinned_version).clone()), + HummockReadVersion::new_with_replication_option( + (**pinned_version).clone(), + is_replicated, + ), )); let instance_id = self.generate_instance_id(); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 1a192192cab98..20ffe0cea0783 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ 
-76,6 +76,7 @@ pub enum HummockEvent { table_id: TableId, new_read_version_sender: oneshot::Sender<(Arc>, LocalInstanceGuard)>, + is_replicated: bool, }, DestroyReadVersion { @@ -116,7 +117,11 @@ impl HummockEvent { HummockEvent::RegisterReadVersion { table_id, new_read_version_sender: _, - } => format!("RegisterReadVersion table_id {:?}", table_id,), + is_replicated, + } => format!( + "RegisterReadVersion table_id {:?}, is_replicated: {:?}", + table_id, is_replicated + ), HummockEvent::DestroyReadVersion { table_id, diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 8ef29bc423b1f..3eb02072e3407 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -230,6 +230,7 @@ impl HummockStorage { .send(HummockEvent::RegisterReadVersion { table_id: option.table_id, new_read_version_sender: tx, + is_replicated: option.is_replicated, }) .unwrap(); diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index e5a2cd110ec34..79e4216377e51 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -127,7 +127,12 @@ impl HummockStorage { let read_guard = self.read_version_mapping.read(); read_guard .get(&table_id) - .map(|v| v.values().cloned().collect_vec()) + .map(|v| { + v.values() + .filter(|v| !v.read_arc().is_replicated()) + .cloned() + .collect_vec() + }) .unwrap_or(Vec::new()) }; diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index a0f113c83a96d..13c0e6e1d7f29 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -65,6 +65,18 @@ pub struct LocalHummockStorage { /// Read handle. read_version: Arc>, + /// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`. + /// It's used by executors in different CNs to synchronize states. 
+ /// + /// Within `LocalHummockStorage` we use this flag to avoid uploading local state to be + /// persisted, so we won't have duplicate data. + /// + /// This also handles a corner case where an executor doing replication + /// is scheduled to the same CN as its Upstream executor. + /// In that case, we use this flag to avoid reading the same data twice, + /// by ignoring the replicated ReadVersion. + is_replicated: bool, + /// Event sender. event_sender: mpsc::UnboundedSender, @@ -401,9 +413,11 @@ impl LocalHummockStorage { self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); // insert imm to uploader - self.event_sender - .send(HummockEvent::ImmToUploader(imm)) - .unwrap(); + if !self.is_replicated { + self.event_sender + .send(HummockEvent::ImmToUploader(imm)) + .unwrap(); + } timer.observe_duration(); @@ -433,6 +447,7 @@ impl LocalHummockStorage { table_id: option.table_id, is_consistent_op: option.is_consistent_op, table_option: option.table_option, + is_replicated: option.is_replicated, instance_guard, read_version, event_sender, diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 91351f706516b..a6685439fb2eb 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -201,10 +201,19 @@ pub struct HummockReadVersion { /// Remote version for committed data. committed: CommittedVersion, + + /// Indicate if this is replicated. If it is, we should ignore it during + /// global state store read, to avoid duplicated results. + /// Otherwise for local state store, it is fine, since we will see the 
+ is_replicated: bool, } impl HummockReadVersion { - pub fn new(committed_version: CommittedVersion) -> Self { + pub fn new_with_replication_option( + committed_version: CommittedVersion, + is_replicated: bool, + ) -> Self { // before build `HummockReadVersion`, we need to get the a initial version which obtained // from meta. want this initialization after version is initialized (now with // notification), so add a assert condition to guarantee correct initialization order @@ -217,9 +226,15 @@ impl HummockReadVersion { }, committed: committed_version, + + is_replicated, } } + pub fn new(committed_version: CommittedVersion) -> Self { + Self::new_with_replication_option(committed_version, false) + } + /// Updates the read version with `VersionUpdate`. /// There will be three data types to be processed /// `VersionUpdate::Staging` @@ -416,6 +431,10 @@ impl HummockReadVersion { // add the newly merged imm into front self.staging.merged_imm.push_front(merged_imm); } + + pub fn is_replicated(&self) -> bool { + self.is_replicated + } } pub fn read_filter_for_batch( diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 3a957671270f7..b6d20d5275690 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -409,6 +409,10 @@ pub struct NewLocalOptions { /// `update` and `delete` should match the original stored value. pub is_consistent_op: bool, pub table_option: TableOption, + + /// Indicate if this is replicated. If it is, we should not + /// upload its ReadVersions. 
+ pub is_replicated: bool, } impl From for NewLocalOptions { @@ -417,11 +421,34 @@ impl From for NewLocalOptions { table_id: value.table_id.into(), is_consistent_op: value.is_consistent_op, table_option: value.table_option.into(), + is_replicated: value.is_replicated, } } } impl NewLocalOptions { + pub fn new(table_id: TableId, is_consistent_op: bool, table_option: TableOption) -> Self { + NewLocalOptions { + table_id, + is_consistent_op, + table_option, + is_replicated: false, + } + } + + pub fn new_replicated( + table_id: TableId, + is_consistent_op: bool, + table_option: TableOption, + ) -> Self { + NewLocalOptions { + table_id, + is_consistent_op, + table_option, + is_replicated: true, + } + } + pub fn for_test(table_id: TableId) -> Self { Self { table_id, @@ -429,6 +456,7 @@ impl NewLocalOptions { table_option: TableOption { retention_seconds: None, }, + is_replicated: false, } } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 93ef6898acffa..9ffc788a1c38f 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -189,11 +189,11 @@ where let table_option = TableOption::build_table_option(table_catalog.get_properties()); let local_state_store = store - .new_local(NewLocalOptions { + .new_local(NewLocalOptions::new( table_id, is_consistent_op, table_option, - }) + )) .await; let pk_data_types = pk_indices @@ -384,11 +384,11 @@ where is_consistent_op: bool, ) -> Self { let local_state_store = store - .new_local(NewLocalOptions { + .new_local(NewLocalOptions::new( table_id, is_consistent_op, - table_option: TableOption::default(), - }) + TableOption::default(), + )) .await; let pk_data_types = pk_indices