diff --git a/rust/compute/src/server.rs b/rust/compute/src/server.rs
index e30b77b53e115..18384cc3dfd5c 100644
--- a/rust/compute/src/server.rs
+++ b/rust/compute/src/server.rs
@@ -114,10 +114,10 @@ pub async fn compute_node_serve(
     // A hummock compactor is deployed along with compute node for now.
     if let StateStoreImpl::HummockStateStore(hummock) = state_store.clone() {
         sub_tasks.push(Compactor::start_compactor(
-            hummock.inner().storage.options().clone(),
-            hummock.inner().storage.local_version_manager().clone(),
-            hummock.inner().storage.hummock_meta_client().clone(),
-            hummock.inner().storage.sstable_store(),
+            hummock.inner().options().clone(),
+            hummock.inner().local_version_manager().clone(),
+            hummock.inner().hummock_meta_client().clone(),
+            hummock.inner().sstable_store(),
             state_store_metrics,
         ));
     }
diff --git a/rust/ctl/src/common/hummock_service.rs b/rust/ctl/src/common/hummock_service.rs
index 053b7890013e8..d1f452bb56c39 100644
--- a/rust/ctl/src/common/hummock_service.rs
+++ b/rust/ctl/src/common/hummock_service.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;

 use anyhow::{anyhow, Result};
 use risingwave_common::config::StorageConfig;
-use risingwave_storage::hummock::HummockStateStore;
+use risingwave_storage::hummock::HummockStorage;
 use risingwave_storage::monitor::{HummockMetrics, MonitoredStateStore, StateStoreMetrics};
 use risingwave_storage::StateStoreImpl;

@@ -50,7 +50,7 @@ impl HummockServiceOpts {
         })
     }

-    pub async fn create_hummock_store(&self) -> Result<MonitoredStateStore<HummockStateStore>> {
+    pub async fn create_hummock_store(&self) -> Result<MonitoredStateStore<HummockStorage>> {
         let meta_client = self.meta_opts.create_meta_client().await?;

         // FIXME: allow specify custom config
diff --git a/rust/storage/src/hummock/mod.rs b/rust/storage/src/hummock/mod.rs
index 4d0c9bf50a1d4..f91a01eb811c3 100644
--- a/rust/storage/src/hummock/mod.rs
+++ b/rust/storage/src/hummock/mod.rs
@@ -13,11 +13,12 @@
 // limitations under the License.

 //! Hummock is the state store of the streaming system.
-
 use std::fmt;
+use std::future::Future;
 use std::ops::RangeBounds;
 use std::sync::Arc;

+use async_trait::async_trait;
 use bytes::Bytes;
 use itertools::Itertools;
@@ -40,7 +41,6 @@ mod shared_buffer;
 #[cfg(test)]
 mod snapshot_tests;
 mod sstable_store;
-mod state_store;
 #[cfg(test)]
 mod state_store_tests;
 #[cfg(test)]
@@ -56,12 +56,11 @@ use risingwave_pb::hummock::LevelType;
 use value::*;

 use self::iterator::{
-    BoxedHummockIterator, ConcatIterator, HummockIterator, MergeIterator, ReverseMergeIterator,
-    UserIterator,
+    BoxedHummockIterator, ConcatIterator, DirectedUserIterator, HummockIterator, MergeIterator,
+    ReverseMergeIterator, UserIterator,
 };
 use self::key::{key_with_epoch, user_key, FullKey};
 pub use self::sstable_store::*;
-pub use self::state_store::*;
 use self::utils::{bloom_filter_sstables, range_overlap};
 use super::monitor::StateStoreMetrics;
 use crate::hummock::hummock_meta_client::HummockMetaClient;
@@ -69,6 +68,9 @@ use crate::hummock::iterator::ReverseUserIterator;
 use crate::hummock::local_version_manager::LocalVersionManager;
 use crate::hummock::shared_buffer::shared_buffer_manager::SharedBufferManager;
 use crate::hummock::utils::validate_epoch;
+use crate::storage_value::StorageValue;
+use crate::store::{collect_from_iter, *};
+use crate::{define_state_store_associated_type, StateStore, StateStoreIter};

 pub type HummockTTL = u64;
 pub type HummockSSTableId = u64;
@@ -153,204 +155,165 @@ impl HummockStorage {
         Ok(instance)
     }

+    #[cfg(not(feature = "blockv2"))]
+    fn get_builder(options: &StorageConfig) -> SSTableBuilder {
+        SSTableBuilder::new(SSTableBuilderOptions {
+            table_capacity: options.sstable_size,
+            block_size: options.block_size,
+            bloom_false_positive: options.bloom_false_positive,
+            checksum_algo: options.checksum_algo,
+        })
+    }
+
+    #[cfg(feature = "blockv2")]
+    fn get_builder(options: &StorageConfig) -> SSTableBuilder {
+        SSTableBuilder::new(SSTableBuilderOptions {
+            capacity: options.sstable_size as usize,
+            block_capacity: options.block_size as usize,
+            restart_interval: DEFAULT_RESTART_INTERVAL,
+            bloom_false_positive: options.bloom_false_positive,
+            // TODO: Make this configurable.
+            compression_algorithm: CompressionAlgorithm::None,
+        })
+    }
+
+    pub fn hummock_meta_client(&self) -> &Arc<dyn HummockMetaClient> {
+        &self.hummock_meta_client
+    }
+
+    pub fn options(&self) -> &Arc<StorageConfig> {
+        &self.options
+    }
+
+    pub fn sstable_store(&self) -> SstableStoreRef {
+        self.sstable_store.clone()
+    }
+
+    pub fn local_version_manager(&self) -> &Arc<LocalVersionManager> {
+        &self.local_version_manager
+    }
+
+    pub fn shared_buffer_manager(&self) -> &SharedBufferManager {
+        &self.shared_buffer_manager
+    }
+}
+
+impl fmt::Debug for HummockStorage {
+    fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        todo!()
+    }
+}
+
+impl StateStore for HummockStorage {
+    type Iter<'a> = HummockStateStoreIter<'a>;
+    define_state_store_associated_type!();
+
     /// Gets the value of a specified `key`.
     /// The result is based on a snapshot corresponding to the given `epoch`.
     ///
     /// If `Ok(Some())` is returned, the key is found. If `Ok(None)` is returned,
     /// the key is not found. If `Err()` is returned, the searching for the key
     /// failed due to other non-EOF errors.
-    pub async fn get(&self, key: &[u8], epoch: u64) -> HummockResult<Option<Vec<u8>>> {
-        let mut table_iters: Vec<BoxedHummockIterator> = Vec::new();
-
-        let version = self.local_version_manager.get_version()?;
-        // check epoch validity
-        validate_epoch(version.safe_epoch(), epoch)?;
-
-        // Query shared buffer. Return the value without iterating SSTs if found
-        if let Some(v) = self
-            .shared_buffer_manager
-            .get(key, (version.max_committed_epoch() + 1)..=epoch)
-        {
-            self.stats.get_shared_buffer_hit_counts.inc();
-            return Ok(v.into_put_value().map(|x| x.to_vec()));
-        }
+    fn get<'a>(&'a self, key: &'a [u8], epoch: u64) -> Self::GetFuture<'_> {
+        async move {
+            let mut table_iters: Vec<BoxedHummockIterator> = Vec::new();

-        let mut table_counts = 0;
-        for level in &version.levels() {
-            match level.level_type() {
-                LevelType::Overlapping => {
-                    let tables = bloom_filter_sstables(
-                        self.local_version_manager
-                            .pick_few_tables(&level.table_ids)
-                            .await?,
-                        key,
-                        self.stats.clone(),
-                    )?;
-                    table_counts += tables.len();
-                    table_iters.extend(tables.into_iter().map(|table| {
-                        Box::new(SSTableIterator::new(table, self.sstable_store.clone()))
-                            as BoxedHummockIterator
-                    }))
-                }
-                LevelType::Nonoverlapping => {
-                    let tables = bloom_filter_sstables(
-                        self.local_version_manager
-                            .pick_few_tables(&level.table_ids)
-                            .await?,
-                        key,
-                        self.stats.clone(),
-                    )?;
-                    table_counts += tables.len();
-                    table_iters.push(Box::new(ConcatIterator::new(
-                        tables,
-                        self.sstable_store.clone(),
-                    )))
+            let version = self.local_version_manager.get_version()?;
+            // check epoch validity
+            validate_epoch(version.safe_epoch(), epoch)?;
+
+            // Query shared buffer. Return the value without iterating SSTs if found
+            if let Some(v) = self
+                .shared_buffer_manager
+                .get(key, (version.max_committed_epoch() + 1)..=epoch)
+            {
+                self.stats.get_shared_buffer_hit_counts.inc();
+                return Ok(v.into_put_value().map(StorageValue::from));
+            }
+
+            let mut table_counts = 0;
+            for level in &version.levels() {
+                match level.level_type() {
+                    LevelType::Overlapping => {
+                        let tables = bloom_filter_sstables(
+                            self.local_version_manager
+                                .pick_few_tables(&level.table_ids)
+                                .await?,
+                            key,
+                            self.stats.clone(),
+                        )?;
+                        table_counts += tables.len();
+                        table_iters.extend(tables.into_iter().map(|table| {
+                            Box::new(SSTableIterator::new(table, self.sstable_store.clone()))
+                                as BoxedHummockIterator
+                        }))
+                    }
+                    LevelType::Nonoverlapping => {
+                        let tables = bloom_filter_sstables(
+                            self.local_version_manager
+                                .pick_few_tables(&level.table_ids)
+                                .await?,
+                            key,
+                            self.stats.clone(),
+                        )?;
+                        table_counts += tables.len();
+                        table_iters.push(Box::new(ConcatIterator::new(
+                            tables,
+                            self.sstable_store.clone(),
+                        )))
+                    }
                 }
             }
-        }

-        self.stats
-            .iter_merge_sstable_counts
-            .observe(table_counts as f64);
-        let mut it = MergeIterator::new(table_iters, self.stats.clone());
+            self.stats
+                .iter_merge_sstable_counts
+                .observe(table_counts as f64);
+            let mut it = MergeIterator::new(table_iters, self.stats.clone());

-        // Use `MergeIterator` to seek for the key with the latest version to
-        // get the latest key.
-        it.seek(&key_with_epoch(key.to_vec(), epoch)).await?;
+            // Use `MergeIterator` to seek for the key with the latest version to
+            // get the latest key.
+            it.seek(&key_with_epoch(key.to_vec(), epoch)).await?;

-        // Iterator has sought passed the borders.
-        if !it.is_valid() {
-            return Ok(None);
-        }
+            // Iterator has sought past the borders.
+            if !it.is_valid() {
+                return Ok(None);
+            }

-        // Iterator gets us the key, we tell if it's the key we want
-        // or key next to it.
-        let value = match user_key(it.key()) == key {
-            true => it.value().into_put_value().map(|x| x.to_vec()),
-            false => None,
-        };
+            // Iterator gets us the key, we tell if it's the key we want
+            // or the key next to it.
+            let value = match user_key(it.key()) == key {
+                true => it.value().into_put_value().map(|x| x.to_vec()),
+                false => None,
+            };

-        Ok(value)
+            Ok(value.map(StorageValue::from))
+        }
     }

-    /// Returns an iterator that scan from the begin key to the end key
-    /// The result is based on a snapshot corresponding to the given `epoch`.
-    pub async fn range_scan<R, B>(
+    fn scan<R, B>(
         &self,
         key_range: R,
+        limit: Option<usize>,
         epoch: u64,
-    ) -> HummockResult<UserIterator<'_>>
+    ) -> Self::ScanFuture<'_, R, B>
     where
-        R: RangeBounds<B>,
-        B: AsRef<[u8]>,
+        R: RangeBounds<B> + Send,
+        B: AsRef<[u8]> + Send,
     {
-        let version = self.local_version_manager.get_version()?;
-        // Check epoch validity
-        validate_epoch(version.safe_epoch(), epoch)?;
-
-        // Filter out tables that overlap with given `key_range`
-        let tables = self.local_version_manager.tables(&version.levels()).await?;
-        let tables_count = tables.len();
-        let overlapped_sstable_iters = tables
-            .into_iter()
-            .filter(|t| {
-                let table_start = user_key(t.meta.smallest_key.as_slice());
-                let table_end = user_key(t.meta.largest_key.as_slice());
-                range_overlap(&key_range, table_start, table_end, false)
-            })
-            .map(|t| {
-                Box::new(SSTableIterator::new(t, self.sstable_store.clone()))
-                    as BoxedHummockIterator
-            });
-
-        let mi = if version.max_committed_epoch() < epoch {
-            // Take shared buffers into consideration if the read epoch is above the max committed
-            // epoch
-            let overlapped_shared_buffer_iters = self
-                .shared_buffer_manager
-                .iters(&key_range, (version.max_committed_epoch() + 1)..=epoch)
-                .into_iter()
-                .map(|i| Box::new(i) as BoxedHummockIterator);
-            MergeIterator::new(
-                overlapped_shared_buffer_iters.chain(overlapped_sstable_iters),
-                self.stats.clone(),
-            )
-        } else {
-            self.stats
-                .iter_merge_sstable_counts
-                .observe(tables_count as f64);
-            MergeIterator::new(overlapped_sstable_iters, self.stats.clone())
-        };
-
-        // TODO: avoid this clone
-        Ok(UserIterator::new(
-            mi,
-            (
-                key_range.start_bound().map(|b| b.as_ref().to_owned()),
-                key_range.end_bound().map(|b| b.as_ref().to_owned()),
-            ),
-            epoch,
-            Some(version),
-        ))
+        async move { collect_from_iter(self.iter(key_range, epoch).await?, limit).await }
     }

-    /// Returns a reversed iterator that scans from the end key to the begin key
-    /// The result is based on a snapshot corresponding to the given `epoch`.
-    pub async fn reverse_range_scan<R, B>(
+    fn reverse_scan<R, B>(
         &self,
         key_range: R,
+        limit: Option<usize>,
         epoch: u64,
-    ) -> HummockResult<ReverseUserIterator<'_>>
+    ) -> Self::ReverseScanFuture<'_, R, B>
     where
-        R: RangeBounds<B>,
-        B: AsRef<[u8]>,
+        R: RangeBounds<B> + Send,
+        B: AsRef<[u8]> + Send,
     {
-        let version = self.local_version_manager.get_version()?;
-        // Check epoch validity
-        validate_epoch(version.safe_epoch(), epoch)?;
-
-        // Filter out tables that overlap with given `key_range`
-        let overlapped_sstable_iters = self
-            .local_version_manager
-            .tables(&version.levels())
-            .await?
-            .into_iter()
-            .filter(|t| {
-                let table_start = user_key(t.meta.smallest_key.as_slice());
-                let table_end = user_key(t.meta.largest_key.as_slice());
-                range_overlap(&key_range, table_start, table_end, true)
-            })
-            .map(|t| {
-                Box::new(ReverseSSTableIterator::new(t, self.sstable_store.clone()))
-                    as BoxedHummockIterator
-            });
-
-        let reverse_merge_iterator = if version.max_committed_epoch() < epoch {
-            // Take shared buffers into consideration if the read epoch is above the max committed
-            // epoch
-            let overlapped_shared_buffer_iters = self
-                .shared_buffer_manager
-                .reverse_iters(&key_range, (version.max_committed_epoch() + 1)..=epoch)
-                .into_iter()
-                .map(|i| Box::new(i) as BoxedHummockIterator);
-            ReverseMergeIterator::new(
-                overlapped_shared_buffer_iters.chain(overlapped_sstable_iters),
-                self.stats.clone(),
-            )
-        } else {
-            ReverseMergeIterator::new(overlapped_sstable_iters, self.stats.clone())
-        };
-
-        // TODO: avoid this clone
-        Ok(ReverseUserIterator::new_with_epoch(
-            reverse_merge_iterator,
-            (
-                key_range.end_bound().map(|b| b.as_ref().to_owned()),
-                key_range.start_bound().map(|b| b.as_ref().to_owned()),
-            ),
-            epoch,
-            Some(version),
-        ))
+        async move { collect_from_iter(self.reverse_iter(key_range, epoch).await?, limit).await }
     }

     /// Writes a batch to storage. The batch should be:
@@ -362,101 +325,217 @@ impl HummockStorage {
     /// changes to be committed before reading and writing new keys to the engine. That is because
     /// that the table with lower epoch might be committed after a table with higher epoch has
     /// been committed. If such case happens, the outcome is non-predictable.
-    pub async fn write_batch(
+    fn ingest_batch(
         &self,
-        kv_pairs: impl Iterator<Item = (Bytes, HummockValue<Bytes>)>,
+        kv_pairs: Vec<(Bytes, Option<StorageValue>)>,
         epoch: u64,
-    ) -> HummockResult<()> {
-        let batch = kv_pairs
-            .map(|(key, value)| {
-                (
-                    Bytes::from(FullKey::from_user_key(key.to_vec(), epoch).into_inner()),
-                    value,
-                )
-            })
-            .collect_vec();
-        self.shared_buffer_manager.write_batch(batch, epoch)?;
-
-        if !self.options.async_checkpoint_enabled {
-            return self.shared_buffer_manager.sync(Some(epoch)).await;
+    ) -> Self::IngestBatchFuture<'_> {
+        async move {
+            let batch = kv_pairs
+                .into_iter()
+                .map(|(key, value)| {
+                    (
+                        Bytes::from(FullKey::from_user_key(key.to_vec(), epoch).into_inner()),
+                        value.into(),
+                    )
+                })
+                .collect_vec();
+            self.shared_buffer_manager.write_batch(batch, epoch)?;
+
+            if !self.options.async_checkpoint_enabled {
+                self.shared_buffer_manager.sync(Some(epoch)).await?;
+            }
+            Ok(())
         }
-        Ok(())
     }

     /// Replicates a batch to shared buffer, without uploading to the storage backend.
-    pub async fn replicate_batch(
+    fn replicate_batch(
         &self,
-        kv_pairs: impl Iterator<Item = (Bytes, HummockValue<Bytes>)>,
+        kv_pairs: Vec<(Bytes, Option<StorageValue>)>,
         epoch: u64,
-    ) -> HummockResult<()> {
-        let batch = kv_pairs
-            .map(|i| {
-                (
-                    Bytes::from(FullKey::from_user_key(i.0.to_vec(), epoch).into_inner()),
-                    i.1,
-                )
-            })
-            .collect_vec();
-        self.shared_buffer_manager
-            .replicate_remote_batch(batch, epoch)?;
-
-        // self.sync(epoch).await?;
-        Ok(())
-    }
-
-    pub async fn sync(&self, epoch: Option<u64>) -> HummockResult<()> {
-        self.shared_buffer_manager.sync(epoch).await
-    }
-
-    #[cfg(not(feature = "blockv2"))]
-    fn get_builder(options: &StorageConfig) -> SSTableBuilder {
-        SSTableBuilder::new(SSTableBuilderOptions {
-            table_capacity: options.sstable_size,
-            block_size: options.block_size,
-            bloom_false_positive: options.bloom_false_positive,
-            checksum_algo: options.checksum_algo,
-        })
-    }
-
-    #[cfg(feature = "blockv2")]
-    fn get_builder(options: &StorageConfig) -> SSTableBuilder {
-        SSTableBuilder::new(SSTableBuilderOptions {
-            capacity: options.sstable_size as usize,
-            block_capacity: options.block_size as usize,
-            restart_interval: DEFAULT_RESTART_INTERVAL,
-            bloom_false_positive: options.bloom_false_positive,
-            // TODO: Make this configurable.
-            compression_algorithm: CompressionAlgorithm::None,
-        })
+    ) -> Self::ReplicateBatchFuture<'_> {
+        async move {
+            let batch = kv_pairs
+                .into_iter()
+                .map(|(key, value)| {
+                    (
+                        Bytes::from(FullKey::from_user_key(key.to_vec(), epoch).into_inner()),
+                        value.into(),
+                    )
+                })
+                .collect_vec();
+            self.shared_buffer_manager
+                .replicate_remote_batch(batch, epoch)?;
+
+            Ok(())
+        }
     }

-    pub fn hummock_meta_client(&self) -> &Arc<dyn HummockMetaClient> {
-        &self.hummock_meta_client
+    /// Returns an iterator that scans from the begin key to the end key
+    /// The result is based on a snapshot corresponding to the given `epoch`.
+    fn iter<R, B>(&self, key_range: R, epoch: u64) -> Self::IterFuture<'_, R, B>
+    where
+        R: RangeBounds<B> + Send,
+        B: AsRef<[u8]> + Send,
+    {
+        async move {
+            let version = self.local_version_manager.get_version()?;
+            // Check epoch validity
+            validate_epoch(version.safe_epoch(), epoch)?;
+
+            // Filter out tables that overlap with given `key_range`
+            let tables = self.local_version_manager.tables(&version.levels()).await?;
+            let tables_count = tables.len();
+            let overlapped_sstable_iters = tables
+                .into_iter()
+                .filter(|t| {
+                    let table_start = user_key(t.meta.smallest_key.as_slice());
+                    let table_end = user_key(t.meta.largest_key.as_slice());
+                    range_overlap(&key_range, table_start, table_end, false)
+                })
+                .map(|t| {
+                    Box::new(SSTableIterator::new(t, self.sstable_store.clone()))
+                        as BoxedHummockIterator
+                })
+                .collect_vec();
+
+            let mi = if version.max_committed_epoch() < epoch {
+                // Take shared buffers into consideration if the read epoch is above the max
+                // committed epoch
+                let overlapped_shared_buffer_iters = self
+                    .shared_buffer_manager
+                    .iters(&key_range, (version.max_committed_epoch() + 1)..=epoch)
+                    .into_iter()
+                    .map(|i| Box::new(i) as BoxedHummockIterator);
+                MergeIterator::new(
+                    overlapped_shared_buffer_iters.chain(overlapped_sstable_iters),
+                    self.stats.clone(),
+                )
+            } else {
+                self.stats
+                    .iter_merge_sstable_counts
+                    .observe(tables_count as f64);
+                MergeIterator::new(overlapped_sstable_iters, self.stats.clone())
+            };
+
+            // TODO: avoid this clone
+            let mut user_iter = DirectedUserIterator::Forward(UserIterator::new(
+                mi,
+                (
+                    key_range.start_bound().map(|b| b.as_ref().to_owned()),
+                    key_range.end_bound().map(|b| b.as_ref().to_owned()),
+                ),
+                epoch,
+                Some(version),
+            ));
+
+            user_iter.rewind().await?;
+            Ok(HummockStateStoreIter::new(user_iter))
+        }
     }

-    pub fn options(&self) -> &Arc<StorageConfig> {
-        &self.options
+    /// Returns a reversed iterator that scans from the end key to the begin key
+    /// The result is based on a snapshot corresponding to the given `epoch`.
+    fn reverse_iter<R, B>(&self, key_range: R, epoch: u64) -> Self::ReverseIterFuture<'_, R, B>
+    where
+        R: RangeBounds<B> + Send,
+        B: AsRef<[u8]> + Send,
+    {
+        async move {
+            let version = self.local_version_manager.get_version()?;
+            // Check epoch validity
+            validate_epoch(version.safe_epoch(), epoch)?;
+
+            // Filter out tables that overlap with given `key_range`
+            let overlapped_sstable_iters = self
+                .local_version_manager
+                .tables(&version.levels())
+                .await?
+                .into_iter()
+                .filter(|t| {
+                    let table_start = user_key(t.meta.smallest_key.as_slice());
+                    let table_end = user_key(t.meta.largest_key.as_slice());
+                    range_overlap(&key_range, table_start, table_end, true)
+                })
+                .map(|t| {
+                    Box::new(ReverseSSTableIterator::new(t, self.sstable_store.clone()))
+                        as BoxedHummockIterator
+                })
+                .collect_vec();
+
+            let reverse_merge_iterator = if version.max_committed_epoch() < epoch {
+                // Take shared buffers into consideration if the read epoch is above the max
+                // committed epoch
+                let overlapped_shared_buffer_iters = self
+                    .shared_buffer_manager
+                    .reverse_iters(&key_range, (version.max_committed_epoch() + 1)..=epoch)
+                    .into_iter()
+                    .map(|i| Box::new(i) as BoxedHummockIterator);
+                ReverseMergeIterator::new(
+                    overlapped_shared_buffer_iters.chain(overlapped_sstable_iters),
+                    self.stats.clone(),
+                )
+            } else {
+                ReverseMergeIterator::new(overlapped_sstable_iters, self.stats.clone())
+            };
+
+            // TODO: avoid this clone
+            let mut reverse_user_iter =
+                DirectedUserIterator::Backward(ReverseUserIterator::new_with_epoch(
+                    reverse_merge_iterator,
+                    (
+                        key_range.end_bound().map(|b| b.as_ref().to_owned()),
+                        key_range.start_bound().map(|b| b.as_ref().to_owned()),
+                    ),
+                    epoch,
+                    Some(version),
+                ));
+
+            reverse_user_iter.rewind().await?;
+            Ok(HummockStateStoreIter::new(reverse_user_iter))
+        }
     }

-    pub fn sstable_store(&self) -> SstableStoreRef {
-        self.sstable_store.clone()
+    fn wait_epoch(&self, epoch: u64) -> Self::WaitEpochFuture<'_> {
+        async move { Ok(self.local_version_manager.wait_epoch(epoch).await?) }
     }

-    pub fn local_version_manager(&self) -> &Arc<LocalVersionManager> {
-        &self.local_version_manager
+    fn sync(&self, epoch: Option<u64>) -> Self::SyncFuture<'_> {
+        async move {
+            self.shared_buffer_manager.sync(epoch).await?;
+            Ok(())
+        }
     }
+}

-    pub async fn wait_epoch(&self, epoch: HummockEpoch) -> Result<()> {
-        Ok(self.local_version_manager.wait_epoch(epoch).await?)
-    }
+pub struct HummockStateStoreIter<'a> {
+    inner: DirectedUserIterator<'a>,
+}

-    pub fn shared_buffer_manager(&self) -> &SharedBufferManager {
-        &self.shared_buffer_manager
+impl<'a> HummockStateStoreIter<'a> {
+    fn new(inner: DirectedUserIterator<'a>) -> Self {
+        Self { inner }
     }
 }

-impl fmt::Debug for HummockStorage {
-    fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        todo!()
+#[async_trait]
+impl<'a> StateStoreIter for HummockStateStoreIter<'a> {
+    // TODO: directly return `&[u8]` to user instead of `Bytes`.
+    type Item = (Bytes, StorageValue);
+
+    async fn next(&mut self) -> Result<Option<Self::Item>> {
+        let iter = &mut self.inner;
+
+        if iter.is_valid() {
+            let kv = (
+                Bytes::copy_from_slice(iter.key()),
+                StorageValue::from(Bytes::copy_from_slice(iter.value())),
+            );
+            iter.next().await?;
+            Ok(Some(kv))
+        } else {
+            Ok(None)
+        }
     }
 }
diff --git a/rust/storage/src/hummock/snapshot_tests.rs b/rust/storage/src/hummock/snapshot_tests.rs
index 37f5d94abd592..6ee47b05c3818 100644
--- a/rust/storage/src/hummock/snapshot_tests.rs
+++ b/rust/storage/src/hummock/snapshot_tests.rs
@@ -116,15 +116,13 @@ async fn gen_and_upload_table_with_sstable_store(
 macro_rules! assert_count_range_scan {
     ($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{
-        let mut it = $storage
-            .range_scan::<_, Vec<u8>>($range, $epoch)
-            .await
-            .unwrap();
-        it.rewind().await.unwrap();
+        let mut it = $storage.iter::<_, Vec<u8>>($range, $epoch).await.unwrap();
         let mut count = 0;
-        while it.is_valid() {
-            count += 1;
-            it.next().await.unwrap();
+        loop {
+            match it.next().await.unwrap() {
+                Some(_) => count += 1,
+                None => break,
+            }
         }
         assert_eq!(count, $expect_count);
     }};
@@ -133,14 +131,15 @@ macro_rules! assert_count_range_scan {
 macro_rules! assert_count_reverse_range_scan {
     ($storage:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{
         let mut it = $storage
-            .reverse_range_scan::<_, Vec<u8>>($range, $epoch)
+            .reverse_iter::<_, Vec<u8>>($range, $epoch)
             .await
             .unwrap();
-        it.rewind().await.unwrap();
         let mut count = 0;
-        while it.is_valid() {
-            count += 1;
-            it.next().await.unwrap();
+        loop {
+            match it.next().await.unwrap() {
+                Some(_) => count += 1,
+                None => break,
+            }
         }
         assert_eq!(count, $expect_count);
     }};
diff --git a/rust/storage/src/hummock/state_store.rs b/rust/storage/src/hummock/state_store.rs
deleted file mode 100644
index b04fe34822041..0000000000000
--- a/rust/storage/src/hummock/state_store.rs
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright 2022 Singularity Data
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::future::Future;
-use std::ops::RangeBounds;
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use risingwave_common::error::Result;
-
-use super::HummockStorage;
-use crate::hummock::iterator::DirectedUserIterator;
-use crate::storage_value::StorageValue;
-use crate::store::{collect_from_iter, *};
-use crate::{define_state_store_associated_type, StateStore, StateStoreIter};
-
-/// A wrapper over [`HummockStorage`] as a state store.
-#[derive(Clone)]
-pub struct HummockStateStore {
-    pub storage: HummockStorage,
-}
-
-impl HummockStateStore {
-    pub fn new(storage: HummockStorage) -> Self {
-        Self { storage }
-    }
-
-    pub fn storage(&self) -> &HummockStorage {
-        &self.storage
-    }
-}
-
-impl StateStore for HummockStateStore {
-    type Iter<'a> = HummockStateStoreIter<'a>;
-    define_state_store_associated_type!();
-
-    fn get<'a>(&'a self, key: &'a [u8], epoch: u64) -> Self::GetFuture<'_> {
-        async move {
-            let value = self.storage.get(key, epoch).await?;
-            let value = value.map(Bytes::from);
-            let storage_value = value.map(StorageValue::from);
-            Ok(storage_value)
-        }
-    }
-
-    fn scan<R, B>(
-        &self,
-        key_range: R,
-        limit: Option<usize>,
-        epoch: u64,
-    ) -> Self::ScanFuture<'_, R, B>
-    where
-        R: RangeBounds<B> + Send,
-        B: AsRef<[u8]> + Send,
-    {
-        async move { collect_from_iter(self.iter(key_range, epoch).await?, limit).await }
-    }
-
-    fn reverse_scan<R, B>(
-        &self,
-        key_range: R,
-        limit: Option<usize>,
-        epoch: u64,
-    ) -> Self::ReverseScanFuture<'_, R, B>
-    where
-        R: RangeBounds<B> + Send,
-        B: AsRef<[u8]> + Send,
-    {
-        async move { collect_from_iter(self.reverse_iter(key_range, epoch).await?, limit).await }
-    }
-
-    fn ingest_batch(
-        &self,
-        kv_pairs: Vec<(Bytes, Option<StorageValue>)>,
-        epoch: u64,
-    ) -> Self::IngestBatchFuture<'_> {
-        async move {
-            self.storage
-                .write_batch(kv_pairs.into_iter().map(|(k, v)| (k, v.into())), epoch)
-                .await?;
-            Ok(())
-        }
-    }
-
-    fn replicate_batch(
-        &self,
-        kv_pairs: Vec<(Bytes, Option<StorageValue>)>,
-        epoch: u64,
-    ) -> Self::ReplicateBatchFuture<'_> {
-        async move {
-            self.storage
-                .replicate_batch(kv_pairs.into_iter().map(|(k, v)| (k, v.into())), epoch)
-                .await?;
-            Ok(())
-        }
-    }
-
-    fn iter<R, B>(&self, key_range: R, epoch: u64) -> Self::IterFuture<'_, R, B>
-    where
-        R: RangeBounds<B> + Send,
-        B: AsRef<[u8]> + Send,
-    {
-        async move {
-            let inner = self.storage.range_scan(key_range, epoch).await?;
-            let mut res = DirectedUserIterator::Forward(inner);
-            res.rewind().await?;
-            Ok(HummockStateStoreIter::new(res))
-        }
-    }
-
-    fn reverse_iter<R, B>(&self, key_range: R, epoch: u64) -> Self::ReverseIterFuture<'_, R, B>
-    where
-        R: RangeBounds<B> + Send,
-        B: AsRef<[u8]> + Send,
-    {
-        async move {
-            let mut res = self.storage.reverse_range_scan(key_range, epoch).await?;
-            res.rewind().await?;
-            Ok(HummockStateStoreIter::new(DirectedUserIterator::Backward(
-                res,
-            )))
-        }
-    }
-
-    fn wait_epoch(&self, epoch: u64) -> Self::WaitEpochFuture<'_> {
-        async move { self.storage.wait_epoch(epoch).await }
-    }
-
-    fn sync(&self, epoch: Option<u64>) -> Self::SyncFuture<'_> {
-        async move {
-            self.storage.sync(epoch).await?;
-            Ok(())
-        }
-    }
-}
-
-pub struct HummockStateStoreIter<'a> {
-    inner: DirectedUserIterator<'a>,
-}
-
-impl<'a> HummockStateStoreIter<'a> {
-    fn new(inner: DirectedUserIterator<'a>) -> Self {
-        Self { inner }
-    }
-}
-
-#[async_trait]
-impl<'a> StateStoreIter for HummockStateStoreIter<'a> {
-    // TODO: directly return `&[u8]` to user instead of `Bytes`.
-    type Item = (Bytes, StorageValue);
-
-    async fn next(&mut self) -> Result<Option<Self::Item>> {
-        let iter = &mut self.inner;
-
-        if iter.is_valid() {
-            let kv = (
-                Bytes::copy_from_slice(iter.key()),
-                StorageValue::from(Bytes::copy_from_slice(iter.value())),
-            );
-            iter.next().await?;
-            Ok(Some(kv))
-        } else {
-            Ok(None)
-        }
-    }
-}
diff --git a/rust/storage/src/hummock/state_store_tests.rs b/rust/storage/src/hummock/state_store_tests.rs
index 896bb3fad67fc..3345ed59b615f 100644
--- a/rust/storage/src/hummock/state_store_tests.rs
+++ b/rust/storage/src/hummock/state_store_tests.rs
@@ -16,14 +16,15 @@ use std::sync::Arc;

 use bytes::Bytes;

-use super::iterator::UserIterator;
-use super::HummockStorage;
+use super::{HummockStateStoreIter, HummockStorage, StateStore};
 use crate::hummock::iterator::test_utils::mock_sstable_store_with_object_store;
 use crate::hummock::local_version_manager::LocalVersionManager;
 use crate::hummock::mock::{MockHummockMetaClient, MockHummockMetaService};
 use crate::hummock::test_utils::default_config_for_test;
 use crate::monitor::StateStoreMetrics;
 use crate::object::InMemObjectStore;
+use crate::storage_value::StorageValue;
+use crate::StateStoreIter;

 #[tokio::test]
 async fn test_basic() {
@@ -47,8 +48,11 @@ async fn test_basic() {

     // First batch inserts the anchor and others.
     let mut batch1 = vec![
-        (anchor.clone(), Some(Bytes::from("111"))),
-        (Bytes::from("bb"), Some(Bytes::from("222"))),
+        (anchor.clone(), Some(StorageValue::from(Bytes::from("111")))),
+        (
+            Bytes::from("bb"),
+            Some(StorageValue::from(Bytes::from("222"))),
+        ),
     ];

     // Make sure the batch is sorted.
@@ -56,8 +60,14 @@ async fn test_basic() {

     // Second batch modifies the anchor.
     let mut batch2 = vec![
-        (Bytes::from("cc"), Some(Bytes::from("333"))),
-        (anchor.clone(), Some(Bytes::from("111111"))),
+        (
+            Bytes::from("cc"),
+            Some(StorageValue::from(Bytes::from("333"))),
+        ),
+        (
+            anchor.clone(),
+            Some(StorageValue::from(Bytes::from("111111"))),
+        ),
     ];

     // Make sure the batch is sorted.
@@ -65,8 +75,14 @@ async fn test_basic() {

     // Third batch deletes the anchor
     let mut batch3 = vec![
-        (Bytes::from("dd"), Some(Bytes::from("444"))),
-        (Bytes::from("ee"), Some(Bytes::from("555"))),
+        (
+            Bytes::from("dd"),
+            Some(StorageValue::from(Bytes::from("444"))),
+        ),
+        (
+            Bytes::from("ee"),
+            Some(StorageValue::from(Bytes::from("555"))),
+        ),
         (anchor.clone(), None),
     ];

@@ -77,14 +93,17 @@ async fn test_basic() {
     let epoch1: u64 = 1;

     // Write the first batch.
-    hummock_storage
-        .write_batch(batch1.into_iter().map(|(k, v)| (k, v.into())), epoch1)
-        .await
-        .unwrap();
+    hummock_storage.ingest_batch(batch1, epoch1).await.unwrap();

     // Get the value after flushing to remote.
     let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111"));
+    let value = hummock_storage
+        .get(&Bytes::from("bb"), epoch1)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(value.to_bytes(), Bytes::from("222"));

     // Test looking for a nonexistent key. `next()` would return the next key.
     let value = hummock_storage
@@ -95,21 +114,15 @@
     // Write the second batch.
     let epoch2 = epoch1 + 1;
-    hummock_storage
-        .write_batch(batch2.into_iter().map(|(k, v)| (k, v.into())), epoch2)
-        .await
-        .unwrap();
+    hummock_storage.ingest_batch(batch2, epoch2).await.unwrap();

     // Get the value after flushing to remote.
     let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111111"));

     // Write the third batch.
     let epoch3 = epoch2 + 1;
-    hummock_storage
-        .write_batch(batch3.into_iter().map(|(k, v)| (k, v.into())), epoch3)
-        .await
-        .unwrap();
+    hummock_storage.ingest_batch(batch3, epoch3).await.unwrap();

     // Get the value after flushing to remote.
     let value = hummock_storage.get(&anchor, epoch3).await.unwrap();
@@ -124,44 +137,40 @@ async fn test_basic() {

     // Write aa bb
     let mut iter = hummock_storage
-        .range_scan(..=b"ee".to_vec(), epoch1)
+        .iter(..=b"ee".to_vec(), epoch1)
         .await
         .unwrap();
-    iter.rewind().await.unwrap();
     let len = count_iter(&mut iter).await;
     assert_eq!(len, 2);

     // Get the anchor value at the first snapshot
     let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111"));

     // Get the anchor value at the second snapshot
     let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111111"));

     // Update aa, write cc
     let mut iter = hummock_storage
-        .range_scan(..=b"ee".to_vec(), epoch2)
+        .iter(..=b"ee".to_vec(), epoch2)
         .await
         .unwrap();
-    iter.rewind().await.unwrap();
     let len = count_iter(&mut iter).await;
     assert_eq!(len, 3);

     // Delete aa, write dd,ee
     let mut iter = hummock_storage
-        .range_scan(..=b"ee".to_vec(), epoch3)
+        .iter(..=b"ee".to_vec(), epoch3)
         .await
         .unwrap();
-    iter.rewind().await.unwrap();
     let len = count_iter(&mut iter).await;
     assert_eq!(len, 4);
 }

-async fn count_iter(iter: &mut UserIterator<'_>) -> usize {
+async fn count_iter(iter: &mut HummockStateStoreIter<'_>) -> usize {
     let mut c: usize = 0;
-    while iter.is_valid() {
-        c += 1;
-        iter.next().await.unwrap();
+    while iter.next().await.unwrap().is_some() {
+        c += 1
     }
     c
 }
@@ -191,8 +200,11 @@ async fn test_reload_storage() {

     // First batch inserts the anchor and others.
     let mut batch1 = vec![
-        (anchor.clone(), Some(Bytes::from("111"))),
-        (Bytes::from("bb"), Some(Bytes::from("222"))),
+        (anchor.clone(), Some(StorageValue::from(Bytes::from("111")))),
+        (
+            Bytes::from("bb"),
+            Some(StorageValue::from(Bytes::from("222"))),
+        ),
     ];

     // Make sure the batch is sorted.
@@ -200,8 +212,14 @@ async fn test_reload_storage() {

     // Second batch modifies the anchor.
     let mut batch2 = vec![
-        (Bytes::from("cc"), Some(Bytes::from("333"))),
-        (anchor.clone(), Some(Bytes::from("111111"))),
+        (
+            Bytes::from("cc"),
+            Some(StorageValue::from(Bytes::from("333"))),
+        ),
+        (
+            anchor.clone(),
+            Some(StorageValue::from(Bytes::from("111111"))),
+        ),
     ];

     // Make sure the batch is sorted.
@@ -211,10 +229,7 @@ async fn test_reload_storage() {
     let epoch1: u64 = 1;

     // Write the first batch.
-    hummock_storage
-        .write_batch(batch1.into_iter().map(|(k, v)| (k, v.into())), epoch1)
-        .await
-        .unwrap();
+    hummock_storage.ingest_batch(batch1, epoch1).await.unwrap();

     // Mock something happened to storage internal, and storage is reloaded.
     drop(hummock_storage);
@@ -230,7 +245,7 @@ async fn test_reload_storage() {

     // Get the value after flushing to remote.
     let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111"));

     // Test looking for a nonexistent key. `next()` would return the next key.
     let value = hummock_storage
@@ -241,37 +256,32 @@
     // Write the second batch.
     let epoch2 = epoch1 + 1;
-    hummock_storage
-        .write_batch(batch2.into_iter().map(|(k, v)| (k, v.into())), epoch2)
-        .await
-        .unwrap();
+    hummock_storage.ingest_batch(batch2, epoch2).await.unwrap();

     // Get the value after flushing to remote.
     let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111111"));

     // Write aa bb
     let mut iter = hummock_storage
-        .range_scan(..=b"ee".to_vec(), epoch1)
+        .iter(..=b"ee".to_vec(), epoch1)
         .await
         .unwrap();
-    iter.rewind().await.unwrap();
     let len = count_iter(&mut iter).await;
     assert_eq!(len, 2);

     // Get the anchor value at the first snapshot
     let value = hummock_storage.get(&anchor, epoch1).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111"));

     // Get the anchor value at the second snapshot
     let value = hummock_storage.get(&anchor, epoch2).await.unwrap().unwrap();
-    assert_eq!(Bytes::from(value), Bytes::from("111111"));
+    assert_eq!(value.to_bytes(), Bytes::from("111111"));

     // Update aa, write cc
     let mut iter = hummock_storage
-        .range_scan(..=b"ee".to_vec(), epoch2)
+        .iter(..=b"ee".to_vec(), epoch2)
         .await
         .unwrap();
-    iter.rewind().await.unwrap();
     let len = count_iter(&mut iter).await;
     assert_eq!(len, 3);
 }
diff --git a/rust/storage/src/store_impl.rs b/rust/storage/src/store_impl.rs
index 7ca1c8cbbbe07..711f703420609 100644
--- a/rust/storage/src/store_impl.rs
+++ b/rust/storage/src/store_impl.rs
@@ -21,7 +21,7 @@ use risingwave_rpc_client::MetaClient;

 use crate::hummock::hummock_meta_client::RpcHummockMetaClient;
 use crate::hummock::local_version_manager::LocalVersionManager;
-use crate::hummock::{HummockStateStore, SstableStore};
+use crate::hummock::{HummockStorage, SstableStore};
 use crate::memory::MemoryStateStore;
 use crate::monitor::{HummockMetrics, MonitoredStateStore as Monitored, StateStoreMetrics};
 use crate::object::{InMemObjectStore, ObjectStore, S3ObjectStore};
@@ -40,7 +40,7 @@ pub enum StateStoreImpl {
     /// * `hummock+s3://bucket`
     /// * `hummock+minio://KEY:SECRET@minio-ip:port`
     /// * `hummock+memory` (should only be used in 1 compute node mode)
-    HummockStateStore(Monitored<HummockStateStore>),
+    HummockStateStore(Monitored<HummockStorage>),

     /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
     /// want speed up e2e test, you should use Hummock in-memory mode instead. Also, this state
     /// store misses some critical implementation to ensure the correctness of persisting streaming
@@ -93,8 +93,6 @@ impl StateStoreImpl {
     ) -> Result<Self> {
         let store = match s {
             hummock if hummock.starts_with("hummock") => {
-                use crate::hummock::HummockStorage;
-
                 let object_store = match hummock {
                     s3 if s3.starts_with("hummock+s3://") => Arc::new(
                         S3ObjectStore::new(s3.strip_prefix("hummock+s3://").unwrap().to_string())
@@ -123,20 +121,18 @@ impl StateStoreImpl {
                     config.data_directory.to_string(),
                     state_store_stats.clone(),
                 ));
-                let inner = HummockStateStore::new(
-                    HummockStorage::new(
-                        config.clone(),
-                        sstable_store.clone(),
-                        Arc::new(LocalVersionManager::new(sstable_store)),
-                        Arc::new(RpcHummockMetaClient::new(
-                            meta_client,
-                            hummock_stats.clone(),
-                        )),
-                        state_store_stats.clone(),
-                    )
-                    .await
-                    .map_err(RwError::from)?,
-                );
+                let inner = HummockStorage::new(
+                    config.clone(),
+                    sstable_store.clone(),
+                    Arc::new(LocalVersionManager::new(sstable_store)),
+                    Arc::new(RpcHummockMetaClient::new(
+                        meta_client,
+                        hummock_stats.clone(),
+                    )),
+                    state_store_stats.clone(),
+                )
+                .await
+                .map_err(RwError::from)?;

                 StateStoreImpl::HummockStateStore(inner.monitored(state_store_stats))
             }
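
Usage note (not part of the diff above): after this refactor, callers drive `HummockStorage` directly through the `StateStore` trait; the `HummockStateStore` wrapper is gone. A minimal sketch of the new calling convention, under stated assumptions: the helper name `smoke_usage` and the sample keys are hypothetical, and the imports assume `StorageValue`, `StateStore`, and `StateStoreIter` are reachable from the storage crate root as the diff's own `use` lines suggest.

    use bytes::Bytes;
    use risingwave_common::error::Result;

    use crate::hummock::HummockStorage;
    use crate::storage_value::StorageValue;
    use crate::{StateStore, StateStoreIter};

    async fn smoke_usage(store: &HummockStorage, epoch: u64) -> Result<()> {
        // Writes are now Vec<(Bytes, Option<StorageValue>)>; `None` encodes a delete.
        // Per the `ingest_batch` doc comment, the batch must be sorted by key.
        let batch = vec![
            (Bytes::from("aa"), Some(StorageValue::from(Bytes::from("1")))),
            (Bytes::from("bb"), None),
        ];
        store.ingest_batch(batch, epoch).await?;

        // `iter` returns an already-rewound `HummockStateStoreIter`: no `rewind()`
        // or `is_valid()` calls, just drain `next()` until it yields `None`.
        let mut it = store.iter(..=b"zz".to_vec(), epoch).await?;
        while let Some((key, value)) = it.next().await? {
            let _ = (key, value);
        }
        Ok(())
    }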