Skip to content

Commit

Permalink
feat(meta): support move state-table between compaction-group (#8390)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace authored Mar 22, 2023
1 parent 52caa65 commit 359fbf5
Show file tree
Hide file tree
Showing 12 changed files with 742 additions and 368 deletions.
8 changes: 8 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ message GroupMetaChange {
repeated uint32 table_ids_remove = 2;
}

message GroupTableChange {
repeated uint32 table_ids = 1;
uint64 target_group_id = 2;
uint64 origin_group_id = 3;
uint64 new_sst_start_id = 4;
}

message GroupDestroy {}

message GroupDelta {
Expand All @@ -79,6 +86,7 @@ message GroupDelta {
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMetaChange group_meta_change = 4;
GroupTableChange group_table_change = 5;
}
}

Expand Down
241 changes: 157 additions & 84 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy,
GroupMetaChange,
compact_task, CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy,
GroupMetaChange, GroupTableChange,
};
use tokio::sync::{OnceCell, RwLock};

Expand Down Expand Up @@ -61,7 +62,7 @@ impl<S: MetaStore> HummockManager<S> {
) -> Result<RwLock<CompactionGroupManager>> {
let compaction_group_manager = RwLock::new(CompactionGroupManager {
compaction_groups: BTreeMap::new(),
provided_default_config_for_test: config,
default_config: config,
});
compaction_group_manager
.write()
Expand Down Expand Up @@ -421,11 +422,24 @@ impl<S: MetaStore> HummockManager<S> {

/// Splits a compaction group into two. The new one will contain `table_ids`.
/// Returns the newly created compaction group id.
#[named]
pub async fn split_compaction_group(
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
) -> Result<CompactionGroupId> {
self.move_state_table_to_compaction_group(parent_group_id, table_ids, None, false)
.await
}

/// move some table to another compaction-group. Create a new compaction group if it does not
/// exist.
#[named]
pub async fn move_state_table_to_compaction_group(
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
target_group_id: Option<CompactionGroupId>,
allow_split_by_table: bool,
) -> Result<CompactionGroupId> {
if table_ids.is_empty() {
return Ok(parent_group_id);
Expand Down Expand Up @@ -453,120 +467,176 @@ impl<S: MetaStore> HummockManager<S> {
parent_group_id
)));
}
if let Some(compaction_group_id) = target_group_id {
if !versioning.check_branched_sst_in_target_group(
&table_ids,
&parent_group_id,
&compaction_group_id,
) {
return Err(Error::CompactionGroup(format!(
"invalid split attempt for group {}: we shall wait some time for parent group and target group could compact stale sst files",
parent_group_id
)));
}
}

let mut new_version_delta = BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
);

// Remove tables from parent group.
for table_id in &table_ids {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(parent_group_id)
.or_default()
.group_deltas;
group_deltas.push(GroupDelta {
delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange {
table_ids_remove: vec![*table_id],
..Default::default()
})),
});
}

// Add tables to new group.
let new_group_id = self
.env
.id_gen_manager()
.generate::<{ IdCategory::CompactionGroup }>()
.await?;
let new_sst_start_id = self
.env
.id_gen_manager()
.generate_interval::<{ IdCategory::HummockSstableId }>(
versioning.current_version.count_new_ssts_in_group_split(
current_version.count_new_ssts_in_group_split(
parent_group_id,
&HashSet::from_iter(table_ids.iter().cloned()),
HashSet::from_iter(table_ids.clone()),
),
)
.await?;
let group_deltas = &mut new_version_delta
.group_deltas
.entry(new_group_id)
.or_default()
.group_deltas;
let config = self
.compaction_group_manager
.read()
.await
.get_compaction_group_config(new_group_id)
.compaction_config
.as_ref()
.clone();
group_deltas.push(GroupDelta {
delta_type: Some(DeltaType::GroupConstruct(GroupConstruct {
group_config: Some(config),
group_id: new_group_id,
parent_group_id,
table_ids,
new_sst_start_id,
})),
});
let mut new_group = None;
let target_compaction_group_id = match target_group_id {
Some(compaction_group_id) => {
match current_version.levels.get(&compaction_group_id) {
Some(group) => {
for table_id in &table_ids {
if group.member_table_ids.contains(table_id) {
return Err(Error::CompactionGroup(format!(
"table {} already exist in group {}",
*table_id, compaction_group_id,
)));
}
}
}
None => {
return Err(Error::CompactionGroup(format!(
"target group {} does not exist",
compaction_group_id,
)));
}
}
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
group_deltas.push(GroupDelta {
delta_type: Some(DeltaType::GroupTableChange(GroupTableChange {
table_ids: table_ids.to_vec(),
origin_group_id: parent_group_id,
target_group_id: compaction_group_id,
new_sst_start_id,
})),
});
compaction_group_id
}
None => {
// All NewCompactionGroup pairs are mapped to one new compaction group.
let new_compaction_group_id = self
.env
.id_gen_manager()
.generate::<{ IdCategory::CompactionGroup }>()
.await?;
let mut config = self
.compaction_group_manager
.read()
.await
.get_default_compaction_group_config();
config.split_by_state_table = allow_split_by_table;

new_version_delta.group_deltas.insert(
new_compaction_group_id,
GroupDeltas {
group_deltas: vec![GroupDelta {
delta_type: Some(DeltaType::GroupConstruct(GroupConstruct {
group_config: Some(config.clone()),
group_id: new_compaction_group_id,
parent_group_id,
new_sst_start_id,
table_ids: table_ids.to_vec(),
})),
}],
},
);

new_group = Some((new_compaction_group_id, config));
new_version_delta.group_deltas.insert(
parent_group_id,
GroupDeltas {
group_deltas: vec![GroupDelta {
delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange {
table_ids_remove: table_ids.to_vec(),
..Default::default()
})),
}],
},
);
new_compaction_group_id
}
};
let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts);
let mut trx = Transaction::default();
new_version_delta.apply_to_txn(&mut trx)?;
self.env.meta_store().txn(trx).await?;
if let Some((new_compaction_group_id, config)) = new_group {
let mut compaction_group_manager = self.compaction_group_manager.write().await;
let insert = BTreeMapEntryTransaction::new_insert(
&mut compaction_group_manager.compaction_groups,
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: Arc::new(config),
},
);
insert.apply_to_txn(&mut trx)?;
self.env.meta_store().txn(trx).await?;
insert.commit();
} else {
self.env.meta_store().txn(trx).await?;
}
let sst_split_info = versioning
.current_version
.apply_version_delta(&new_version_delta);
// Updates SST split info
for (object_id, sst_id, parent_old_sst_id, parent_new_sst_id) in sst_split_info {
for (object_id, sst_id, _parent_old_sst_id, parent_new_sst_id) in sst_split_info {
match branched_ssts.get_mut(object_id) {
Some(mut entry) => {
let p = entry.get_mut(&parent_group_id).unwrap();
let parent_pos = p.iter().position(|id| *id == parent_old_sst_id).unwrap();
if let Some(parent_new_sst_id) = parent_new_sst_id {
p[parent_pos] = parent_new_sst_id;
entry.insert(parent_group_id, parent_new_sst_id);
} else {
p.remove(parent_pos);
if p.is_empty() {
entry.remove(&parent_group_id);
}
entry.remove(&parent_group_id);
}
entry.entry(new_group_id).or_default().push(sst_id);
entry.insert(target_compaction_group_id, sst_id);
}
None => {
branched_ssts.insert(
object_id,
if let Some(parent_new_sst_id) = parent_new_sst_id {
[
(parent_group_id, vec![parent_new_sst_id]),
(new_group_id, vec![sst_id]),
]
.into_iter()
.collect()
} else {
[(new_group_id, vec![sst_id])].into_iter().collect()
},
);
let mut groups = HashMap::from_iter([(target_compaction_group_id, sst_id)]);
if let Some(parent_new_sst_id) = parent_new_sst_id {
groups.insert(parent_group_id, parent_new_sst_id);
}
branched_ssts.insert(object_id, groups);
}
}
}
new_version_delta.commit();
branched_ssts.commit_memory();
self.notify_last_version_delta(versioning);

Ok(new_group_id)
// Don't trigger compactions if we enable deterministic compaction
if !self.env.opts.compaction_deterministic_test {
// commit_epoch may contains SSTs from any compaction group
self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim);
self.try_send_compaction_request(
target_compaction_group_id,
compact_task::TaskType::SpaceReclaim,
);
}
Ok(target_compaction_group_id)
}
}

#[derive(Default)]
pub(super) struct CompactionGroupManager {
compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
/// Provided default config, only used in test.
provided_default_config_for_test: CompactionConfig,
default_config: CompactionConfig,
}

impl CompactionGroupManager {
Expand Down Expand Up @@ -602,14 +672,20 @@ impl CompactionGroupManager {
compaction_group_ids
.iter()
.map(|id| {
let group = self.compaction_groups.get(id).cloned().unwrap_or_else(|| {
CompactionGroup::new(*id, self.provided_default_config_for_test.clone())
});
let group = self
.compaction_groups
.get(id)
.cloned()
.unwrap_or_else(|| CompactionGroup::new(*id, self.default_config.clone()));
(*id, group)
})
.collect()
}

fn get_default_compaction_group_config(&self) -> CompactionConfig {
self.default_config.clone()
}

async fn update_compaction_config<S: MetaStore>(
&mut self,
compaction_group_ids: &[CompactionGroupId],
Expand All @@ -621,10 +697,7 @@ impl CompactionGroupManager {
if !compaction_groups.contains_key(compaction_group_id) {
compaction_groups.insert(
*compaction_group_id,
CompactionGroup::new(
*compaction_group_id,
self.provided_default_config_for_test.clone(),
),
CompactionGroup::new(*compaction_group_id, self.default_config.clone()),
);
}
let group = compaction_groups.get(compaction_group_id).unwrap();
Expand Down
Loading

0 comments on commit 359fbf5

Please sign in to comment.