Skip to content

Commit

Permalink
feat(storage): replace hummock protobuf strcut with rust struct (#15386)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jul 25, 2024
1 parent 1b77066 commit 1c2c400
Show file tree
Hide file tree
Showing 92 changed files with 2,805 additions and 1,316 deletions.
10 changes: 5 additions & 5 deletions src/ctl/src/cmd_impl/hummock/list_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ pub async fn list_version(
let l0 = levels.l0.as_mut().unwrap();
for sub_level in &mut l0.sub_levels {
for t in &mut sub_level.table_infos {
t.key_range = None;
t.remove_key_range();
}
}
}

// l1 ~ lmax
for level in &mut levels.levels {
for t in &mut level.table_infos {
t.key_range = None;
t.remove_key_range();
}
}
});
Expand All @@ -63,18 +63,18 @@ pub async fn list_version(
println!(
"sub_level_id {} type {} sst_num {} size {}",
sub_level.sub_level_id,
sub_level.level_type().as_str_name(),
sub_level.level_type.as_str_name(),
sub_level.table_infos.len(),
sub_level.total_file_size
)
}
}

for level in levels.get_levels() {
for level in &levels.levels {
println!(
"level_idx {} type {} sst_num {} size {}",
level.level_idx,
level.level_type().as_str_name(),
level.level_type.as_str_name(),
level.table_infos.len(),
level.total_file_size
)
Expand Down
11 changes: 6 additions & 5 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAw
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::level::Level;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
use risingwave_pb::hummock::{Level, SstableInfo};
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
Expand Down Expand Up @@ -83,11 +84,11 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
if let Some(object_id) = &args.object_id {
if *object_id == sstable_info.get_object_id() {
if *object_id == sstable_info.object_id {
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.get_object_id(),
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
&table_data,
Expand All @@ -100,7 +101,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.get_object_id(),
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
&table_data,
Expand Down Expand Up @@ -161,7 +162,7 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
}

fn print_level(level: &Level, sst_info: &SstableInfo) {
println!("Level Type: {}", level.level_type);
println!("Level Type: {}", level.level_type.as_str_name());
println!("Level Idx: {}", level.level_idx);
if level.level_idx == 0 {
println!("L0 Sub-Level Idx: {}", level.sub_level_id);
Expand Down
6 changes: 3 additions & 3 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use itertools::Itertools;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{version_archive_dir, HummockSstableObjectId, HummockVersionId};
use risingwave_object_store::object::ObjectStoreRef;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::{HummockVersionArchive, SstableInfo};
use risingwave_pb::hummock::HummockVersionArchive;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{Block, BlockHolder, BlockIterator, SstableStoreRef};
Expand Down Expand Up @@ -109,8 +110,7 @@ async fn print_user_key_in_version(
.chain(cg.levels.iter())
{
for sstable_info in &level.table_infos {
use risingwave_hummock_sdk::key_range::KeyRange;
let key_range: KeyRange = sstable_info.key_range.as_ref().unwrap().into();
let key_range = &sstable_info.key_range;
let left_user_key = FullKey::decode(&key_range.left);
let right_user_key = FullKey::decode(&key_range.right);
if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn remove_key_range_from_version(mut version: HummockVersion) -> HummockVersion
.chain(cg.l0.as_mut().unwrap().sub_levels.iter_mut())
{
for sst in &mut level.table_infos {
sst.key_range.take();
sst.remove_key_range();
}
}
}
Expand All @@ -107,7 +107,7 @@ fn version_to_compaction_group_rows(version: &HummockVersion) -> Vec<RwHummockVe
version_id: version.id as _,
max_committed_epoch: version.max_committed_epoch as _,
safe_epoch: version.visible_table_safe_epoch() as _,
compaction_group: json!(cg).into(),
compaction_group: json!(cg.to_protobuf()).into(),
})
.collect()
}
Expand All @@ -117,16 +117,16 @@ fn version_to_sstable_rows(version: HummockVersion) -> Vec<RwHummockSstable> {
for cg in version.levels.into_values() {
for level in cg.levels.into_iter().chain(cg.l0.unwrap().sub_levels) {
for sst in level.table_infos {
let key_range = sst.key_range.unwrap();
let key_range = sst.key_range;
sstables.push(RwHummockSstable {
sstable_id: sst.sst_id as _,
object_id: sst.object_id as _,
compaction_group_id: cg.group_id as _,
level_id: level.level_idx as _,
sub_level_id: (level.level_idx == 0).then_some(level.sub_level_id as _),
level_type: level.level_type as _,
key_range_left: key_range.left,
key_range_right: key_range.right,
key_range_left: key_range.left.to_vec(),
key_range_right: key_range.right.to_vec(),
right_exclusive: key_range.right_exclusive,
file_size: sst.file_size as _,
meta_offset: sst.meta_offset as _,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::types::{Fields, JsonbVal};
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas;
use serde_json::json;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
Expand Down Expand Up @@ -41,7 +44,12 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
max_committed_epoch: d.max_committed_epoch as _,
safe_epoch: d.visible_table_safe_epoch() as _,
trivial_move: d.trivial_move,
group_deltas: json!(d.group_deltas).into(),
group_deltas: json!(d
.group_deltas
.into_iter()
.map(|(group_id, deltas)| (group_id, PbGroupDeltas::from(deltas)))
.collect::<HashMap<u64, PbGroupDeltas>>())
.into(),
})
.collect();
Ok(rows)
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model_v2/src/hummock_sstable_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub sst_id: HummockSstableObjectId,
pub object_id: HummockSstableObjectId,
pub sstable_info: SstableInfo,
pub sstable_info: SstableInfoV2Backend,
}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

crate::derive_from_blob!(SstableInfo, PbSstableInfo);
crate::derive_from_blob!(SstableInfoV2Backend, PbSstableInfo);
2 changes: 1 addition & 1 deletion src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async-trait = "0.1"
either = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = { workspace = true }
prost ={ workspace = true }
prost = { workspace = true }
rand = { workspace = true }
regex = "1"
risingwave_common = { workspace = true }
Expand Down
29 changes: 19 additions & 10 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::collections::{HashMap, HashSet};
use std::time::Duration;

use compact_task::PbTaskStatus;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::version::HummockVersionDelta;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
Expand Down Expand Up @@ -85,7 +87,7 @@ impl HummockManagerService for HummockServiceImpl {
let current_version = self.hummock_manager.get_current_version().await;
Ok(Response::new(GetCurrentVersionResponse {
status: None,
current_version: Some(current_version.to_protobuf()),
current_version: Some(current_version.into()),
}))
}

Expand All @@ -101,7 +103,7 @@ impl HummockManagerService for HummockServiceImpl {
))
.await?;
Ok(Response::new(ReplayVersionDeltaResponse {
version: Some(version.to_protobuf()),
version: Some(version.into()),
modified_compaction_groups: compaction_groups,
}))
}
Expand All @@ -123,7 +125,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<DisableCommitEpochResponse>, Status> {
let version = self.hummock_manager.disable_commit_epoch().await;
Ok(Response::new(DisableCommitEpochResponse {
current_version: Some(version.to_protobuf()),
current_version: Some(version.into()),
}))
}

Expand All @@ -139,8 +141,8 @@ impl HummockManagerService for HummockServiceImpl {
let resp = ListVersionDeltasResponse {
version_deltas: Some(PbHummockVersionDeltas {
version_deltas: version_deltas
.iter()
.map(HummockVersionDelta::to_protobuf)
.into_iter()
.map(HummockVersionDelta::into)
.collect(),
}),
};
Expand Down Expand Up @@ -234,8 +236,12 @@ impl HummockManagerService for HummockServiceImpl {

// rewrite the key_range
match request.key_range {
Some(key_range) => {
option.key_range = key_range;
Some(pb_key_range) => {
option.key_range = KeyRange {
left: pb_key_range.left.into(),
right: pb_key_range.right.into(),
right_exclusive: pb_key_range.right_exclusive,
};
}

None => {
Expand Down Expand Up @@ -426,7 +432,7 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let version = self.hummock_manager.pin_version(req.context_id).await?;
Ok(Response::new(PinVersionResponse {
pinned_version: Some(version.to_protobuf()),
pinned_version: Some(version.into()),
}))
}

Expand Down Expand Up @@ -464,7 +470,7 @@ impl HummockManagerService for HummockServiceImpl {
) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
checkpoint_version: Some(checkpoint_version.to_protobuf()),
checkpoint_version: Some(checkpoint_version.into()),
}))
}

Expand Down Expand Up @@ -660,7 +666,10 @@ impl HummockManagerService for HummockServiceImpl {
let request = request.into_inner();
let ret = self
.hummock_manager
.cancel_compact_task(request.task_id, request.task_status())
.cancel_compact_task(
request.task_id,
PbTaskStatus::try_from(request.task_status).unwrap(),
)
.await?;

let response = Response::new(CancelCompactTaskResponse { ret });
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl NotificationServiceImpl {

Ok(MetaSnapshot {
tables,
hummock_version: Some(hummock_version.to_protobuf()),
hummock_version: Some(hummock_version.into()),
version: Some(SnapshotVersion {
catalog_version,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn restore_hummock_version(
);
let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
let checkpoint = PbHummockVersionCheckpoint {
version: Some(hummock_version.to_protobuf()),
version: Some(hummock_version.into()),
// Ignore stale objects. Full GC will clear them.
stale_objects: Default::default(),
};
Expand Down
11 changes: 4 additions & 7 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,15 +1160,15 @@ fn collect_commit_epoch_info(
for resp in resps {
let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
let sst_info = grouped.sst.expect("field not None");
sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
sst_to_worker.insert(sst_info.object_id, resp.worker_id);
LocalSstableInfo::new(
sst_info,
sst_info.into(),
from_prost_table_stats_map(grouped.table_stats_map),
)
});
synced_ssts.extend(ssts_iter);
table_watermarks.push(resp.table_watermarks);
old_value_ssts.extend(resp.old_value_sstables);
old_value_ssts.extend(resp.old_value_sstables.into_iter().map(|s| s.into()));
}
let new_table_fragment_info =
if let Command::CreateStreamingJob { info, .. } = &command_ctx.command {
Expand Down Expand Up @@ -1215,10 +1215,7 @@ fn collect_commit_epoch_info(
watermarks
.into_iter()
.map(|(table_id, watermarks)| {
(
TableId::new(table_id),
TableWatermarks::from_protobuf(&watermarks),
)
(TableId::new(table_id), TableWatermarks::from(&watermarks))
})
.collect()
})
Expand Down
13 changes: 6 additions & 7 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
pub mod compaction_config;
mod overlap_strategy;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::level::Levels;
use risingwave_pb::hummock::compact_task::{self, TaskType};

mod picker;
Expand All @@ -30,8 +32,7 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
use risingwave_pb::hummock::{CompactionConfig, LevelType};
pub use selector::{CompactionSelector, CompactionSelectorContext};

use self::selector::{EmergencySelector, LocalSelectorStatistic};
Expand Down Expand Up @@ -145,12 +146,11 @@ impl CompactStatus {
}

pub fn is_trivial_move_task(task: &CompactTask) -> bool {
if task.task_type() != TaskType::Dynamic && task.task_type() != TaskType::Emergency {
if task.task_type != TaskType::Dynamic && task.task_type != TaskType::Emergency {
return false;
}

if task.input_ssts.len() != 2
|| task.input_ssts[0].level_type() != LevelType::Nonoverlapping
if task.input_ssts.len() != 2 || task.input_ssts[0].level_type != LevelType::Nonoverlapping
{
return false;
}
Expand All @@ -173,7 +173,7 @@ impl CompactStatus {

pub fn is_trivial_reclaim(task: &CompactTask) -> bool {
// Currently all VnodeWatermark tasks are trivial reclaim.
if task.task_type() == TaskType::VnodeWatermark {
if task.task_type == TaskType::VnodeWatermark {
return true;
}
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());
Expand All @@ -186,7 +186,6 @@ impl CompactStatus {
})
}

/// Declares a task as either succeeded, failed or canceled.
pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
for level in &compact_task.input_ssts {
self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
Expand Down
Loading

0 comments on commit 1c2c400

Please sign in to comment.