From a5cddc2112fcfb696387b75edfcde9f806df7efc Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Thu, 9 Mar 2023 13:53:04 +0800 Subject: [PATCH] refactor(storage): distinguish SST id and object id (close #8434) --- dashboard/proto/gen/hummock.ts | 27 ++-- proto/hummock.proto | 18 +-- src/ctl/src/cmd_impl/hummock/sst_dump.rs | 15 +- src/meta/src/barrier/mod.rs | 2 +- .../src/hummock/compaction/level_selector.rs | 16 +-- src/meta/src/hummock/compaction/mod.rs | 9 +- .../picker/base_level_compaction_picker.rs | 36 ++--- .../picker/manual_compaction_picker.rs | 42 +++--- .../picker/min_overlap_compaction_picker.rs | 18 +-- .../picker/space_reclaim_compaction_picker.rs | 17 +-- .../picker/tier_compaction_picker.rs | 4 +- .../picker/ttl_reclaim_compaction_picker.rs | 16 +-- .../src/hummock/compaction_schedule_policy.rs | 4 +- src/meta/src/hummock/level_handler.rs | 6 +- src/meta/src/hummock/manager/compaction.rs | 3 +- .../manager/compaction_group_manager.rs | 52 ++++--- src/meta/src/hummock/manager/gc.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 85 +++++++----- src/meta/src/hummock/manager/tests.rs | 129 +++++++++++------- .../src/hummock/mock_hummock_meta_client.rs | 2 +- src/meta/src/hummock/test_utils.rs | 24 ++-- src/meta/src/hummock/vacuum.rs | 4 +- src/storage/backup/src/lib.rs | 2 +- src/storage/hummock_sdk/src/compact.rs | 12 +- .../compaction_group/hummock_version_ext.rs | 118 ++++++++++------ .../src/hummock_read_version_tests.rs | 14 +- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/src/hummock/compactor/iterator.rs | 7 +- src/storage/src/hummock/compactor/mod.rs | 2 +- .../src/hummock/event_handler/uploader.rs | 4 +- src/storage/src/hummock/sstable/builder.rs | 4 +- src/storage/src/hummock/sstable/mod.rs | 4 +- src/storage/src/hummock/sstable_store.rs | 2 +- src/storage/src/hummock/store/version.rs | 4 +- src/storage/src/hummock/test_utils.rs | 8 +- src/storage/src/hummock/utils.rs | 2 +- src/storage/src/hummock/validator.rs | 25 ++-- 37 files changed, 428 insertions(+), 313 deletions(-) diff --git a/dashboard/proto/gen/hummock.ts b/dashboard/proto/gen/hummock.ts index b747e56ef1c1e..d9d2867f99e1f 100644 --- a/dashboard/proto/gen/hummock.ts +++ b/dashboard/proto/gen/hummock.ts @@ -47,15 +47,14 @@ export function levelTypeToJSON(object: LevelType): string { } export interface SstableInfo { - id: number; + objectId: number; + sstId: number; keyRange: KeyRange | undefined; fileSize: number; tableIds: number[]; metaOffset: number; staleKeyCount: number; totalKeyCount: number; - /** When a SST is divided, its divide_version will increase one. */ - divideVersion: number; minEpoch: number; maxEpoch: number; uncompressedFileSize: number; @@ -97,6 +96,7 @@ export interface GroupConstruct { parentGroupId: number; tableIds: number[]; groupId: number; + newSstStartId: number; } export interface GroupMetaChange { @@ -848,14 +848,14 @@ export interface HummockVersionStats_TableStatsEntry { function createBaseSstableInfo(): SstableInfo { return { - id: 0, + objectId: 0, + sstId: 0, keyRange: undefined, fileSize: 0, tableIds: [], metaOffset: 0, staleKeyCount: 0, totalKeyCount: 0, - divideVersion: 0, minEpoch: 0, maxEpoch: 0, uncompressedFileSize: 0, @@ -865,14 +865,14 @@ function createBaseSstableInfo(): SstableInfo { export const SstableInfo = { fromJSON(object: any): SstableInfo { return { - id: isSet(object.id) ? Number(object.id) : 0, + objectId: isSet(object.objectId) ? Number(object.objectId) : 0, + sstId: isSet(object.sstId) ? Number(object.sstId) : 0, keyRange: isSet(object.keyRange) ? KeyRange.fromJSON(object.keyRange) : undefined, fileSize: isSet(object.fileSize) ? Number(object.fileSize) : 0, tableIds: Array.isArray(object?.tableIds) ? object.tableIds.map((e: any) => Number(e)) : [], metaOffset: isSet(object.metaOffset) ? Number(object.metaOffset) : 0, staleKeyCount: isSet(object.staleKeyCount) ? Number(object.staleKeyCount) : 0, totalKeyCount: isSet(object.totalKeyCount) ? Number(object.totalKeyCount) : 0, - divideVersion: isSet(object.divideVersion) ? Number(object.divideVersion) : 0, minEpoch: isSet(object.minEpoch) ? Number(object.minEpoch) : 0, maxEpoch: isSet(object.maxEpoch) ? Number(object.maxEpoch) : 0, uncompressedFileSize: isSet(object.uncompressedFileSize) ? Number(object.uncompressedFileSize) : 0, @@ -881,7 +881,8 @@ export const SstableInfo = { toJSON(message: SstableInfo): unknown { const obj: any = {}; - message.id !== undefined && (obj.id = Math.round(message.id)); + message.objectId !== undefined && (obj.objectId = Math.round(message.objectId)); + message.sstId !== undefined && (obj.sstId = Math.round(message.sstId)); message.keyRange !== undefined && (obj.keyRange = message.keyRange ? KeyRange.toJSON(message.keyRange) : undefined); message.fileSize !== undefined && (obj.fileSize = Math.round(message.fileSize)); if (message.tableIds) { @@ -892,7 +893,6 @@ export const SstableInfo = { message.metaOffset !== undefined && (obj.metaOffset = Math.round(message.metaOffset)); message.staleKeyCount !== undefined && (obj.staleKeyCount = Math.round(message.staleKeyCount)); message.totalKeyCount !== undefined && (obj.totalKeyCount = Math.round(message.totalKeyCount)); - message.divideVersion !== undefined && (obj.divideVersion = Math.round(message.divideVersion)); message.minEpoch !== undefined && (obj.minEpoch = Math.round(message.minEpoch)); message.maxEpoch !== undefined && (obj.maxEpoch = Math.round(message.maxEpoch)); message.uncompressedFileSize !== undefined && (obj.uncompressedFileSize = Math.round(message.uncompressedFileSize)); @@ -901,7 +901,8 @@ export const SstableInfo = { fromPartial, I>>(object: I): SstableInfo { const message = createBaseSstableInfo(); - message.id = object.id ?? 0; + message.objectId = object.objectId ?? 0; + message.sstId = object.sstId ?? 0; message.keyRange = (object.keyRange !== undefined && object.keyRange !== null) ? KeyRange.fromPartial(object.keyRange) : undefined; @@ -910,7 +911,6 @@ export const SstableInfo = { message.metaOffset = object.metaOffset ?? 0; message.staleKeyCount = object.staleKeyCount ?? 0; message.totalKeyCount = object.totalKeyCount ?? 0; - message.divideVersion = object.divideVersion ?? 0; message.minEpoch = object.minEpoch ?? 0; message.maxEpoch = object.maxEpoch ?? 0; message.uncompressedFileSize = object.uncompressedFileSize ?? 0; @@ -1080,7 +1080,7 @@ export const IntraLevelDelta = { }; function createBaseGroupConstruct(): GroupConstruct { - return { groupConfig: undefined, parentGroupId: 0, tableIds: [], groupId: 0 }; + return { groupConfig: undefined, parentGroupId: 0, tableIds: [], groupId: 0, newSstStartId: 0 }; } export const GroupConstruct = { @@ -1090,6 +1090,7 @@ export const GroupConstruct = { parentGroupId: isSet(object.parentGroupId) ? Number(object.parentGroupId) : 0, tableIds: Array.isArray(object?.tableIds) ? object.tableIds.map((e: any) => Number(e)) : [], groupId: isSet(object.groupId) ? Number(object.groupId) : 0, + newSstStartId: isSet(object.newSstStartId) ? Number(object.newSstStartId) : 0, }; }, @@ -1104,6 +1105,7 @@ export const GroupConstruct = { obj.tableIds = []; } message.groupId !== undefined && (obj.groupId = Math.round(message.groupId)); + message.newSstStartId !== undefined && (obj.newSstStartId = Math.round(message.newSstStartId)); return obj; }, @@ -1115,6 +1117,7 @@ export const GroupConstruct = { message.parentGroupId = object.parentGroupId ?? 0; message.tableIds = object.tableIds?.map((e) => e) || []; message.groupId = object.groupId ?? 0; + message.newSstStartId = object.newSstStartId ?? 0; return message; }, }; diff --git a/proto/hummock.proto b/proto/hummock.proto index bf572a20992de..ea7051058b0fc 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -10,15 +10,14 @@ option java_package = "com.risingwave.proto"; option optimize_for = SPEED; message SstableInfo { - uint64 id = 1; - KeyRange key_range = 2; - uint64 file_size = 3; - repeated uint32 table_ids = 4; - uint64 meta_offset = 5; - uint64 stale_key_count = 6; - uint64 total_key_count = 7; - // When a SST is divided, its divide_version will increase one. - uint64 divide_version = 8; + uint64 object_id = 1; + uint64 sst_id = 2; + KeyRange key_range = 3; + uint64 file_size = 4; + repeated uint32 table_ids = 5; + uint64 meta_offset = 6; + uint64 stale_key_count = 7; + uint64 total_key_count = 8; uint64 min_epoch = 9; uint64 max_epoch = 10; uint64 uncompressed_file_size = 11; @@ -64,6 +63,7 @@ message GroupConstruct { uint64 parent_group_id = 2; repeated uint32 table_ids = 3; uint64 group_id = 4; + uint64 new_sst_start_id = 5; } message GroupMetaChange { diff --git a/src/ctl/src/cmd_impl/hummock/sst_dump.rs b/src/ctl/src/cmd_impl/hummock/sst_dump.rs index 17f60fc6d75fc..5cbe594443113 100644 --- a/src/ctl/src/cmd_impl/hummock/sst_dump.rs +++ b/src/ctl/src/cmd_impl/hummock/sst_dump.rs @@ -26,6 +26,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_frontend::TableCatalog; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt; use risingwave_hummock_sdk::key::FullKey; +use risingwave_hummock_sdk::HummockSstableId; use risingwave_object_store::object::BlockLocation; use risingwave_pb::hummock::{Level, SstableInfo}; use risingwave_rpc_client::MetaClient; @@ -73,14 +74,14 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result for level in version.get_combined_levels() { for sstable_info in &level.table_infos { if let Some(sst_id) = &args.sst_id { - if *sst_id == sstable_info.id { + if *sst_id == sstable_info.get_object_id() { if args.print_level { print_level(level); } sst_dump_via_sstable_store( sstable_store, - sstable_info.id, + sstable_info.get_object_id(), sstable_info.meta_offset, sstable_info.file_size, &table_data, @@ -96,7 +97,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result sst_dump_via_sstable_store( sstable_store, - sstable_info.id, + sstable_info.get_object_id(), sstable_info.meta_offset, sstable_info.file_size, &table_data, @@ -111,16 +112,16 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result pub async fn sst_dump_via_sstable_store( sstable_store: &SstableStore, - sst_id: u64, + object_id: HummockSstableId, meta_offset: u64, file_size: u64, table_data: &TableData, args: &SstDumpArgs, ) -> anyhow::Result<()> { let sstable_info = SstableInfo { - id: sst_id, - meta_offset, + object_id, file_size, + meta_offset, ..Default::default() }; let sstable_cache = sstable_store @@ -129,7 +130,7 @@ pub async fn sst_dump_via_sstable_store( let sstable = sstable_cache.value().as_ref(); let sstable_meta = &sstable.meta; - println!("SST id: {}", sst_id); + println!("SST object id: {}", object_id); println!("-------------------------------------"); println!("File Size: {}", sstable.estimate_size()); diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index c3cb91ff78e55..1e36284c702e2 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1022,7 +1022,7 @@ fn collect_synced_ssts( .iter_mut() .map(|grouped| { let sst_info = std::mem::take(&mut grouped.sst).expect("field not None"); - sst_to_worker.insert(sst_info.id, resp.worker_id); + sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id); ExtendedSstableInfo::new( grouped.compaction_group_id, sst_info, diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 29b92467f27b3..166b085218c93 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -540,7 +540,7 @@ pub mod tests { level_type: LevelType::Overlapping as i32, total_file_size: sst.file_size, uncompressed_file_size: sst.uncompressed_file_size, - sub_level_id: sst.id, + sub_level_id: sst.get_sst_id(), table_infos: vec![sst], }); } @@ -563,7 +563,7 @@ pub mod tests { .iter() .map(|table| table.uncompressed_file_size) .sum(); - let sub_level_id = table_infos[0].id; + let sub_level_id = table_infos[0].get_sst_id(); levels.l0.as_mut().unwrap().total_file_size += total_file_size; levels.l0.as_mut().unwrap().sub_levels.push(Level { level_idx: 0, @@ -583,7 +583,8 @@ pub mod tests { epoch: u64, ) -> SstableInfo { SstableInfo { - id, + object_id: id, + sst_id: id, key_range: Some(KeyRange { left: iterator_test_key_of_epoch(table_prefix, left, epoch), right: iterator_test_key_of_epoch(table_prefix, right, epoch), @@ -594,7 +595,6 @@ pub mod tests { meta_offset: 0, stale_key_count: 0, total_key_count: 0, - divide_version: 0, uncompressed_file_size: (right - left + 1) as u64, min_epoch: 0, max_epoch: 0, @@ -613,7 +613,8 @@ pub mod tests { max_epoch: u64, ) -> SstableInfo { SstableInfo { - id, + object_id: id, + sst_id: id, key_range: Some(KeyRange { left: iterator_test_key_of_epoch(table_prefix, left, epoch), right: iterator_test_key_of_epoch(table_prefix, right, epoch), @@ -624,7 +625,6 @@ pub mod tests { meta_offset: 0, stale_key_count: 0, total_key_count: 0, - divide_version: 0, uncompressed_file_size: (right - left + 1) as u64, min_epoch, max_epoch, @@ -730,7 +730,7 @@ pub mod tests { ) { for i in &compact_task.input.input_levels { for t in &i.table_infos { - assert!(level_handlers[i.level_idx as usize].is_pending_compact(&t.id)); + assert!(level_handlers[i.level_idx as usize].is_pending_compact(&t.sst_id)); } } } @@ -906,7 +906,7 @@ pub mod tests { compaction.input.input_levels[0] .table_infos .iter() - .map(|sst| sst.id) + .map(|sst| sst.get_sst_id()) .collect_vec(), vec![5, 6, 7] ); diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 9d27359b7c414..7852bbef2fe32 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -16,6 +16,7 @@ pub mod compaction_config; mod level_selector; mod overlap_strategy; use risingwave_common::catalog::TableOption; +use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::compact_task::{self, TaskStatus}; @@ -27,7 +28,9 @@ use std::sync::Arc; use picker::{ LevelCompactionPicker, ManualCompactionPicker, MinOverlappingPicker, TierCompactionPicker, }; -use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId, HummockEpoch}; +use risingwave_hummock_sdk::{ + CompactionGroupId, HummockCompactionTaskId, HummockEpoch, HummockSstableId, +}; use risingwave_pb::hummock::compaction_config::CompactionMode; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactTask, CompactionConfig, InputLevel, KeyRange, LevelType}; @@ -220,11 +223,11 @@ impl CompactStatus { #[derive(Clone, Debug, PartialEq)] pub struct ManualCompactionOption { /// Filters out SSTs to pick. Has no effect if empty. - pub sst_ids: Vec, + pub sst_ids: Vec, /// Filters out SSTs to pick. pub key_range: KeyRange, /// Filters out SSTs to pick. Has no effect if empty. - pub internal_table_id: HashSet, + pub internal_table_id: HashSet, /// Input level. pub level: usize, } diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 3f294c0835bef..a6f2e2368b49a 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -130,7 +130,7 @@ impl LevelCompactionPicker { continue; } - if level_handlers[0].is_pending_compact(&sst.id) { + if level_handlers[0].is_pending_compact(&sst.sst_id) { pending_compact = true; break; } @@ -157,7 +157,7 @@ impl LevelCompactionPicker { .filter(|sst| table_id.map(|id| sst.table_ids[0] == id).unwrap_or(true)); let mut target_level_size = 0; for sst in target_level_files.clone() { - if level_handlers[self.target_level].is_pending_compact(&sst.id) { + if level_handlers[self.target_level].is_pending_compact(&sst.sst_id) { return None; } target_level_size += sst.file_size; @@ -241,8 +241,8 @@ pub mod tests { .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); assert_eq!(ret.input_levels[0].table_infos.len(), 2); - assert_eq!(ret.input_levels[0].table_infos[0].id, 5); - assert_eq!(ret.input_levels[1].table_infos[0].id, 3); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 5); + assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 3); ret.add_pending_task(0, &mut levels_handler); // Cannot pick because sub-level[0] is pending. @@ -254,7 +254,7 @@ pub mod tests { levels.l0.as_mut().unwrap().sub_levels[0] .table_infos - .retain(|table| table.id != 4); + .retain(|table| table.get_sst_id() != 4); levels.l0.as_mut().unwrap().total_file_size -= ret.input_levels[0].table_infos[0].file_size; levels_handler[0].remove_task(0); @@ -264,9 +264,9 @@ pub mod tests { .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); assert_eq!(ret.input_levels.len(), 4); - assert_eq!(ret.input_levels[0].table_infos[0].id, 7); - assert_eq!(ret.input_levels[1].table_infos[0].id, 6); - assert_eq!(ret.input_levels[2].table_infos[0].id, 5); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 7); + assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 6); + assert_eq!(ret.input_levels[2].table_infos[0].get_sst_id(), 5); assert_eq!(ret.input_levels[3].table_infos.len(), 3); ret.add_pending_task(1, &mut levels_handler); @@ -284,9 +284,9 @@ pub mod tests { .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); assert_eq!(ret.input_levels.len(), 4); - assert_eq!(ret.input_levels[0].table_infos[0].id, 7); - assert_eq!(ret.input_levels[1].table_infos[0].id, 6); - assert_eq!(ret.input_levels[2].table_infos[0].id, 5); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 7); + assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 6); + assert_eq!(ret.input_levels[2].table_infos[0].get_sst_id(), 5); assert_eq!(ret.input_levels[3].table_infos.len(), 3); } @@ -342,7 +342,7 @@ pub mod tests { ret.input_levels[1] .table_infos .iter() - .map(|t| t.id) + .map(|t| t.get_sst_id()) .collect_vec(), vec![1] ); @@ -351,7 +351,7 @@ pub mod tests { ret.input_levels[0] .table_infos .iter() - .map(|t| t.id) + .map(|t| t.get_sst_id()) .collect_vec(), vec![7, 8] ); @@ -488,9 +488,9 @@ pub mod tests { .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); assert_eq!(ret.input_levels.len(), 3); - assert_eq!(ret.input_levels[2].table_infos[0].id, 1); - assert_eq!(ret.input_levels[2].table_infos[1].id, 2); - assert_eq!(ret.input_levels[2].table_infos[2].id, 3); + assert_eq!(ret.input_levels[2].table_infos[0].get_sst_id(), 1); + assert_eq!(ret.input_levels[2].table_infos[1].get_sst_id(), 2); + assert_eq!(ret.input_levels[2].table_infos[2].get_sst_id(), 3); levels.levels[0].table_infos[0].file_size += 1600 - levels.levels[0].total_file_size; levels.levels[0].total_file_size = 1600; let sub_level = &mut levels.l0.as_mut().unwrap().sub_levels[0]; @@ -572,7 +572,7 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - assert_eq!(ret.input_levels[0].table_infos[0].id, 7); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 7); assert_eq!( 3, ret.input_levels.iter().filter(|l| l.level_idx == 0).count() @@ -596,7 +596,7 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - assert_eq!(ret.input_levels[0].table_infos[0].id, 6); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6); assert_eq!( 2, ret.input_levels.iter().filter(|l| l.level_idx == 0).count() diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index 21e78598578a6..d5a39a99c8718 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -148,7 +148,7 @@ impl ManualCompactionPicker { info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos); if target_input_ssts .iter() - .any(|table| level_handlers[self.target_level].is_pending_compact(&table.id)) + .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id)) { return None; } @@ -189,7 +189,7 @@ impl ManualCompactionPicker { && !level .table_infos .iter() - .any(|t| hint_sst_ids.contains(&t.id)) + .any(|t| hint_sst_ids.contains(&t.sst_id)) { return false; } @@ -239,7 +239,7 @@ impl CompactionPicker for ManualCompactionPicker { .get_level(self.option.level) .table_infos .iter() - .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.id)) + .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id)) .filter(|sst_info| range_overlap_info.check_overlap(sst_info)) .filter(|sst_info| { if self.option.internal_table_id.is_empty() { @@ -265,13 +265,15 @@ impl CompactionPicker for ManualCompactionPicker { .get_level(level) .table_infos .iter() - .find_position(|p| p.id == select_input_ssts.first().unwrap().id) + .find_position(|p| { + p.get_sst_id() == select_input_ssts.first().unwrap().get_sst_id() + }) .unwrap(); let (right, _) = levels .get_level(level) .table_infos .iter() - .find_position(|p| p.id == select_input_ssts.last().unwrap().id) + .find_position(|p| p.get_sst_id() == select_input_ssts.last().unwrap().get_sst_id()) .unwrap(); select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec(); vec![] @@ -283,13 +285,13 @@ impl CompactionPicker for ManualCompactionPicker { }; if select_input_ssts .iter() - .any(|table| level_handlers[level].is_pending_compact(&table.id)) + .any(|table| level_handlers[level].is_pending_compact(&table.sst_id)) { return None; } if target_input_ssts .iter() - .any(|table| level_handlers[target_level].is_pending_compact(&table.id)) + .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id)) { return None; } @@ -574,7 +576,7 @@ pub mod tests { for t in &mut l.table_infos { t.table_ids.clear(); if idx == 0 { - t.table_ids.push(((t.id % 2) + 1) as _); + t.table_ids.push(((t.get_sst_id() % 2) + 1) as _); } else { t.table_ids.push(3); } @@ -699,7 +701,7 @@ pub mod tests { result.input_levels[l] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), *e ); @@ -735,7 +737,7 @@ pub mod tests { result.input_levels[l] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), *e ); @@ -787,7 +789,7 @@ pub mod tests { result.input_levels[i] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), *e ); @@ -853,7 +855,7 @@ pub mod tests { .iter() .take(3) .flat_map(|s| s.table_infos.clone()) - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), vec![9, 10, 7, 8, 5, 6] ); @@ -861,7 +863,7 @@ pub mod tests { result.input_levels[3] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), vec![3] ); @@ -895,7 +897,7 @@ pub mod tests { .iter() .take(3) .flat_map(|s| s.table_infos.clone()) - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), vec![9, 10, 7, 8, 5, 6] ); @@ -903,7 +905,7 @@ pub mod tests { result.input_levels[3] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), vec![3] ); @@ -941,7 +943,7 @@ pub mod tests { .iter() .take(1) .flat_map(|s| s.table_infos.clone()) - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), vec![5, 6] ); @@ -949,7 +951,7 @@ pub mod tests { result.input_levels[1] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), vec![3] ); @@ -1044,7 +1046,7 @@ pub mod tests { result.input_levels[l] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), *e ); @@ -1087,7 +1089,7 @@ pub mod tests { result.input_levels[i] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), *e ); @@ -1137,7 +1139,7 @@ pub mod tests { result.input_levels[i] .table_infos .iter() - .map(|s| s.id) + .map(|s| s.get_sst_id()) .collect_vec(), *e ); diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index cd653cb3b49f4..db4b3b0986b6b 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -55,13 +55,13 @@ impl MinOverlappingPicker { ) -> (Vec, Vec) { let mut scores = vec![]; for left in 0..select_tables.len() { - if level_handlers[self.level].is_pending_compact(&select_tables[left].id) { + if level_handlers[self.level].is_pending_compact(&select_tables[left].sst_id) { continue; } let mut overlap_info = self.overlap_strategy.create_overlap_info(); let mut select_file_size = 0; for (right, table) in select_tables.iter().enumerate().skip(left) { - if level_handlers[self.level].is_pending_compact(&table.id) { + if level_handlers[self.level].is_pending_compact(&table.sst_id) { break; } if self.split_by_table && table.table_ids != select_tables[left].table_ids { @@ -76,7 +76,7 @@ impl MinOverlappingPicker { let mut total_file_size = 0; let mut pending_campct = false; for other in overlap_files { - if level_handlers[self.target_level].is_pending_compact(&other.id) { + if level_handlers[self.target_level].is_pending_compact(&other.sst_id) { pending_campct = true; break; } @@ -210,7 +210,7 @@ pub mod tests { assert_eq!(ret.input_levels[0].level_idx, 1); assert_eq!(ret.target_level, 2); assert_eq!(ret.input_levels[0].table_infos.len(), 1); - assert_eq!(ret.input_levels[0].table_infos[0].id, 2); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 2); assert_eq!(ret.input_levels[1].table_infos.len(), 0); ret.add_pending_task(0, &mut level_handlers); @@ -221,8 +221,8 @@ pub mod tests { assert_eq!(ret.target_level, 2); assert_eq!(ret.input_levels[0].table_infos.len(), 2); assert_eq!(ret.input_levels[1].table_infos.len(), 3); - assert_eq!(ret.input_levels[0].table_infos[0].id, 0); - assert_eq!(ret.input_levels[1].table_infos[0].id, 4); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 0); + assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 4); ret.add_pending_task(1, &mut level_handlers); let ret = picker.pick_compaction(&levels, &level_handlers, &mut local_stats); @@ -287,10 +287,10 @@ pub mod tests { assert_eq!(ret.input_levels[1].level_idx, 2); assert_eq!(ret.input_levels[0].table_infos.len(), 2); - assert_eq!(ret.input_levels[0].table_infos[0].id, 0); - assert_eq!(ret.input_levels[0].table_infos[1].id, 1); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 0); + assert_eq!(ret.input_levels[0].table_infos[1].get_sst_id(), 1); assert_eq!(ret.input_levels[1].table_infos.len(), 1); - assert_eq!(ret.input_levels[1].table_infos[0].id, 4); + assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 4); } } diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index 878e5fd1ffcc5..57eafec9ee11e 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -133,7 +133,8 @@ impl SpaceReclaimCompactionPicker { .as_ref() .unwrap() .sstable_overlap(&state.last_select_end_bound); - if unmatched_sst || (level_handler.is_pending_compact(&sst.id) || self.filter(sst)) { + if unmatched_sst || (level_handler.is_pending_compact(&sst.sst_id) || self.filter(sst)) + { if !select_input_ssts.is_empty() { // Our goal is to pick as many complete layers of data as possible and keep the // picked files contiguous to avoid overlapping key_ranges, so the strategy is @@ -246,7 +247,7 @@ mod test { { let sst_10 = levels[3].table_infos.get_mut(8).unwrap(); - assert_eq!(10, sst_10.id); + assert_eq!(10, sst_10.get_sst_id()); sst_10.key_range.as_mut().unwrap().right_exclusive = true; } @@ -281,7 +282,7 @@ mod test { let mut start_id = 2; for sst in &task.input.input_levels[0].table_infos { - assert_eq!(start_id, sst.id); + assert_eq!(start_id, sst.get_sst_id()); start_id += 1; } @@ -330,7 +331,7 @@ mod test { let mut start_id = 7; for sst in &task.input.input_levels[0].table_infos { - assert_eq!(start_id, sst.id); + assert_eq!(start_id, sst.get_sst_id()); start_id += 1; } @@ -365,7 +366,7 @@ mod test { compact_task::TaskType::SpaceReclaim )); for sst in &task.input.input_levels[0].table_infos { - assert_eq!(start_id, sst.id); + assert_eq!(start_id, sst.get_sst_id()); start_id += 1; } @@ -430,7 +431,7 @@ mod test { let mut start_id = 10; for sst in &task.input.input_levels[0].table_infos { - assert_eq!(start_id, sst.id); + assert_eq!(start_id, sst.get_sst_id()); start_id += 1; } @@ -478,7 +479,7 @@ mod test { let select_sst = &task.input.input_levels[0] .table_infos .iter() - .map(|sst| sst.id) + .map(|sst| sst.get_sst_id()) .collect_vec(); assert!(select_sst.is_sorted()); assert_eq!(expect_task_sst_id_range[index], *select_sst); @@ -532,7 +533,7 @@ mod test { let select_sst = &task.input.input_levels[0] .table_infos .iter() - .map(|sst| sst.id) + .map(|sst| sst.get_sst_id()) .collect_vec(); assert!(select_sst.is_sorted()); assert_eq!(expect_task_sst_id_range[index], *select_sst); diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index df9bcdeb182d2..2406dc59a9ee0 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -198,7 +198,7 @@ impl TierCompactionPicker { } for sst in &level.table_infos { - if level_handler.is_pending_compact(&sst.id) { + if level_handler.is_pending_compact(&sst.sst_id) { continue; } } @@ -228,7 +228,7 @@ impl TierCompactionPicker { let mut cur_level_size = 0; for sst in &other.table_infos { if *table_id == sst.table_ids[0] { - if level_handler.is_pending_compact(&sst.id) { + if level_handler.is_pending_compact(&sst.sst_id) { pending_compact = true; break; } diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index 429817373bf95..b973ac0882fdf 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -164,7 +164,7 @@ impl TtlReclaimCompactionPicker { .sstable_overlap(&state.last_select_end_bound); if unmatched_sst - || level_handler.is_pending_compact(&sst.id) + || level_handler.is_pending_compact(&sst.sst_id) || self.filter(sst, current_epoch_time) { if !select_input_ssts.is_empty() { @@ -362,7 +362,7 @@ mod test { { let sst_10 = levels[3].table_infos.get_mut(8).unwrap(); - assert_eq!(10, sst_10.id); + assert_eq!(10, sst_10.get_sst_id()); sst_10.key_range.as_mut().unwrap().right_exclusive = true; } @@ -404,7 +404,7 @@ mod test { let mut start_id = 2; for sst in &task.input.input_levels[0].table_infos { - assert_eq!(start_id, sst.id); + assert_eq!(start_id, sst.get_sst_id()); start_id += 1; } @@ -455,7 +455,7 @@ mod test { let mut start_id = 7; for sst in &task.input.input_levels[0].table_infos { - assert_eq!(start_id, sst.id); + assert_eq!(start_id, sst.get_sst_id()); start_id += 1; } @@ -490,7 +490,7 @@ mod test { compact_task::TaskType::Ttl )); for sst in &task.input.input_levels[0].table_infos { - assert_eq!(start_id, sst.id); + assert_eq!(start_id, sst.get_sst_id()); start_id += 1; } @@ -551,7 +551,7 @@ mod test { // test table_option_filter assert_eq!(task.input.input_levels[0].table_infos.len(), 1); let select_sst = &task.input.input_levels[0].table_infos.first().unwrap(); - assert_eq!(select_sst.id, 5); + assert_eq!(select_sst.get_sst_id(), 5); assert_eq!(task.input.input_levels[1].level_idx, 4); assert_eq!(task.input.input_levels[1].table_infos.len(), 0); @@ -655,7 +655,7 @@ mod test { let select_sst = &task.input.input_levels[0] .table_infos .iter() - .map(|sst| sst.id) + .map(|sst| sst.get_sst_id()) .collect_vec(); assert!(select_sst.is_sorted()); assert_eq!(expect_task_sst_id_range[index], *select_sst); @@ -746,7 +746,7 @@ mod test { let select_sst = &task.input.input_levels[0] .table_infos .iter() - .map(|sst| sst.id) + .map(|sst| sst.get_sst_id()) .collect_vec(); assert!(select_sst.is_sorted()); assert_eq!(expect_task_sst_id_range[index], *select_sst); diff --git a/src/meta/src/hummock/compaction_schedule_policy.rs b/src/meta/src/hummock/compaction_schedule_policy.rs index 4b53379f0ffd8..886cd0eabad02 100644 --- a/src/meta/src/hummock/compaction_schedule_policy.rs +++ b/src/meta/src/hummock/compaction_schedule_policy.rs @@ -414,14 +414,14 @@ mod tests { level_idx: 0, level_type: 0, table_infos: vec![SstableInfo { - id: 0, + object_id: 0, + sst_id: 0, key_range: None, file_size: input_file_size, table_ids: vec![], meta_offset: 0, stale_key_count: 0, total_key_count: 0, - divide_version: 0, uncompressed_file_size: input_file_size, min_epoch: 0, max_epoch: 0, diff --git a/src/meta/src/hummock/level_handler.rs b/src/meta/src/hummock/level_handler.rs index 42011463ad89e..5dd27c51a38b3 100644 --- a/src/meta/src/hummock/level_handler.rs +++ b/src/meta/src/hummock/level_handler.rs @@ -66,7 +66,7 @@ impl LevelHandler { level .table_infos .iter() - .any(|table| self.compacting_files.contains_key(&table.id)) + .any(|table| self.compacting_files.contains_key(&table.sst_id)) } pub fn add_pending_task(&mut self, task_id: u64, target_level: usize, ssts: &[SstableInfo]) { @@ -74,9 +74,9 @@ impl LevelHandler { let mut table_ids = vec![]; let mut total_file_size = 0; for sst in ssts { - self.compacting_files.insert(sst.id, task_id); + self.compacting_files.insert(sst.get_sst_id(), task_id); total_file_size += sst.file_size; - table_ids.push(sst.id); + table_ids.push(sst.get_sst_id()); } self.pending_tasks.push(RunningCompactTask { diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 7558c5cb1e710..828eb2b76695c 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -152,7 +152,8 @@ mod tests { compact_task.task_id, compact_task.target_level as usize, &[SstableInfo { - id: 1, + object_id: 1, + sst_id: 1, ..Default::default() }], ); diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 0007803645d8a..fa1000e06b18a 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -19,9 +19,9 @@ use std::sync::Arc; use function_name::named; use itertools::Itertools; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - build_version_delta_after_version, get_compaction_group_ids, get_compaction_group_sst_ids, - get_member_table_ids, try_get_compaction_group_id_by_table_id, HummockVersionExt, - HummockVersionUpdateExt, + build_version_delta_after_version, get_compaction_group_ids, get_compaction_group_object_ids, + get_member_table_ids, try_get_compaction_group_id_by_table_id, HummockLevelsExt, + HummockVersionExt, HummockVersionUpdateExt, }; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::CompactionGroupId; @@ -328,9 +328,9 @@ impl HummockManager { // We don't bother to add IntraLevelDelta to remove SSTs from group, because the entire // group is to be removed. // However, we need to take care of SST GC for the removed group. - for sst_id in get_compaction_group_sst_ids(current_version, *group_id) { - if drop_sst(&mut branched_ssts, *group_id, sst_id) { - new_version_delta.gc_sst_ids.push(sst_id); + for object_id in get_compaction_group_object_ids(current_version, *group_id) { + if drop_sst(&mut branched_ssts, *group_id, object_id) { + new_version_delta.gc_sst_ids.push(object_id); } } let group_deltas = &mut new_version_delta @@ -477,6 +477,13 @@ impl HummockManager { .id_gen_manager() .generate::<{ IdCategory::CompactionGroup }>() .await?; + let new_sst_start_id = self + .env + .id_gen_manager() + .generate_interval::<{ IdCategory::HummockSstableId }>( + parent_group.count_ssts() as u64 * 2, + ) + .await?; let group_deltas = &mut new_version_delta .group_deltas .entry(new_group_id) @@ -494,6 +501,7 @@ impl HummockManager { group_id: new_group_id, parent_group_id, table_ids, + new_sst_start_id, })), }); @@ -505,27 +513,27 @@ impl HummockManager { .current_version .apply_version_delta(&new_version_delta); // Updates SST split info - for (id, divide_ver, _, is_trivial_adjust) in sst_split_info { - match branched_ssts.get_mut(id) { + for (object_id, sst_id, parent_new_sst_id) in sst_split_info { + match branched_ssts.get_mut(object_id) { Some(mut entry) => { - if is_trivial_adjust { - entry.remove(&parent_group_id).unwrap(); - } else { + if let Some(parent_new_sst_id) = parent_new_sst_id { let p = entry.get_mut(&parent_group_id).unwrap(); - assert_eq!(*p + 1, divide_ver); - *p = divide_ver; + *p = parent_new_sst_id; + } else { + entry.remove(&parent_group_id).unwrap(); } - entry.insert(new_group_id, divide_ver); + entry.insert(new_group_id, sst_id); } None => { - let to_insert: HashMap = if is_trivial_adjust { - [(new_group_id, divide_ver)].into_iter().collect() - } else { - [(parent_group_id, divide_ver), (new_group_id, divide_ver)] - .into_iter() - .collect() - }; - branched_ssts.insert(id, to_insert); + let to_insert: HashMap = + if let Some(parent_new_sst_id) = parent_new_sst_id { + [(parent_group_id, parent_new_sst_id), (new_group_id, sst_id)] + .into_iter() + .collect() + } else { + [(new_group_id, sst_id)].into_iter().collect() + }; + branched_ssts.insert(object_id, to_insert); } } } diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 547eee0dce6bc..386d8930dfd18 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -100,7 +100,7 @@ where let tracked_sst_ids: HashSet = { let versioning_guard = read_lock!(self, versioning).await; let mut tracked_sst_ids = - HashSet::from_iter(versioning_guard.current_version.get_sst_ids()); + HashSet::from_iter(versioning_guard.current_version.get_object_ids()); for delta in versioning_guard.hummock_version_deltas.values() { tracked_sst_ids.extend(delta.get_gc_sst_ids()); } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index cfdded3601c78..45da430526243 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -1035,17 +1035,16 @@ where ) -> bool { for input_level in compact_task.get_input_ssts() { for table_info in input_level.get_table_infos() { - if match branched_ssts.get(&table_info.id) { - Some(mp) => match mp.get(&compact_task.compaction_group_id) { - Some(divide_version) => *divide_version, + if let Some(mp) = branched_ssts.get(&table_info.object_id) { + if match mp.get(&compact_task.compaction_group_id) { + Some(sst_id) => *sst_id, None => { return true; } - }, - None => 0, - } > table_info.divide_version - { - return true; + } != table_info.sst_id + { + return true; + } } } } @@ -1339,22 +1338,22 @@ where new_version_delta.max_committed_epoch = epoch; let mut new_hummock_version = old_version.clone(); new_hummock_version.id = new_version_delta.id; - let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts); - let mut branch_sstables = vec![]; + let mut incorrect_ssts = vec![]; + let mut new_sst_id_number = 0; sstables.retain_mut(|local_sst_info| { let ExtendedSstableInfo { compaction_group_id, sst_info: sst, .. } = local_sst_info; - let is_sst_belong_to_group_declared = match old_version.levels.get(compaction_group_id) - { - Some(compaction_group) => sst - .table_ids - .iter() - .all(|t| compaction_group.member_table_ids.contains(t)), - None => false, - }; + let mut is_sst_belong_to_group_declared = + match old_version.levels.get(compaction_group_id) { + Some(compaction_group) => sst + .table_ids + .iter() + .all(|t| compaction_group.member_table_ids.contains(t)), + None => false, + }; if !is_sst_belong_to_group_declared { let mut group_table_ids: BTreeMap<_, Vec<_>> = BTreeMap::new(); for table_id in sst.get_table_ids() { @@ -1372,7 +1371,7 @@ where tracing::warn!( "table {} in SST {} doesn't belong to any compaction group", table_id, - sst.get_id(), + sst.get_object_id(), ); } } @@ -1380,24 +1379,40 @@ where let is_trivial_adjust = group_table_ids.len() == 1 && group_table_ids.first_key_value().unwrap().1.len() == sst.get_table_ids().len(); - if !is_trivial_adjust { - sst.divide_version += 1; - } - let mut branch_groups = HashMap::new(); - for (group_id, match_ids) in group_table_ids { - let mut branch_sst = sst.clone(); - branch_sst.table_ids = match_ids; - branch_sstables.push(ExtendedSstableInfo::with_compaction_group( - group_id, branch_sst, - )); - branch_groups.insert(group_id, sst.get_divide_version()); - } - if !branch_groups.is_empty() && !is_trivial_adjust { - branched_ssts.insert(sst.get_id(), branch_groups); + if is_trivial_adjust { + *compaction_group_id = *group_table_ids.first_key_value().unwrap().0; + is_sst_belong_to_group_declared = true; + } else { + new_sst_id_number += group_table_ids.len(); + incorrect_ssts.push((std::mem::take(sst), group_table_ids)); } } is_sst_belong_to_group_declared }); + let mut new_sst_id = self + .env + .id_gen_manager() + .generate_interval::<{ IdCategory::HummockSstableId }>(new_sst_id_number as u64) + .await?; + let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts); + let mut branch_sstables = Vec::with_capacity(new_sst_id_number); + for (sst, group_table_ids) in incorrect_ssts { + let mut branch_groups = HashMap::new(); + for (group_id, match_ids) in group_table_ids { + let mut branch_sst = sst.clone(); + branch_sst.sst_id = new_sst_id; + branch_sst.table_ids = match_ids; + branch_sstables.push(ExtendedSstableInfo::with_compaction_group( + group_id, branch_sst, + )); + branch_groups.insert(group_id, new_sst_id); + new_sst_id += 1; + } + if !branch_groups.is_empty() { + branched_ssts.insert(sst.get_object_id(), branch_groups); + } + } + sstables.append(&mut branch_sstables); let mut modified_compaction_groups = vec![]; @@ -2055,13 +2070,13 @@ fn gen_version_delta<'a>( .table_infos .iter() .map(|sst| { - let id = sst.id; + let id = sst.get_object_id(); if !trivial_move && drop_sst(branched_ssts, compact_task.compaction_group_id, id) { gc_sst_ids.push(id); } - id + sst.get_sst_id() }) .collect_vec(), ..Default::default() diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 26a8e0096c43a..1a9ade9ede73f 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - get_compaction_group_ids, get_compaction_group_sst_ids, HummockVersionExt, + get_compaction_group_ids, get_compaction_group_object_ids, HummockVersionExt, }; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::table_stats::{to_prost_table_stats_map, TableStats, TableStatsMap}; @@ -246,15 +246,15 @@ async fn test_hummock_table() { .iter() .chain(levels.levels.iter()) .flat_map(|level| level.table_infos.iter()) - .map(|info| info.id) + .map(|info| info.get_object_id()) .sorted() - .cmp(original_tables.iter().map(|ot| ot.id).sorted()) + .cmp(original_tables.iter().map(|ot| ot.get_object_id()).sorted()) ); // Confirm tables got are equal to original tables assert_eq!( - get_sorted_sstable_ids(&original_tables), - get_sorted_committed_sstable_ids(&pinned_version) + get_sorted_object_ids(&original_tables), + get_sorted_committed_object_ids(&pinned_version) ); } @@ -279,7 +279,7 @@ async fn test_hummock_transaction() { // Get tables before committing epoch1. No tables should be returned. let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.max_committed_epoch, INVALID_EPOCH); - assert!(get_sorted_committed_sstable_ids(¤t_version).is_empty()); + assert!(get_sorted_committed_object_ids(¤t_version).is_empty()); // Commit epoch1 commit_from_meta_node( @@ -295,8 +295,8 @@ async fn test_hummock_transaction() { let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.max_committed_epoch, epoch1); assert_eq!( - get_sorted_sstable_ids(&committed_tables), - get_sorted_committed_sstable_ids(¤t_version) + get_sorted_object_ids(&committed_tables), + get_sorted_committed_object_ids(¤t_version) ); } @@ -318,8 +318,8 @@ async fn test_hummock_transaction() { let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.max_committed_epoch, epoch1); assert_eq!( - get_sorted_sstable_ids(&committed_tables), - get_sorted_committed_sstable_ids(¤t_version) + get_sorted_object_ids(&committed_tables), + get_sorted_committed_object_ids(¤t_version) ); // Commit epoch2 @@ -337,8 +337,8 @@ async fn test_hummock_transaction() { let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.max_committed_epoch, epoch2); assert_eq!( - get_sorted_sstable_ids(&committed_tables), - get_sorted_committed_sstable_ids(¤t_version) + get_sorted_object_ids(&committed_tables), + get_sorted_committed_object_ids(¤t_version) ); } } @@ -763,7 +763,7 @@ async fn test_invalid_sst_id() { // reject due to invalid context id let sst_to_worker = ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, WorkerId::MAX)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), WorkerId::MAX)) .collect(); let error = hummock_manager .commit_epoch(epoch, ssts.clone(), sst_to_worker) @@ -773,7 +773,7 @@ async fn test_invalid_sst_id() { let sst_to_worker = ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, context_id)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), context_id)) .collect(); hummock_manager .commit_epoch(epoch, ssts, sst_to_worker) @@ -1129,14 +1129,19 @@ async fn test_extend_ssts_to_delete() { let sst_infos = add_test_tables(hummock_manager.as_ref(), context_id).await; let max_committed_sst_id = sst_infos .iter() - .map(|ssts| ssts.iter().max_by_key(|s| s.id).map(|s| s.id).unwrap()) + .map(|ssts| { + ssts.iter() + .max_by_key(|s| s.get_object_id()) + .map(|s| s.get_object_id()) + .unwrap() + }) .max() .unwrap(); let orphan_sst_num = 10; let orphan_sst_ids = sst_infos .iter() .flatten() - .map(|s| s.id) + .map(|s| s.get_object_id()) .chain(max_committed_sst_id + 1..=max_committed_sst_id + orphan_sst_num) .collect_vec(); assert!(hummock_manager.get_ssts_to_delete().await.is_empty()); @@ -1195,7 +1200,8 @@ async fn test_version_stats() { .map(|(idx, table_ids)| LocalSstableInfo { compaction_group_id: StaticCompactionGroupId::StateDefault as _, sst_info: SstableInfo { - id: sst_ids[idx], + object_id: sst_ids[idx], + sst_id: sst_ids[idx], key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 1, 1), right: iterator_test_key_of_epoch(1, 1, 1), @@ -1213,7 +1219,7 @@ async fn test_version_stats() { .collect_vec(); let sst_to_worker = ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, worker_node.id)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), worker_node.id)) .collect(); hummock_manager .commit_epoch(epoch, ssts, sst_to_worker) @@ -1307,10 +1313,10 @@ async fn test_split_compaction_group_on_commit() { let sst_1 = ExtendedSstableInfo { compaction_group_id: 2, sst_info: SstableInfo { - id: 10, + object_id: 10, + sst_id: 10, key_range: None, table_ids: vec![100, 101], - divide_version: 0, min_epoch: 20, max_epoch: 20, ..Default::default() @@ -1323,8 +1329,14 @@ async fn test_split_compaction_group_on_commit() { .unwrap(); let current_version = hummock_manager.get_current_version().await; assert_eq!(current_version.levels.len(), 2); - assert_eq!(get_compaction_group_sst_ids(¤t_version, 2), vec![10]); - assert_eq!(get_compaction_group_sst_ids(¤t_version, 3), vec![10]); + assert_eq!( + get_compaction_group_object_ids(¤t_version, 2), + vec![10] + ); + assert_eq!( + get_compaction_group_object_ids(¤t_version, 3), + vec![10] + ); assert_eq!( current_version .get_compaction_group_levels(2) @@ -1345,7 +1357,7 @@ async fn test_split_compaction_group_on_commit() { .clone(); assert_eq!(branched_ssts.len(), 1); assert_eq!(branched_ssts.values().next().unwrap().len(), 2); - assert_eq!( + assert_ne!( branched_ssts .values() .next() @@ -1353,9 +1365,6 @@ async fn test_split_compaction_group_on_commit() { .get(&2) .cloned() .unwrap(), - 1 - ); - assert_eq!( branched_ssts .values() .next() @@ -1363,7 +1372,6 @@ async fn test_split_compaction_group_on_commit() { .get(&3) .cloned() .unwrap(), - 1 ); } @@ -1423,10 +1431,10 @@ async fn test_split_compaction_group_on_demand_basic() { let sst_1 = ExtendedSstableInfo { compaction_group_id: 2, sst_info: SstableInfo { - id: 10, + object_id: 10, + sst_id: 10, key_range: None, table_ids: vec![100], - divide_version: 0, min_epoch: 20, max_epoch: 20, ..Default::default() @@ -1436,10 +1444,10 @@ async fn test_split_compaction_group_on_demand_basic() { let sst_2 = ExtendedSstableInfo { compaction_group_id: 2, sst_info: SstableInfo { - id: 11, + object_id: 11, + sst_id: 11, key_range: None, table_ids: vec![100, 101], - divide_version: 0, min_epoch: 20, max_epoch: 20, ..Default::default() @@ -1480,11 +1488,11 @@ async fn test_split_compaction_group_on_demand_basic() { let new_group_id = current_version.levels.keys().max().cloned().unwrap(); assert!(new_group_id > StaticCompactionGroupId::End as u64); assert!( - get_compaction_group_sst_ids(¤t_version, 2).is_empty(), + get_compaction_group_object_ids(¤t_version, 2).is_empty(), "SST 10, 11 has been moved to new_group completely." ); assert_eq!( - get_compaction_group_sst_ids(¤t_version, new_group_id), + get_compaction_group_object_ids(¤t_version, new_group_id), vec![10, 11] ); assert_eq!( @@ -1501,17 +1509,17 @@ async fn test_split_compaction_group_on_demand_basic() { ); let branched_ssts = get_branched_ssts(&hummock_manager).await; assert_eq!(branched_ssts.len(), 2); - for sst_id in [10, 11] { - assert_eq!(branched_ssts.get(&sst_id).unwrap().len(), 1); + for object_id in [10, 11] { + assert_eq!(branched_ssts.get(&object_id).unwrap().len(), 1); assert_eq!( branched_ssts - .get(&sst_id) + .get(&object_id) .unwrap() .get(&new_group_id) .cloned() .unwrap(), - 0, - "trivial adjust doesn't increase divide version" + object_id, + "trivial adjust doesn't generate a new SST id" ); } } @@ -1523,10 +1531,10 @@ async fn test_split_compaction_group_on_demand_non_trivial() { let sst_1 = ExtendedSstableInfo { compaction_group_id: 2, sst_info: SstableInfo { - id: 10, + object_id: 10, + sst_id: 10, key_range: None, table_ids: vec![100, 101], - divide_version: 0, min_epoch: 20, max_epoch: 20, ..Default::default() @@ -1555,9 +1563,12 @@ async fn test_split_compaction_group_on_demand_non_trivial() { assert_eq!(current_version.levels.len(), 3); let new_group_id = current_version.levels.keys().max().cloned().unwrap(); assert!(new_group_id > StaticCompactionGroupId::End as u64); - assert_eq!(get_compaction_group_sst_ids(¤t_version, 2), vec![10]); assert_eq!( - get_compaction_group_sst_ids(¤t_version, new_group_id), + get_compaction_group_object_ids(¤t_version, 2), + vec![10] + ); + assert_eq!( + get_compaction_group_object_ids(¤t_version, new_group_id), vec![10] ); assert_eq!( @@ -1575,15 +1586,25 @@ async fn test_split_compaction_group_on_demand_non_trivial() { let branched_ssts = get_branched_ssts(&hummock_manager).await; assert_eq!(branched_ssts.len(), 1); assert_eq!(branched_ssts.get(&10).unwrap().len(), 2); - assert_eq!(branched_ssts.get(&10).unwrap().get(&2).cloned().unwrap(), 1,); - assert_eq!( + let sst_id = branched_ssts.get(&10).unwrap().get(&2).cloned().unwrap(); + assert_ne!(sst_id, 10); + assert_ne!( + branched_ssts + .get(&10) + .unwrap() + .get(&new_group_id) + .cloned() + .unwrap(), + sst_id, + ); + assert_ne!( branched_ssts .get(&10) .unwrap() .get(&new_group_id) .cloned() .unwrap(), - 1, + 10, ); } @@ -1629,14 +1650,14 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { let sst_1 = ExtendedSstableInfo { compaction_group_id: 2, sst_info: SstableInfo { - id: 10, + object_id: 10, + sst_id: 10, key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 1, 1), right: iterator_test_key_of_epoch(1, 1, 1), right_exclusive: false, }), table_ids: vec![100, 101], - divide_version: 0, min_epoch: 20, max_epoch: 20, ..Default::default() @@ -1654,7 +1675,8 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { assert_eq!(compaction_task.target_level, base_level as u32); compaction_task.sorted_output_ssts = vec![ SstableInfo { - id: sst_1.sst_info.id + 1, + object_id: sst_1.sst_info.get_object_id() + 1, + sst_id: sst_1.sst_info.get_object_id() + 1, table_ids: vec![100, 101], key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 1, 1), @@ -1664,7 +1686,8 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { ..Default::default() }, SstableInfo { - id: sst_1.sst_info.id + 2, + object_id: sst_1.sst_info.get_object_id() + 2, + sst_id: sst_1.sst_info.get_object_id() + 2, table_ids: vec![100], key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 2, 2), @@ -1753,14 +1776,14 @@ async fn test_compaction_task_expiration_due_to_split_group() { let sst_1 = ExtendedSstableInfo { compaction_group_id: 2, sst_info: SstableInfo { - id: 10, + object_id: 10, + sst_id: 10, key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 1, 1), right: iterator_test_key_of_epoch(1, 1, 1), right_exclusive: false, }), table_ids: vec![100, 101], - divide_version: 0, min_epoch: 20, max_epoch: 20, ..Default::default() @@ -1770,14 +1793,14 @@ async fn test_compaction_task_expiration_due_to_split_group() { let sst_2 = ExtendedSstableInfo { compaction_group_id: 2, sst_info: SstableInfo { - id: 11, + object_id: 11, + sst_id: 11, key_range: Some(KeyRange { left: iterator_test_key_of_epoch(1, 1, 1), right: iterator_test_key_of_epoch(1, 1, 1), right_exclusive: false, }), table_ids: vec![101], - divide_version: 0, min_epoch: 20, max_epoch: 20, ..Default::default() diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 4b5327f5e0d81..d612bb39e45ff 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -155,7 +155,7 @@ impl HummockMetaClient for MockHummockMetaClient { ) -> Result<()> { let sst_to_worker = sstables .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, self.context_id)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id)) .collect(); self.hummock_manager .commit_epoch(epoch, sstables, sst_to_worker) diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 9b0a2f6ea5ed2..bc84636da6d83 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -69,7 +69,7 @@ where let ssts = to_local_sstable_info(&test_tables); let sst_to_worker = ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, context_id)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), context_id)) .collect(); hummock_manager .commit_epoch(epoch, ssts, sst_to_worker) @@ -136,7 +136,7 @@ where let ssts = to_local_sstable_info(&test_tables_3); let sst_to_worker = ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, context_id)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), context_id)) .collect(); hummock_manager .commit_epoch(epoch, ssts, sst_to_worker) @@ -151,7 +151,8 @@ pub fn generate_test_tables(epoch: u64, sst_ids: Vec) -> Vec) -> Vec Vec { - sstables.iter().map(|table| table.id).sorted().collect_vec() +pub fn get_sorted_object_ids(sstables: &[SstableInfo]) -> Vec { + sstables + .iter() + .map(|table| table.get_object_id()) + .sorted() + .collect_vec() } -pub fn get_sorted_committed_sstable_ids(hummock_version: &HummockVersion) -> Vec { +pub fn get_sorted_committed_object_ids(hummock_version: &HummockVersion) -> Vec { let levels = match hummock_version .levels .get(&StaticCompactionGroupId::StateDefault.into()) @@ -280,7 +284,7 @@ pub fn get_sorted_committed_sstable_ids(hummock_version: &HummockVersion) -> Vec .levels .iter() .chain(levels.l0.as_ref().unwrap().sub_levels.iter()) - .flat_map(|levels| levels.table_infos.iter().map(|info| info.id)) + .flat_map(|levels| levels.table_infos.iter().map(|info| info.get_object_id())) .sorted() .collect_vec() } @@ -357,7 +361,7 @@ where { let sst_to_worker = ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, META_NODE_ID)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), META_NODE_ID)) .collect(); hummock_manager_ref .commit_epoch(epoch, ssts, sst_to_worker) @@ -383,7 +387,7 @@ where let ssts = to_local_sstable_info(&test_tables); let sst_to_worker = ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| (sst_info.id, context_id)) + .map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), context_id)) .collect(); hummock_manager .commit_epoch(epoch, ssts, sst_to_worker) diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 0fb7d73e5e2e9..5d59140008dd7 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -404,7 +404,7 @@ mod tests { .first() .unwrap() .iter() - .map(|s| s.id) + .map(|s| s.get_object_id()) .collect_vec(), }) .await @@ -512,7 +512,7 @@ mod tests { let committed_sst_ids = sst_infos .into_iter() .flatten() - .map(|s| s.id) + .map(|s| s.get_object_id()) .sorted() .collect_vec(); assert!(!committed_sst_ids.is_empty()); diff --git a/src/storage/backup/src/lib.rs b/src/storage/backup/src/lib.rs index 44b9c39c9f87b..9e3036fd9655f 100644 --- a/src/storage/backup/src/lib.rs +++ b/src/storage/backup/src/lib.rs @@ -67,7 +67,7 @@ impl MetaSnapshotMetadata { Self { id, hummock_version_id: v.id, - ssts: v.get_sst_ids(), + ssts: v.get_object_ids(), max_committed_epoch: v.max_committed_epoch, safe_epoch: v.safe_epoch, } diff --git a/src/storage/hummock_sdk/src/compact.rs b/src/storage/hummock_sdk/src/compact.rs index 25f6eb553a77f..7faf644972036 100644 --- a/src/storage/hummock_sdk/src/compact.rs +++ b/src/storage/hummock_sdk/src/compact.rs @@ -46,7 +46,7 @@ pub fn compact_task_to_string(compact_task: &CompactTask) -> String { let tables: Vec = level_entry .table_infos .iter() - .map(|table| format!("[id: {}, {}KB]", table.id, table.file_size / 1024)) + .map(|table| format!("[id: {}, {}KB]", table.get_sst_id(), table.file_size / 1024)) .collect(); writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap(); } @@ -76,8 +76,9 @@ pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) let ratio = sstable_info.stale_key_count * 100 / sstable_info.total_key_count; writeln!( s, - "SstableInfo: id={:?}, KeyRange=[{:?},{:?}], table_ids: {:?}, size={:?}KB, delete_ratio={:?}%", - sstable_info.id, + "SstableInfo: object id={:?}, SST id={:?}, KeyRange=[{:?},{:?}], table_ids: {:?}, size={:?}KB, delete_ratio={:?}%", + sstable_info.get_object_id(), + sstable_info.get_sst_id(), left_str, right_str, sstable_info.table_ids, @@ -88,8 +89,9 @@ pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) } else { writeln!( s, - "SstableInfo: id={:?}, KeyRange=[{:?},{:?}], table_ids: {:?}, size={:?}KB", - sstable_info.id, + "SstableInfo: object id={:?}, SST id={:?}, KeyRange=[{:?},{:?}], table_ids: {:?}, size={:?}KB", + sstable_info.get_object_id(), + sstable_info.get_sst_id(), left_str, right_str, sstable_info.table_ids, diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 6bf1aa7f0ac9e..635464d8ec0e2 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -103,7 +103,7 @@ pub trait HummockVersionExt { fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize; fn level_iter bool>(&self, compaction_group_id: CompactionGroupId, f: F); - fn get_sst_ids(&self) -> Vec; + fn get_object_ids(&self) -> Vec; } pub trait HummockVersionUpdateExt { @@ -112,6 +112,7 @@ pub trait HummockVersionUpdateExt { parent_group_id: CompactionGroupId, group_id: CompactionGroupId, member_table_ids: &HashSet, + new_sst_start_id: u64, ) -> Vec; fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) -> Vec; @@ -146,10 +147,16 @@ impl HummockVersionExt for HummockVersion { combined_levels } - fn get_sst_ids(&self) -> Vec { + /// This function does NOT dedup. + fn get_object_ids(&self) -> Vec { self.get_combined_levels() .iter() - .flat_map(|level| level.table_infos.iter().map(|table_info| table_info.id)) + .flat_map(|level| { + level + .table_infos + .iter() + .map(|table_info| table_info.get_object_id()) + }) .collect_vec() } @@ -182,13 +189,12 @@ impl HummockVersionExt for HummockVersion { } pub type SstSplitInfo = ( + // Object id. HummockSstableId, - // Divide version. Counts the number of split of this SST. - u64, - // Level idx of the SSt. - u32, - // The SST is moved to the new group completely. It should be removed from parent group. - bool, + // SST id. + HummockSstableId, + // New SST id in parent group. + Option, ); impl HummockVersionUpdateExt for HummockVersion { @@ -197,7 +203,9 @@ impl HummockVersionUpdateExt for HummockVersion { parent_group_id: CompactionGroupId, group_id: CompactionGroupId, member_table_ids: &HashSet, + new_sst_start_id: u64, ) -> Vec { + let mut new_sst_id = new_sst_start_id; let mut split_id_vers = vec![]; if parent_group_id == StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId || !self.levels.contains_key(&parent_group_id) @@ -225,16 +233,22 @@ impl HummockVersionUpdateExt for HummockVersion { .get_table_ids() .iter() .all(|table_id| member_table_ids.contains(table_id)); + let mut branch_table_info = sst_info.clone(); if !is_trivial { - sst_info.divide_version += 1; + sst_info.sst_id = new_sst_id; + new_sst_id += 1; + branch_table_info.sst_id = new_sst_id; + new_sst_id += 1; } split_id_vers.push(( - sst_info.get_id(), - sst_info.get_divide_version(), - 0, - is_trivial, + branch_table_info.get_object_id(), + branch_table_info.get_sst_id(), + if is_trivial { + None + } else { + Some(sst_info.get_sst_id()) + }, )); - let mut branch_table_info = sst_info.clone(); branch_table_info.table_ids = sst_info .table_ids .drain_filter(|table_id| member_table_ids.contains(table_id)) @@ -260,7 +274,6 @@ impl HummockVersionUpdateExt for HummockVersion { } } for (z, level) in parent_levels.levels.iter_mut().enumerate() { - let level_idx = level.get_level_idx(); for sst_info in &mut level.table_infos { if sst_info .get_table_ids() @@ -271,16 +284,22 @@ impl HummockVersionUpdateExt for HummockVersion { .get_table_ids() .iter() .all(|table_id| member_table_ids.contains(table_id)); + let mut branch_table_info = sst_info.clone(); if !is_trivial { - sst_info.divide_version += 1; + sst_info.sst_id = new_sst_id; + new_sst_id += 1; + branch_table_info.sst_id = new_sst_id; + new_sst_id += 1; } split_id_vers.push(( - sst_info.get_id(), - sst_info.get_divide_version(), - level_idx, - is_trivial, + branch_table_info.get_object_id(), + branch_table_info.get_sst_id(), + if is_trivial { + None + } else { + Some(sst_info.get_sst_id()) + }, )); - let mut branch_table_info = sst_info.clone(); branch_table_info.table_ids = sst_info .table_ids .drain_filter(|table_id| member_table_ids.contains(table_id)) @@ -319,6 +338,7 @@ impl HummockVersionUpdateExt for HummockVersion { parent_group_id, *compaction_group_id, &HashSet::from_iter(group_construct.get_table_ids().iter().cloned()), + group_construct.get_new_sst_start_id(), )); } let has_destroy = summary.group_destroy.is_some(); @@ -399,15 +419,15 @@ impl HummockVersionUpdateExt for HummockVersion { for compaction_group_id in self.get_levels().keys() { self.level_iter(*compaction_group_id, |level| { for table_info in level.get_table_infos() { - let sst_id = table_info.get_id(); - ret.entry(sst_id) + let object_id = table_info.get_object_id(); + ret.entry(object_id) .or_default() - .insert(*compaction_group_id, table_info.get_divide_version()); + .insert(*compaction_group_id, table_info.get_sst_id()); } true }); } - ret.retain(|_, v| v.len() != 1 || *v.values().next().unwrap() != 0); + ret.retain(|object_id, v| v.len() != 1 || *v.values().next().unwrap() != *object_id); ret } } @@ -416,6 +436,7 @@ pub trait HummockLevelsExt { fn get_level0(&self) -> &OverlappingLevel; fn get_level(&self, idx: usize) -> &Level; fn get_level_mut(&mut self, idx: usize) -> &mut Level; + fn count_ssts(&self) -> usize; fn apply_compact_ssts(&mut self, summary: GroupDeltasSummary); } @@ -432,6 +453,15 @@ impl HummockLevelsExt for Levels { &mut self.levels[level_idx - 1] } + fn count_ssts(&self) -> usize { + self.get_level0() + .get_sub_levels() + .iter() + .chain(self.get_levels().iter()) + .map(|level| level.get_table_infos().len()) + .sum() + } + fn apply_compact_ssts(&mut self, summary: GroupDeltasSummary) { let GroupDeltasSummary { delete_sst_levels, @@ -549,8 +579,8 @@ pub fn get_member_table_ids(version: &HummockVersion) -> HashSet { .collect() } -/// Gets all SST ids in `group_id` -pub fn get_compaction_group_sst_ids( +/// Gets all object ids in `group_id` +pub fn get_compaction_group_object_ids( version: &HummockVersion, group_id: CompactionGroupId, ) -> Vec { @@ -563,7 +593,12 @@ pub fn get_compaction_group_sst_ids( .iter() .rev() .chain(group_levels.levels.iter()) - .flat_map(|level| level.table_infos.iter().map(|table_info| table_info.id)) + .flat_map(|level| { + level + .table_infos + .iter() + .map(|table_info| table_info.get_object_id()) + }) .collect_vec() } @@ -635,11 +670,14 @@ pub fn build_version_delta_after_version(version: &HummockVersion) -> HummockVer /// Delete sstables if the table id is in the id set. /// /// Return `true` if some sst is deleted, and `false` is the deletion is trivial -fn level_delete_ssts(operand: &mut Level, delete_sst_ids_superset: &HashSet) -> bool { +fn level_delete_ssts( + operand: &mut Level, + delete_sst_ids_superset: &HashSet, +) -> bool { let original_len = operand.table_infos.len(); operand .table_infos - .retain(|table| !delete_sst_ids_superset.contains(&table.id)); + .retain(|table| !delete_sst_ids_superset.contains(&table.sst_id)); operand.total_file_size = operand .table_infos .iter() @@ -709,7 +747,7 @@ mod tests { max_committed_epoch: 0, safe_epoch: 0, }; - assert_eq!(version.get_sst_ids().len(), 0); + assert_eq!(version.get_object_ids().len(), 0); // Add to sub level version @@ -722,22 +760,24 @@ mod tests { .sub_levels .push(Level { table_infos: vec![SstableInfo { - id: 11, + object_id: 11, + sst_id: 11, ..Default::default() }], ..Default::default() }); - assert_eq!(version.get_sst_ids().len(), 1); + assert_eq!(version.get_object_ids().len(), 1); // Add to non sub level version.levels.get_mut(&0).unwrap().levels.push(Level { table_infos: vec![SstableInfo { - id: 22, + object_id: 22, + sst_id: 22, ..Default::default() }], ..Default::default() }); - assert_eq!(version.get_sst_ids().len(), 2); + assert_eq!(version.get_object_ids().len(), 2); } #[test] @@ -801,7 +841,8 @@ mod tests { delta_type: Some(DeltaType::IntraLevel(IntraLevelDelta { level_idx: 1, inserted_table_infos: vec![SstableInfo { - id: 1, + object_id: 1, + sst_id: 1, ..Default::default() }], ..Default::default() @@ -824,7 +865,8 @@ mod tests { level_idx: 1, level_type: LevelType::Nonoverlapping as i32, table_infos: vec![SstableInfo { - id: 1, + object_id: 1, + sst_id: 1, ..Default::default() }], ..Default::default() diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index e2b316435cefe..51a29b802eb02 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -151,7 +151,8 @@ async fn test_read_version_basic() { let dummy_sst = StagingSstableInfo::new( vec![ LocalSstableInfo::for_test(SstableInfo { - id: 1, + object_id: 1, + sst_id: 1, key_range: Some(KeyRange { left: key_with_epoch(iterator_test_user_key_of(1).encode(), 1), right: key_with_epoch(iterator_test_user_key_of(2).encode(), 2), @@ -162,13 +163,13 @@ async fn test_read_version_basic() { meta_offset: 1, stale_key_count: 1, total_key_count: 1, - divide_version: 0, uncompressed_file_size: 1, min_epoch: 0, max_epoch: 0, }), LocalSstableInfo::for_test(SstableInfo { - id: 2, + object_id: 2, + sst_id: 2, key_range: Some(KeyRange { left: key_with_epoch(iterator_test_user_key_of(3).encode(), 3), right: key_with_epoch(iterator_test_user_key_of(3).encode(), 3), @@ -179,7 +180,6 @@ async fn test_read_version_basic() { meta_offset: 1, stale_key_count: 1, total_key_count: 1, - divide_version: 0, uncompressed_file_size: 1, min_epoch: 0, max_epoch: 0, @@ -233,8 +233,8 @@ async fn test_read_version_basic() { let staging_ssts = staging_sst_iter.cloned().collect_vec(); assert_eq!(2, staging_ssts.len()); - assert_eq!(1, staging_ssts[0].id); - assert_eq!(2, staging_ssts[1].id); + assert_eq!(1, staging_ssts[0].get_object_id()); + assert_eq!(2, staging_ssts[1].get_object_id()); } { @@ -257,7 +257,7 @@ async fn test_read_version_basic() { let staging_ssts = staging_sst_iter.cloned().collect_vec(); assert_eq!(1, staging_ssts.len()); - assert_eq!(2, staging_ssts[0].id); + assert_eq!(2, staging_ssts[0].get_object_id()); } } diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 475aa2bce57cf..2cbb2f587d22f 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1339,7 +1339,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { sync_result .uncommitted_ssts .iter() - .map(|LocalSstableInfo { sst_info, .. }| sst_info.id) + .map(|LocalSstableInfo { sst_info, .. }| sst_info.get_object_id()) .min() .unwrap() }; diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 79976dc420a67..8aa7d7851ec6a 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -180,8 +180,11 @@ impl SstableStreamIterator { fn sst_debug_info(&self) -> String { format!( - "sst_id={}, meta_offset={}, table_ids={:?}", - self.sstable_info.id, self.sstable_info.meta_offset, self.sstable_info.table_ids + "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}", + self.sstable_info.get_object_id(), + self.sstable_info.get_sst_id(), + self.sstable_info.meta_offset, + self.sstable_info.table_ids ) } } diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 49e61cf9892c1..253461bab9e0e 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -264,7 +264,7 @@ impl Compactor { context.compactor_metrics.compact_task_pending_num.dec(); for level in &compact_task.input_ssts { for table in &level.table_infos { - context.sstable_store.delete_cache(table.id); + context.sstable_store.delete_cache(table.get_object_id()); } } task_status diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index a11ecae8704ae..07ac46513e55e 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -814,7 +814,8 @@ mod tests { let end_full_key = FullKey::new(TEST_TABLE_ID, TableKey(dummy_table_key()), end_epoch); let gen_sst_id = (start_epoch << 8) + end_epoch; vec![LocalSstableInfo::for_test(SstableInfo { - id: gen_sst_id, + object_id: gen_sst_id, + sst_id: gen_sst_id, key_range: Some(KeyRange { left: start_full_key.encode(), right: end_full_key.encode(), @@ -825,7 +826,6 @@ mod tests { meta_offset: 0, stale_key_count: 0, total_key_count: 0, - divide_version: 0, uncompressed_file_size: 0, min_epoch: 0, max_epoch: 0, diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 2c64578e13d42..974340667f96d 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -404,7 +404,8 @@ impl SstableBuilder { }; let sst_info = SstableInfo { - id: self.sstable_id, + object_id: self.sstable_id, + sst_id: self.sstable_id, key_range: Some(risingwave_pb::hummock::KeyRange { left: meta.smallest_key.clone(), right: meta.largest_key.clone(), @@ -415,7 +416,6 @@ impl SstableBuilder { meta_offset: meta.meta_offset, stale_key_count: self.stale_key_count, total_key_count: self.total_key_count, - divide_version: 0, uncompressed_file_size: uncompressed_file_size + meta.encoded_size() as u64, min_epoch: cmp::min(min_epoch, tombstone_min_epoch), max_epoch: cmp::max(max_epoch, tombstone_max_epoch), diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index a9f5dbd5cbc01..bd52f20b1e041 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -196,7 +196,8 @@ impl Sstable { #[cfg(test)] pub fn get_sstable_info(&self) -> SstableInfo { SstableInfo { - id: self.id, + object_id: self.id, + sst_id: self.id, key_range: Some(KeyRange { left: self.meta.smallest_key.clone(), right: self.meta.largest_key.clone(), @@ -207,7 +208,6 @@ impl Sstable { meta_offset: self.meta.meta_offset, stale_key_count: 0, total_key_count: self.meta.key_count as u64, - divide_version: 0, uncompressed_file_size: self.meta.estimated_size as u64, min_epoch: 0, max_epoch: 0, diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 3d2e9081d1d5a..3383f4ef716aa 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -341,7 +341,7 @@ impl SstableStore { stats: &StoreLocalStatistic, ) -> HummockResult<(TableHolder, u64)> { let mut local_cache_meta_block_miss = 0; - let sst_id = sst.id; + let sst_id = sst.get_object_id(); let result = self .meta_cache .lookup_with_request_dedup::<_, HummockError, _>(sst_id, sst_id, || { diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 9b0b47d5e7658..f6ef7308daf11 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -675,7 +675,7 @@ impl HummockVersionReader { for sstable_info in fetch_meta_req { let (sstable, local_cache_meta_block_miss) = flatten_resps.pop().unwrap().unwrap(); - assert_eq!(sstable_info.id, sstable.value().id); + assert_eq!(sstable_info.get_object_id(), sstable.value().id); local_stats.apply_meta_fetch(local_cache_meta_block_miss); if !sstable.value().meta.range_tombstone_list.is_empty() && !read_options.ignore_range_tombstone @@ -701,7 +701,7 @@ impl HummockVersionReader { for sstable_info in fetch_meta_req { let (sstable, local_cache_meta_block_miss) = flatten_resps.pop().unwrap().unwrap(); - assert_eq!(sstable_info.id, sstable.value().id); + assert_eq!(sstable_info.get_object_id(), sstable.value().id); local_stats.apply_meta_fetch(local_cache_meta_block_miss); if !sstable.value().meta.range_tombstone_list.is_empty() && !read_options.ignore_range_tombstone diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 816efd07611c6..c7bc49846ea51 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -98,7 +98,8 @@ pub fn gen_dummy_sst_info( file_size += batch.size() as u64; } SstableInfo { - id, + object_id: id, + sst_id: id, key_range: Some(KeyRange { left: FullKey::for_test(table_id, min_table_key, epoch).encode(), right: FullKey::for_test(table_id, max_table_key, epoch).encode(), @@ -109,7 +110,6 @@ pub fn gen_dummy_sst_info( meta_offset: 0, stale_key_count: 0, total_key_count: 0, - divide_version: 0, uncompressed_file_size: file_size, min_epoch: 0, max_epoch: 0, @@ -173,7 +173,8 @@ pub async fn put_sst( } meta.meta_offset = writer.data_len() as u64; let sst = SstableInfo { - id: sst_id, + object_id: sst_id, + sst_id, key_range: Some(KeyRange { left: meta.smallest_key.clone(), right: meta.largest_key.clone(), @@ -184,7 +185,6 @@ pub async fn put_sst( meta_offset: meta.meta_offset, stale_key_count: 0, total_key_count: 0, - divide_version: 0, uncompressed_file_size: meta.estimated_size as u64, min_epoch: 0, max_epoch: 0, diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index f99c37a305101..cb2a0fb29fe58 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -84,7 +84,7 @@ pub fn validate_table_key_range(version: &HummockVersion) { assert!( t.key_range.is_some(), "key_range in table [{}] is none", - t.id + t.get_object_id() ); } } diff --git a/src/storage/src/hummock/validator.rs b/src/storage/src/hummock/validator.rs index 5012750222c4f..e3cec24dad5c7 100644 --- a/src/storage/src/hummock/validator.rs +++ b/src/storage/src/hummock/validator.rs @@ -36,11 +36,11 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) let mut key_counts = 0; let worker_id = *task .sst_id_to_worker_id - .get(&sst.id) + .get(&sst.object_id) .expect("valid worker_id"); tracing::debug!( "Validating SST {} from worker {}, epoch {}", - sst.id, + sst.get_object_id(), worker_id, task.epoch ); @@ -48,7 +48,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) Ok(holder) => holder, Err(err) => { // One reasonable cause is the SST has been vacuumed. - tracing::warn!("Skip sanity check for SST {}. {}", sst.id, err); + tracing::warn!("Skip sanity check for SST {}. {}", sst.get_object_id(), err); continue; } }; @@ -62,7 +62,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) ); let mut previous_key: Option>> = None; if let Err(err) = iter.rewind().await { - tracing::warn!("Skip sanity check for SST {}. {}", sst.id, err); + tracing::warn!("Skip sanity check for SST {}. {}", sst.get_object_id(), err); } while iter.is_valid() { key_counts += 1; @@ -73,32 +73,39 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) { panic!("SST sanity check failed: Duplicate key {:x?} in SST {} from worker {} and SST {} from worker {}", current_key, - sst.id, + sst.get_object_id(), worker_id, duplicate_sst_id, duplicate_worker_id) } - visited_keys.insert(current_key.to_owned(), (sst.id, worker_id)); + visited_keys.insert(current_key.to_owned(), (sst.get_object_id(), worker_id)); // Ordered and Locally unique if let Some(previous_key) = previous_key.take() { let cmp = previous_key.cmp(¤t_key); if cmp != cmp::Ordering::Less { panic!( "SST sanity check failed: For SST {}, expect {:x?} < {:x?}, got {:#?}", - sst.id, previous_key, current_key, cmp + sst.get_object_id(), + previous_key, + current_key, + cmp ) } } previous_key = Some(current_key); if let Err(err) = iter.next().await { - tracing::warn!("Skip remaining sanity check for SST {}. {}", sst.id, err); + tracing::warn!( + "Skip remaining sanity check for SST {}. {}", + sst.get_object_id(), + err + ); break; } } tracing::debug!( "Validated {} keys for SST {}, epoch {}", key_counts, - sst.id, + sst.get_object_id(), task.epoch ); iter.collect_local_statistic(&mut unused);