Skip to content

Commit

Permalink
feat(storage): Introduces a split implementation based on split_key t…
Browse files Browse the repository at this point in the history
…o replace the previous move implementation. (#18594)
  • Loading branch information
Li0k authored Sep 29, 2024
1 parent 5e20f2d commit 83e3c4b
Show file tree
Hide file tree
Showing 19 changed files with 1,601 additions and 588 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3352,13 +3352,13 @@ def section_hummock_manager(outer_panels):
"The times of move_state_table occurs",
[
panels.target(
f"sum({table_metric('storage_move_state_table_count')}) by (group)",
"move table cg{{group}}",
f"sum({table_metric('storage_split_compaction_group_count')}) by (group)",
"split compaction group cg{{group}}",
),

panels.target(
f"sum({table_metric('storage_merge_compaction_group_count')}) by (group)",
"merge_compaction_group_count cg-{{group}}",
"merge compaction group cg{{group}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum CompatibilityVersion {
VERSION_UNSPECIFIED = 0;
NO_TRIVIAL_SPLIT = 1;
NO_MEMBER_TABLE_IDS = 2;
SPLIT_GROUP_BY_TABLE_ID = 3;
}

message GroupConstruct {
Expand All @@ -85,6 +86,13 @@ message GroupConstruct {
uint64 group_id = 4;
uint64 new_sst_start_id = 5;
CompatibilityVersion version = 6;

// The split_key is the key by which the group is split.
// When GroupConstruct is issued with commit_epoch, split_key will be empty.
// When split_key is set, GroupConstruct uses split_key to check each level and split the sstable_info in that level into two groups (bounded by split_key).
// For the left sstable_info, split_key (right_exclusive=true) will be used as key_range_right.
// In the current implementation, split_key always contains a table_id, with vnode = 0 and epoch = MAX.
optional bytes split_key = 7;
}

message GroupDestroy {}
Expand Down Expand Up @@ -665,6 +673,7 @@ message RiseCtlUpdateCompactionConfigRequest {
CompressionAlgorithm compression_algorithm = 17;
uint32 max_l0_compact_level_count = 18;
uint64 sst_allowed_trivial_move_min_size = 19;
uint32 split_weight_by_vnode = 20;
}
}
repeated uint64 compaction_group_ids = 1;
Expand Down
9 changes: 7 additions & 2 deletions src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,13 @@ impl HummockManagerService for HummockServiceImpl {
let req = request.into_inner();
let new_group_id = self
.hummock_manager
.split_compaction_group(req.group_id, &req.table_ids, req.partition_vnode_count)
.await?;
.move_state_tables_to_dedicated_compaction_group(
req.group_id,
&req.table_ids,
req.partition_vnode_count,
)
.await?
.0;
Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
}

Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::split_sst;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
Expand Down Expand Up @@ -390,7 +390,8 @@ impl HummockManager {
})
.sum();

let branch_sst = split_sst(
// TODO(li0k): replace with `split_sst`
let branch_sst = split_sst_with_table_ids(
&mut sst.sst_info,
&mut new_sst_id,
origin_sst_size - new_sst_size,
Expand Down
178 changes: 6 additions & 172 deletions src/meta/src/hummock/manager/compaction/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,17 @@ use std::sync::Arc;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::compact_task::ReportTask;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
get_compaction_group_ids, TableGroupInfo,
};
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
use risingwave_hummock_sdk::version::GroupDelta;
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_meta_model_v2::compaction_config;
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
CompactionConfig, CompactionGroupInfo, CompatibilityVersion, PbGroupConstruct, PbGroupDestroy,
PbStateTableInfoDelta,
CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
};
use tokio::sync::OnceCell;

Expand All @@ -45,7 +42,7 @@ use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{commit_multi_var, HummockManager};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::sequence::next_compaction_group_id;
use crate::manager::{MetaSrvEnv, MetaStoreImpl};
use crate::model::{
BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModel,
Expand Down Expand Up @@ -422,172 +419,6 @@ impl HummockManager {
results
}

/// Moves the given state tables out of `parent_group_id` into a newly created compaction
/// group and returns the id of that new group.
///
/// If `table_ids` is empty, nothing is changed and `parent_group_id` is returned as-is.
/// Duplicate ids in `table_ids` are ignored. The new group's config is a clone of the default
/// compaction config with `split_weight_by_vnode` set to `partition_vnode_count`.
///
/// # Errors
///
/// Returns `Error::CompactionGroup` when:
/// - `parent_group_id` is not present in the current version's levels;
/// - any of `table_ids` is not a member table of `parent_group_id`;
/// - `table_ids` covers every member table of `parent_group_id`.
///
/// Errors from id allocation, the meta-store commit, and `report_compact_tasks` are
/// propagated with `?`.
///
/// # Panics
///
/// Panics if the version delta already holds a state-table-info entry for one of the tables
/// being moved.
pub async fn move_state_table_to_compaction_group(
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
partition_vnode_count: u32,
) -> Result<CompactionGroupId> {
// Nothing to move: report the parent group as the "target".
if table_ids.is_empty() {
return Ok(parent_group_id);
}
// De-duplicate so the "all member tables are moved" length comparison below is meaningful.
let table_ids = table_ids.iter().cloned().unique().collect_vec();
// Both write guards are taken here (compaction first, then versioning) and held until
// they are explicitly dropped near the end of the function.
let compaction_guard = self.compaction.write().await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
// Validate parameters.
if !versioning
.current_version
.levels
.contains_key(&parent_group_id)
{
return Err(Error::CompactionGroup(format!(
"invalid group {}",
parent_group_id
)));
}

// Every requested table must currently belong to the parent group.
for table_id in &table_ids {
if !versioning
.current_version
.state_table_info
.compaction_group_member_table_ids(parent_group_id)
.contains(&TableId::new(*table_id))
{
return Err(Error::CompactionGroup(format!(
"table {} doesn't in group {}",
table_id, parent_group_id
)));
}
}

// Reject a request that would move every member table out of the parent group.
if table_ids.len()
== versioning
.current_version
.state_table_info
.compaction_group_member_table_ids(parent_group_id)
.len()
{
return Err(Error::CompactionGroup(format!(
"invalid split attempt for group {}: all member tables are moved",
parent_group_id
)));
}
let mut version = HummockVersionTransaction::new(
&mut versioning.current_version,
&mut versioning.hummock_version_deltas,
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta(None);

// Reserve SST object ids up front; the amount requested is the number of new SSTs the
// latest version reports for splitting these tables out of the parent group.
let new_sst_start_id = next_sstable_object_id(
&self.env,
new_version_delta
.latest_version()
.count_new_ssts_in_group_split(
parent_group_id,
HashSet::from_iter(table_ids.clone()),
),
)
.await?;
// Allocate the new group id, build its config, and record the `GroupConstruct` delta.
let (new_group, target_compaction_group_id) = {
{
// All NewCompactionGroup pairs are mapped to one new compaction group.
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
// The new config will be persisted later.
let mut config = self
.compaction_group_manager
.read()
.await
.default_compaction_config()
.as_ref()
.clone();
config.split_weight_by_vnode = partition_vnode_count;

#[expect(deprecated)]
// fill the deprecated field with default value
new_version_delta.group_deltas.insert(
new_compaction_group_id,
GroupDeltas {
group_deltas: vec![GroupDelta::GroupConstruct(PbGroupConstruct {
group_config: Some(config.clone()),
group_id: new_compaction_group_id,
parent_group_id,
new_sst_start_id,
table_ids: vec![],
version: CompatibilityVersion::NoMemberTableIds as i32,
})],
},
);
((new_compaction_group_id, config), new_compaction_group_id)
}
};

let (new_compaction_group_id, config) = new_group;
// Re-point each moved table at the new group, keeping its committed epoch unchanged.
new_version_delta.with_latest_version(|version, new_version_delta| {
for table_id in &table_ids {
let table_id = TableId::new(*table_id);
let info = version
.state_table_info
.info()
.get(&table_id)
.expect("have check exist previously");
// The delta must not already contain an entry for this table.
assert!(new_version_delta
.state_table_info_delta
.insert(
table_id,
PbStateTableInfoDelta {
committed_epoch: info.committed_epoch,
compaction_group_id: new_compaction_group_id,
}
)
.is_none());
}
});
{
// Register the new group and commit it together with the version change.
let mut compaction_group_manager = self.compaction_group_manager.write().await;
let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
compaction_groups_txn
.create_compaction_groups(new_compaction_group_id, Arc::new(config));

new_version_delta.pre_apply();
commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
}
// Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
versioning.mark_next_time_travel_version_snapshot();
// Collect assigned compact tasks that `is_compact_task_expired` reports as expired
// against the updated current version; they are reported as manually canceled below.
let mut canceled_tasks = vec![];
for task_assignment in compaction_guard.compact_task_assignment.values() {
if let Some(task) = task_assignment.compact_task.as_ref() {
let need_cancel = HummockManager::is_compact_task_expired(
&task.into(),
&versioning.current_version,
);
if need_cancel {
canceled_tasks.push(ReportTask {
task_id: task.task_id,
task_status: TaskStatus::ManualCanceled,
table_stats_change: HashMap::default(),
sorted_output_ssts: vec![],
});
}
}
}

// Release both guards before reporting the canceled tasks.
// NOTE(review): presumably `report_compact_tasks` acquires these locks itself — confirm.
drop(versioning_guard);
drop(compaction_guard);
self.report_compact_tasks(canceled_tasks).await?;

// Count this move, labeled by the parent group id.
self.metrics
.move_state_table_count
.with_label_values(&[&parent_group_id.to_string()])
.inc();

Ok(target_compaction_group_id)
}

pub async fn calculate_compaction_group_statistic(&self) -> Vec<TableGroupInfo> {
let mut infos = vec![];
{
Expand Down Expand Up @@ -755,6 +586,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
target.sst_allowed_trivial_move_min_size = Some(*c);
}
MutableConfig::SplitWeightByVnode(c) => {
target.split_weight_by_vnode = *c;
}
}
}
}
Expand Down
Loading

0 comments on commit 83e3c4b

Please sign in to comment.