From 2fed2a787679e9022725bb3b7a22de7fee14c955 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 8 Jun 2023 00:31:12 +0800 Subject: [PATCH 01/14] add no_upload flag --- src/storage/src/hummock/store/state_store.rs | 11 ++++++--- src/storage/src/store.rs | 26 ++++++++++++++++++++ src/stream/src/common/table/state_table.rs | 10 ++++---- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 7c06b7fb3e790..acb45a8615268 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -66,6 +66,8 @@ pub struct LocalHummockStorage { /// Read handle. read_version: Arc>, + no_upload: bool, + /// Event sender. event_sender: mpsc::UnboundedSender, @@ -404,9 +406,11 @@ impl LocalHummockStorage { self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); // insert imm to uploader - self.event_sender - .send(HummockEvent::ImmToUploader(imm)) - .unwrap(); + if self.no_upload { + self.event_sender + .send(HummockEvent::ImmToUploader(imm)) + .unwrap(); + } timer.observe_duration(); @@ -437,6 +441,7 @@ impl LocalHummockStorage { table_id: option.table_id, is_consistent_op: option.is_consistent_op, table_option: option.table_option, + no_upload: option.no_upload, instance_guard, read_version, event_sender, diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 3a957671270f7..4123d6fce7fd8 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -409,6 +409,9 @@ pub struct NewLocalOptions { /// `update` and `delete` should match the original stored value. pub is_consistent_op: bool, pub table_option: TableOption, + + /// Used to indicate that we should not upload the data. + pub no_upload: bool, } impl From for NewLocalOptions { @@ -422,6 +425,28 @@ impl From for NewLocalOptions { } impl NewLocalOptions { + pub fn new(table_id: TableId, is_consistent_op: bool, table_option: TableOption) -> Self { + NewLocalOptions { + table_id, + is_consistent_op, + table_option, + no_upload: true, + } + } + + pub fn new_temporary( + table_id: TableId, + is_consistent_op: bool, + table_option: TableOption, + ) -> Self { + NewLocalOptions { + table_id, + is_consistent_op, + table_option, + no_upload: false, + } + } + pub fn for_test(table_id: TableId) -> Self { Self { table_id, @@ -429,6 +454,7 @@ impl NewLocalOptions { table_option: TableOption { retention_seconds: None, }, + no_upload: false, } } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 93ef6898acffa..9ffc788a1c38f 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -189,11 +189,11 @@ where let table_option = TableOption::build_table_option(table_catalog.get_properties()); let local_state_store = store - .new_local(NewLocalOptions { + .new_local(NewLocalOptions::new( table_id, is_consistent_op, table_option, - }) + )) .await; let pk_data_types = pk_indices @@ -384,11 +384,11 @@ where is_consistent_op: bool, ) -> Self { let local_state_store = store - .new_local(NewLocalOptions { + .new_local(NewLocalOptions::new( table_id, is_consistent_op, - table_option: TableOption::default(), - }) + TableOption::default(), + )) .await; let pk_data_types = pk_indices From bd4194ec9eace0a17dc7e2d9612bbf24ef59a044 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 8 Jun 2023 09:27:31 +0800 Subject: [PATCH 02/14] rerun From 7566b01cb1a4c50f8e591eb1232d89ad7aebaa0d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 8 Jun 2023 09:53:15 +0800 Subject: [PATCH 03/14] fix --- src/storage/src/store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 4123d6fce7fd8..ce284224b01bf 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -430,7 +430,7 @@ impl NewLocalOptions { table_id, is_consistent_op, table_option, - no_upload: true, + no_upload: false, } } @@ -443,7 +443,7 @@ impl NewLocalOptions { table_id, is_consistent_op, table_option, - no_upload: false, + no_upload: true, } } From d857a87caf02a2749822acf7103f75ad952b3877 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 8 Jun 2023 09:59:59 +0800 Subject: [PATCH 04/14] fix --- src/storage/src/hummock/store/state_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index acb45a8615268..f39c07e45faa9 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -406,7 +406,7 @@ impl LocalHummockStorage { self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); // insert imm to uploader - if self.no_upload { + if !self.no_upload { self.event_sender .send(HummockEvent::ImmToUploader(imm)) .unwrap(); From 5a7782f8b7d697aaa71c8d5c8dca5cfb30fc42e2 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 8 Jun 2023 10:36:23 +0800 Subject: [PATCH 05/14] docs --- src/storage/src/hummock/store/state_store.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index f39c07e45faa9..2a5444644b989 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -66,6 +66,8 @@ pub struct LocalHummockStorage { /// Read handle. read_version: Arc>, + /// This could be used when `LocalHummockStorage` is used + /// solely for local storage, e.g. when replicating data for executors in different CNs. no_upload: bool, /// Event sender. From ea8ea5b80460aa9c6058faf59d77bf5198c4e48c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 9 Jun 2023 14:56:46 +0800 Subject: [PATCH 06/14] no_upload -> is_replicated --- src/storage/src/hummock/store/state_store.rs | 6 +++--- src/storage/src/store.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 2a5444644b989..110189db0bc1d 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -68,7 +68,7 @@ pub struct LocalHummockStorage { /// This could be used when `LocalHummockStorage` is used /// solely for local storage, e.g. when replicating data for executors in different CNs. - no_upload: bool, + is_replicated: bool, /// Event sender. event_sender: mpsc::UnboundedSender, @@ -408,7 +408,7 @@ impl LocalHummockStorage { self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); // insert imm to uploader - if !self.no_upload { + if !self.is_replicated { self.event_sender .send(HummockEvent::ImmToUploader(imm)) .unwrap(); @@ -443,7 +443,7 @@ impl LocalHummockStorage { table_id: option.table_id, is_consistent_op: option.is_consistent_op, table_option: option.table_option, - no_upload: option.no_upload, + is_replicated: option.is_replicated, instance_guard, read_version, event_sender, diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index ce284224b01bf..467c8b1493f26 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -411,7 +411,7 @@ pub struct NewLocalOptions { pub table_option: TableOption, /// Used to indicate that we should not upload the data. - pub no_upload: bool, + pub is_replicated: bool, } impl From for NewLocalOptions { @@ -430,11 +430,11 @@ impl NewLocalOptions { table_id, is_consistent_op, table_option, - no_upload: false, + is_replicated: false, } } - pub fn new_temporary( + pub fn new_replicated( table_id: TableId, is_consistent_op: bool, table_option: TableOption, @@ -443,7 +443,7 @@ impl NewLocalOptions { table_id, is_consistent_op, table_option, - no_upload: true, + is_replicated: true, } } @@ -454,7 +454,7 @@ impl NewLocalOptions { table_option: TableOption { retention_seconds: None, }, - no_upload: false, + is_replicated: false, } } } From 3c07f2df32d2b3d24129fa90bfcabc7c1886dc71 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 9 Jun 2023 15:01:49 +0800 Subject: [PATCH 07/14] add is_replicated flag to `HummockReadVersion` --- src/storage/src/hummock/store/version.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 3074bdf850a52..2fd0b251653d7 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -206,10 +206,16 @@ pub struct HummockReadVersion { /// Remote version for committed data. committed: CommittedVersion, + + /// Check if this is replicated. If it is, we should ignore it during + /// global state store read, to avoid duplicated results. + /// Otherwise for local state store, it is fine, see we will see the + /// ReadVersion just for that local state store. + is_replicated: bool, } impl HummockReadVersion { - pub fn new(committed_version: CommittedVersion) -> Self { + fn new_inner(committed_version: CommittedVersion, is_replicated: bool) -> Self { // before build `HummockReadVersion`, we need to get the a initial version which obtained // from meta. want this initialization after version is initialized (now with // notification), so add a assert condition to guarantee correct initialization order @@ -222,9 +228,19 @@ impl HummockReadVersion { }, committed: committed_version, + + is_replicated, } } + pub fn new(committed_version: CommittedVersion) -> Self { + Self::new_inner(committed_version, false) + } + + pub fn new_replicated(committed_version: CommittedVersion) -> Self { + Self::new_inner(committed_version, true) + } + /// Updates the read version with `VersionUpdate`. /// There will be three data types to be processed /// `VersionUpdate::Staging` From 72ebc1abe819654e1fe8f99afa95f72bd34195d7 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 9 Jun 2023 16:30:39 +0800 Subject: [PATCH 08/14] add is_replicated check --- src/storage/src/hummock/store/version.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 2fd0b251653d7..c7bc3611a5ddb 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -437,6 +437,10 @@ impl HummockReadVersion { // add the newly merged imm into front self.staging.merged_imm.push_front(merged_imm); } + + fn is_replicated(&self) -> bool { + self.is_replicated + } } pub fn read_filter_for_batch( From 56e0a63138e4da2ae3257a0c00785a1032a3634e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 9 Jun 2023 16:36:20 +0800 Subject: [PATCH 09/14] hide replicated read versions from global state store read --- src/storage/src/hummock/state_store.rs | 7 ++++++- src/storage/src/hummock/store/version.rs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index cd5599a71fb2b..12187e06c9c13 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -127,7 +127,12 @@ impl HummockStorage { let read_guard = self.read_version_mapping.read(); read_guard .get(&table_id) - .map(|v| v.values().cloned().collect_vec()) + .map(|v| { + v.values() + .filter(|v| !v.read_arc().is_replicated()) + .cloned() + .collect_vec() + }) .unwrap_or(Vec::new()) }; diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index c7bc3611a5ddb..690baf27b9024 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -438,7 +438,7 @@ impl HummockReadVersion { self.staging.merged_imm.push_front(merged_imm); } - fn is_replicated(&self) -> bool { + pub fn is_replicated(&self) -> bool { self.is_replicated } } From f5f23eb158620a091998d352e79f31ad6d278dd9 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 9 Jun 2023 16:48:00 +0800 Subject: [PATCH 10/14] ensure ReadVersion is created with replication options --- .../hummock/event_handler/hummock_event_handler.rs | 6 +++++- src/storage/src/hummock/event_handler/mod.rs | 7 ++++++- src/storage/src/hummock/mod.rs | 1 + src/storage/src/hummock/store/version.rs | 11 +++++------ 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 33c0b0bfb7d61..7d8fd69411248 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -564,10 +564,14 @@ impl HummockEventHandler { HummockEvent::RegisterReadVersion { table_id, new_read_version_sender, + is_replicated, } => { let pinned_version = self.pinned_version.load(); let basic_read_version = Arc::new(RwLock::new( - HummockReadVersion::new((**pinned_version).clone()), + HummockReadVersion::new_with_replication_option( + (**pinned_version).clone(), + is_replicated, + ), )); let instance_id = self.generate_instance_id(); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 1a192192cab98..20ffe0cea0783 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -76,6 +76,7 @@ pub enum HummockEvent { table_id: TableId, new_read_version_sender: oneshot::Sender<(Arc>, LocalInstanceGuard)>, + is_replicated: bool, }, DestroyReadVersion { @@ -116,7 +117,11 @@ impl HummockEvent { HummockEvent::RegisterReadVersion { table_id, new_read_version_sender: _, - } => format!("RegisterReadVersion table_id {:?}", table_id,), + is_replicated, + } => format!( + "RegisterReadVersion table_id {:?}, is_replicated: {:?}", + table_id, is_replicated + ), HummockEvent::DestroyReadVersion { table_id, diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 61a94c2914c52..8be31daede4bd 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -234,6 +234,7 @@ impl HummockStorage { .send(HummockEvent::RegisterReadVersion { table_id: option.table_id, new_read_version_sender: tx, + is_replicated: option.is_replicated, }) .unwrap(); diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 690baf27b9024..d819bbe533b2a 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -215,7 +215,10 @@ pub struct HummockReadVersion { } impl HummockReadVersion { - fn new_inner(committed_version: CommittedVersion, is_replicated: bool) -> Self { + pub fn new_with_replication_option( + committed_version: CommittedVersion, + is_replicated: bool, + ) -> Self { // before build `HummockReadVersion`, we need to get the a initial version which obtained // from meta. want this initialization after version is initialized (now with // notification), so add a assert condition to guarantee correct initialization order @@ -234,11 +237,7 @@ impl HummockReadVersion { } pub fn new(committed_version: CommittedVersion) -> Self { - Self::new_inner(committed_version, false) - } - - pub fn new_replicated(committed_version: CommittedVersion) -> Self { - Self::new_inner(committed_version, true) + Self::new_with_replication_option(committed_version, false) } /// Updates the read version with `VersionUpdate`. From dae54d6ed70e3a5efef65f03d62f0e963d997de4 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 9 Jun 2023 16:58:38 +0800 Subject: [PATCH 11/14] more docs --- src/storage/src/hummock/store/state_store.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 110189db0bc1d..e5c4daa48eadd 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -66,8 +66,16 @@ pub struct LocalHummockStorage { /// Read handle. read_version: Arc>, - /// This could be used when `LocalHummockStorage` is used - /// solely for local storage, e.g. when replicating data for executors in different CNs. + /// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`. + /// It's used by executors in different CNs to synchronize states. + /// + /// Within `LocalHummockStorage` we use this flag to avoid uploading local state to be + /// persisted, so we won't have duplicate data. + /// + /// This also handles a corner case where an executor doing replication + /// is scheduled to the same CN as its Upstream executor. + /// In that case, we use this flag to avoid reading the same data twice, + /// by ignoring the replicated ReadVersion. is_replicated: bool, /// Event sender. From 921a76ca273e4b754fd0792c534baf7fd975809d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 12 Jun 2023 13:07:37 +0800 Subject: [PATCH 12/14] fix conflicts --- src/storage/hummock_trace/src/collector.rs | 1 + src/storage/hummock_trace/src/opts.rs | 1 + src/storage/src/store.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index d33f16f40ae7e..99531d85957a9 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -473,6 +473,7 @@ mod tests { table_option: TracedTableOption { retention_seconds: None, }, + is_replicated: false, }, ), ); diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 5297251cdfb02..623ddc775e968 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -134,6 +134,7 @@ pub struct TracedNewLocalOptions { pub table_id: TracedTableId, pub is_consistent_op: bool, pub table_option: TracedTableOption, + pub is_replicated: bool, } pub type TracedHummockEpoch = u64; diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 467c8b1493f26..63e71776f42ca 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -420,6 +420,7 @@ impl From for NewLocalOptions { table_id: value.table_id.into(), is_consistent_op: value.is_consistent_op, table_option: value.table_option.into(), + is_replicated: value.is_replicated, } } } From 95f9337e2c79e6022d365654a4c94d027ef239f2 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 12 Jun 2023 17:08:58 +0800 Subject: [PATCH 13/14] add snapshot test for global state store + local state store (replicated) iteration --- Cargo.lock | 2 + src/storage/hummock_test/Cargo.toml | 3 + src/storage/hummock_test/src/lib.rs | 2 +- .../hummock_test/src/state_store_tests.rs | 196 +++++++++++++++++- 4 files changed, 201 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f4eac68794a1..382025a137bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6331,8 +6331,10 @@ dependencies = [ "async-trait", "bytes", "criterion", + "expect-test", "fail", "futures", + "futures-async-stream", "itertools", "madsim-tokio", "parking_lot 0.12.1", diff --git a/src/storage/hummock_test/Cargo.toml b/src/storage/hummock_test/Cargo.toml index bc7ff418946a0..7e0d82d80b8a3 100644 --- a/src/storage/hummock_test/Cargo.toml +++ b/src/storage/hummock_test/Cargo.toml @@ -39,10 +39,13 @@ workspace-hack = { path = "../../workspace-hack" } [dev-dependencies] criterion = { version = "0.4", features = ["async_futures"] } +expect-test = "1" futures = { version = "0.3", default-features = false, features = [ "alloc", "executor", ] } + +futures-async-stream = "0.2" risingwave_test_runner = { path = "../../test_runner" } serial_test = "0.9" sync-point = { path = "../../utils/sync-point" } diff --git a/src/storage/hummock_test/src/lib.rs b/src/storage/hummock_test/src/lib.rs index 12bbe52ee8743..8826c58c21fe4 100644 --- a/src/storage/hummock_test/src/lib.rs +++ b/src/storage/hummock_test/src/lib.rs @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +#![feature(proc_macro_hygiene, stmt_expr_attributes)] #![feature(custom_test_frameworks)] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] #![feature(bound_map)] diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 6e2b2a76a6c1f..c34dadc2395a7 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -17,9 +17,11 @@ use std::ops::Bound::Unbounded; use std::sync::Arc; use bytes::Bytes; +use expect_test::expect; use futures::{pin_mut, TryStreamExt}; +use futures_async_stream::for_await; use risingwave_common::cache::CachePriority; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::{ @@ -1482,3 +1484,195 @@ async fn test_gc_watermark_and_clear_shared_buffer() { HummockSstableObjectId::MAX ); } + +/// Test the following behaviours: +/// 1. LocalStateStore can read replicated ReadVersion. +/// 2. GlobalStateStore cannot read replicated ReadVersion. +#[tokio::test] +async fn test_replicated_local_hummock_storage() { + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + + let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; + + let read_options = ReadOptions { + prefix_hint: None, + ignore_range_tombstone: false, + retention_seconds: None, + table_id: TableId { + table_id: TEST_TABLE_ID.table_id, + }, + read_version_from_backup: false, + prefetch_options: Default::default(), + cache_policy: CachePolicy::Fill(CachePriority::High), + }; + + let mut local_hummock_storage = hummock_storage + .new_local(NewLocalOptions::new_replicated( + TEST_TABLE_ID, + false, + TableOption { + retention_seconds: None, + }, + )) + .await; + + let epoch0 = local_hummock_storage + .read_version() + .read() + .committed() + .max_committed_epoch(); + + let epoch1 = epoch0 + 1; + + local_hummock_storage.init(epoch1); + // ingest 16B batch + let mut batch1 = vec![ + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaaa"].concat()), + StorageValue::new_put("1111"), + ), + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"bbbb"].concat()), + StorageValue::new_put("2222"), + ), + ]; + + batch1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + local_hummock_storage + .ingest_batch( + batch1, + vec![], + WriteOptions { + epoch: epoch1, + table_id: TEST_TABLE_ID, + }, + ) + .await + .unwrap(); + + // Test local state store read for replicated data. + { + assert!(local_hummock_storage.read_version().read().is_replicated()); + + let s = risingwave_storage::store::LocalStateStore::iter( + &local_hummock_storage, + (Unbounded, Unbounded), + read_options.clone(), + ) + .await + .unwrap(); + + let mut actual = vec![]; + + #[for_await] + for v in s { + actual.push(v) + } + + let expected = expect![[r#" + [ + Ok( + ( + FullKey { UserKey { 233, TableKey { 000061616161 } }, 1 }, + b"1111", + ), + ), + Ok( + ( + FullKey { UserKey { 233, TableKey { 000062626262 } }, 1 }, + b"2222", + ), + ), + ] + "#]]; + expected.assert_debug_eq(&actual); + } + + let epoch2 = epoch1 + 1; + + let mut local_hummock_storage_2 = hummock_storage + .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) + .await; + + local_hummock_storage_2.init(epoch2); + + // ingest 16B batch + let mut batch2 = vec![ + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"cccc"].concat()), + StorageValue::new_put("3333"), + ), + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"dddd"].concat()), + StorageValue::new_put("4444"), + ), + ]; + batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + + local_hummock_storage_2 + .ingest_batch( + batch2, + vec![], + WriteOptions { + epoch: epoch2, + table_id: TEST_TABLE_ID, + }, + ) + .await + .unwrap(); + + // Test Global State Store iter, epoch2 + { + let iter = hummock_storage + .iter((Unbounded, Unbounded), epoch2, read_options.clone()) + .await + .unwrap(); + pin_mut!(iter); + + let mut actual = vec![]; + + #[for_await] + for v in iter { + actual.push(v); + } + + let expected = expect![[r#" + [ + Ok( + ( + FullKey { UserKey { 233, TableKey { 000063636363 } }, 2 }, + b"3333", + ), + ), + Ok( + ( + FullKey { UserKey { 233, TableKey { 000064646464 } }, 2 }, + b"4444", + ), + ), + ] + "#]]; + expected.assert_debug_eq(&actual); + } + + // Test Global State Store iter, epoch1 + { + let iter = hummock_storage + .iter((Unbounded, Unbounded), epoch1, read_options) + .await + .unwrap(); + pin_mut!(iter); + + let mut actual = vec![]; + + #[for_await] + for v in iter { + actual.push(v); + } + + let expected = expect![[r#" + [] + "#]]; + expected.assert_debug_eq(&actual); + } +} From 996ed42059a0a6eac62d00d1bf1c36e2f1378fe9 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 12 Jun 2023 17:14:45 +0800 Subject: [PATCH 14/14] docs --- src/storage/src/hummock/store/version.rs | 2 +- src/storage/src/store.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index d819bbe533b2a..4b0255017c4d9 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -207,7 +207,7 @@ pub struct HummockReadVersion { /// Remote version for committed data. committed: CommittedVersion, - /// Check if this is replicated. If it is, we should ignore it during + /// Indicate if this is replicated. If it is, we should ignore it during /// global state store read, to avoid duplicated results. /// Otherwise for local state store, it is fine, see we will see the /// ReadVersion just for that local state store. diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 63e71776f42ca..b6d20d5275690 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -410,7 +410,8 @@ pub struct NewLocalOptions { pub is_consistent_op: bool, pub table_option: TableOption, - /// Used to indicate that we should not upload the data. + /// Indicate if this is replicated. If it is, we should not + /// upload its ReadVersions. pub is_replicated: bool, }