Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): replace hummock protobuf strcut with rust struct #15386

Merged
merged 28 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1f30ff7
feat(storage): replace pb with struct
Li0k Mar 1, 2024
163ff20
fix(storage): fix check
Li0k Mar 4, 2024
e866deb
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 4, 2024
940b233
refactor(storage): refactor and reduce copy
Li0k Mar 4, 2024
9a8ebbc
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 4, 2024
afd7469
fix(storage): address comments
Li0k Mar 8, 2024
9f9707e
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Mar 8, 2024
3bf2bf3
fix(storage): fix compile
Li0k Mar 8, 2024
4f72cba
fix(storage): fix check
Li0k Mar 8, 2024
0a75e90
fix(storage): fix check
Li0k Mar 8, 2024
5672d61
refactor(storage): refactor version
Li0k May 7, 2024
1758269
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 7, 2024
480fc62
fix(storage): fix compile
Li0k May 9, 2024
5fff8ae
fix(storage): fix check
Li0k May 9, 2024
b912567
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k May 9, 2024
ebcc00b
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 11, 2024
c5d4b7f
fix(storage): address comments
Li0k Jul 11, 2024
7a1fbb1
fix(storage): address check
Li0k Jul 12, 2024
705f7b0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 23, 2024
bf1addf
fix(storage): address comments
Li0k Jul 23, 2024
a22660b
fix(storage): fix compile
Li0k Jul 23, 2024
ee43cd3
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 23, 2024
75d72d4
fix(storage): fix sync_point and bench code
Li0k Jul 23, 2024
2de6a48
fix(storage): reorg and address comments
Li0k Jul 25, 2024
7eae479
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 25, 2024
6018f58
fix(storage): fix compile
Li0k Jul 25, 2024
ffc5ac6
fix(storage): fix test compile
Li0k Jul 25, 2024
66011da
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Li0k Jul 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAw
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::version::{Level, SstableInfo};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
use risingwave_pb::hummock::{Level, SstableInfo};
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
Expand Down Expand Up @@ -156,7 +156,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
}

fn print_level(level: &Level, sst_info: &SstableInfo) {
println!("Level Type: {}", level.level_type);
println!("Level Type: {}", level.level_type.as_str_name());
println!("Level Idx: {}", level.level_idx);
if level.level_idx == 0 {
println!("L0 Sub-Level Idx: {}", level.sub_level_id);
Expand Down
7 changes: 3 additions & 4 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta, SstableInfo};
use risingwave_hummock_sdk::{version_archive_dir, HummockSstableObjectId, HummockVersionId};
use risingwave_object_store::object::ObjectStoreRef;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::{HummockVersionArchive, SstableInfo};
use risingwave_pb::hummock::HummockVersionArchive;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
Expand Down Expand Up @@ -107,8 +107,7 @@ async fn print_user_key_in_version(
.chain(cg.levels.iter())
{
for sstable_info in &level.table_infos {
use risingwave_hummock_sdk::key_range::KeyRange;
let key_range: KeyRange = sstable_info.key_range.as_ref().unwrap().into();
let key_range = sstable_info.key_range.as_ref().unwrap();
let left_user_key = FullKey::decode(&key_range.left);
let right_user_key = FullKey::decode(&key_range.right);
if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ fn version_to_sstable_rows(version: HummockVersion) -> Vec<RwHummockSstable> {
level_id: level.level_idx as _,
sub_level_id: (level.level_idx == 0).then_some(level.sub_level_id as _),
level_type: level.level_type as _,
key_range_left: key_range.left,
key_range_right: key_range.right,
key_range_left: key_range.left.to_vec(),
key_range_right: key_range.right.to_vec(),
right_exclusive: key_range.right_exclusive,
file_size: sst.file_size as _,
meta_offset: sst.meta_offset as _,
Expand Down
1 change: 1 addition & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ normal = ["workspace-hack"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
bytes = { version = "1" }
either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = { workspace = true }
Expand Down
24 changes: 15 additions & 9 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
Expand Down Expand Up @@ -85,7 +86,7 @@ impl HummockManagerService for HummockServiceImpl {
let current_version = self.hummock_manager.get_current_version().await;
Ok(Response::new(GetCurrentVersionResponse {
status: None,
current_version: Some(current_version.to_protobuf()),
current_version: Some(current_version.into()),
}))
}

Expand All @@ -101,7 +102,7 @@ impl HummockManagerService for HummockServiceImpl {
))
.await?;
Ok(Response::new(ReplayVersionDeltaResponse {
version: Some(version.to_protobuf()),
version: Some(version.into()),
modified_compaction_groups: compaction_groups,
}))
}
Expand All @@ -123,7 +124,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<DisableCommitEpochResponse>, Status> {
let version = self.hummock_manager.disable_commit_epoch().await;
Ok(Response::new(DisableCommitEpochResponse {
current_version: Some(version.to_protobuf()),
current_version: Some(version.into()),
}))
}

Expand All @@ -139,8 +140,8 @@ impl HummockManagerService for HummockServiceImpl {
let resp = ListVersionDeltasResponse {
version_deltas: Some(PbHummockVersionDeltas {
version_deltas: version_deltas
.iter()
.map(HummockVersionDelta::to_protobuf)
.into_iter()
.map(HummockVersionDelta::into)
.collect(),
}),
};
Expand Down Expand Up @@ -224,6 +225,7 @@ impl HummockManagerService for HummockServiceImpl {
&self,
request: Request<TriggerManualCompactionRequest>,
) -> Result<Response<TriggerManualCompactionResponse>, Status> {
use bytes::Bytes;
let request = request.into_inner();
let compaction_group_id = request.compaction_group_id;
let mut option = ManualCompactionOption {
Expand All @@ -234,8 +236,12 @@ impl HummockManagerService for HummockServiceImpl {

// rewrite the key_range
match request.key_range {
Some(key_range) => {
option.key_range = key_range;
Some(pb_key_range) => {
option.key_range = KeyRange {
left: Bytes::from(pb_key_range.left),
Li0k marked this conversation as resolved.
Show resolved Hide resolved
right: Bytes::from(pb_key_range.right),
right_exclusive: pb_key_range.right_exclusive,
};
}

None => {
Expand Down Expand Up @@ -426,7 +432,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let version = self.hummock_manager.pin_version(req.context_id).await?;
Ok(Response::new(PinVersionResponse {
pinned_version: Some(version.to_protobuf()),
pinned_version: Some(version.into()),
}))
}

Expand Down Expand Up @@ -464,7 +470,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
checkpoint_version: Some(checkpoint_version.to_protobuf()),
checkpoint_version: Some(checkpoint_version.into()),
}))
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl NotificationServiceImpl {

Ok(MetaSnapshot {
tables,
hummock_version: Some(hummock_version.to_protobuf()),
hummock_version: Some(hummock_version.into()),
version: Some(SnapshotVersion {
catalog_version,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn restore_hummock_version(
);
let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
let checkpoint = PbHummockVersionCheckpoint {
version: Some(hummock_version.to_protobuf()),
version: Some(hummock_version.into()),
// Ignore stale objects. Full GC will clear them.
stale_objects: Default::default(),
};
Expand Down
10 changes: 4 additions & 6 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
use risingwave_hummock_sdk::table_watermark::{
merge_multiple_new_table_watermarks, TableWatermarks,
};
use risingwave_hummock_sdk::version::SstableInfo;
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -1161,13 +1162,13 @@ fn collect_commit_epoch_info(
sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
ExtendedSstableInfo::new(
grouped.compaction_group_id,
sst_info,
SstableInfo::from(sst_info),
grouped.table_stats_map,
)
});
synced_ssts.extend(ssts_iter);
table_watermarks.push(resp.table_watermarks);
old_value_ssts.extend(resp.old_value_sstables);
old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
}
let new_table_fragment_info = if let Command::CreateStreamingJob {
table_fragments, ..
Expand Down Expand Up @@ -1213,10 +1214,7 @@ fn collect_commit_epoch_info(
watermarks
.into_iter()
.map(|(table_id, watermarks)| {
(
TableId::new(table_id),
TableWatermarks::from_protobuf(&watermarks),
)
(TableId::new(table_id), TableWatermarks::from(&watermarks))
})
.collect()
})
Expand Down
10 changes: 8 additions & 2 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pub mod compaction_config;
mod overlap_strategy;
use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::version::{CompactTask, Levels};
use risingwave_pb::hummock::compact_task::{self, TaskType};

mod picker;
Expand All @@ -28,8 +29,7 @@ use std::sync::Arc;
use picker::{LevelCompactionPicker, TierCompactionPicker};
use risingwave_hummock_sdk::{can_concat, CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
use risingwave_pb::hummock::{CompactionConfig, LevelType, PbCompactTask};
pub use selector::CompactionSelector;

use self::selector::{EmergencySelector, LocalSelectorStatistic};
Expand Down Expand Up @@ -174,6 +174,12 @@ impl CompactStatus {
}

/// Declares a task as either succeeded, failed or canceled.
pub fn report_compact_task_pb(&mut self, compact_task: &PbCompactTask) {
Li0k marked this conversation as resolved.
Show resolved Hide resolved
for level in &compact_task.input_ssts {
self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
Li0k marked this conversation as resolved.
Show resolved Hide resolved
}
}

pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
for level in &compact_task.input_ssts {
self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::fmt::Debug;
use std::ops::Range;

use itertools::Itertools;
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::version::SstableInfo;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_pb::hummock::{KeyRange, SstableInfo};

pub trait OverlapInfo: Debug {
fn check_overlap(&self, a: &SstableInfo) -> bool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::sync::Arc;

use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, OverlappingLevel};
use risingwave_hummock_sdk::version::{InputLevel, Level, Levels, OverlappingLevel};
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
use super::{
Expand Down Expand Up @@ -51,7 +51,7 @@ impl CompactionPicker for LevelCompactionPicker {
if l0.sub_levels.is_empty() {
return None;
}
if l0.sub_levels[0].level_type != LevelType::Nonoverlapping as i32
if l0.sub_levels[0].level_type != LevelType::Nonoverlapping
&& l0.sub_levels[0].table_infos.len() > 1
{
stats.skip_by_overlapping += 1;
Expand Down Expand Up @@ -230,7 +230,7 @@ impl LevelCompactionPicker {
.into_iter()
.map(|table_infos| InputLevel {
level_idx: 0,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos,
})
.collect_vec();
Expand Down Expand Up @@ -406,7 +406,7 @@ pub mod tests {

let levels = vec![Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos: vec![
generate_table(3, 1, 0, 50, 1),
generate_table(4, 1, 150, 200, 1),
Expand Down Expand Up @@ -470,7 +470,7 @@ pub mod tests {
let mut picker = create_compaction_picker_for_test();
let levels = vec![Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos: vec![],
total_file_size: 0,
sub_level_id: 0,
Expand Down Expand Up @@ -535,7 +535,7 @@ pub mod tests {
let mut levels = Levels {
levels: vec![Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
level_type: LevelType::Nonoverlapping,
table_infos: vec![
generate_table(1, 1, 100, 399, 2),
generate_table(2, 1, 400, 699, 2),
Expand Down Expand Up @@ -583,7 +583,7 @@ pub mod tests {
]);
// We can set level_type only because the input above is valid.
for s in &mut l0.sub_levels {
s.level_type = LevelType::Nonoverlapping as i32;
s.level_type = LevelType::Nonoverlapping;
}
let levels = Levels {
l0: Some(l0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_hummock_sdk::version::Levels;
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::{
Expand Down Expand Up @@ -55,22 +55,20 @@ impl EmergencyCompactionPicker {
let overlapping_count = l0
.sub_levels
.iter()
.filter(|level| level.level_type == LevelType::Overlapping as i32)
.filter(|level| level.level_type == LevelType::Overlapping)
.count();
let no_overlap_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count == 0
level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count == 0
})
.count();
let partitioned_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count > 0
level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count > 0
})
.count();
// We trigger `EmergencyCompactionPicker` only when some unexpected condition cause the number of l0 levels increase and the origin strategy
Expand Down
Loading
Loading