Skip to content

Commit

Permalink
fix(meta): fix level not exist (#10242)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace authored Jun 8, 2023
1 parent 035aae7 commit bd7cf4d
Show file tree
Hide file tree
Showing 15 changed files with 172 additions and 329 deletions.
47 changes: 25 additions & 22 deletions src/meta/src/hummock/compaction_schedule_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,32 @@ mod tests {
CompactionSchedulePolicy, RoundRobinPolicy, ScoredPolicy,
};
use crate::hummock::test_utils::{
commit_from_meta_node, generate_test_sstables_with_table_id, get_sst_ids,
register_table_ids_to_compaction_group, setup_compute_env_with_config,
commit_from_meta_node, generate_test_tables, get_sst_ids,
register_sstable_infos_to_compaction_group, setup_compute_env_with_config,
to_local_sstable_info,
};
use crate::hummock::HummockManager;
use crate::storage::MetaStore;

async fn add_compact_task<S>(hummock_manager: &HummockManager<S>, _context_id: u32, epoch: u64)
where
S: MetaStore,
{
let original_tables = generate_test_tables(epoch, get_sst_ids(hummock_manager, 2).await);
register_sstable_infos_to_compaction_group(
hummock_manager,
&original_tables,
StaticCompactionGroupId::StateDefault.into(),
)
.await;
commit_from_meta_node(
hummock_manager,
epoch,
to_local_sstable_info(&original_tables),
)
.await
.unwrap();
}

fn dummy_compact_task(task_id: u64, input_file_size: u64) -> CompactTask {
CompactTask {
Expand Down Expand Up @@ -541,26 +563,7 @@ mod tests {
let (_, hummock_manager, _, worker_node) = setup_compute_env_with_config(80, config).await;
let context_id = worker_node.id;
let mut compactor_manager = RoundRobinPolicy::new();
register_table_ids_to_compaction_group(
hummock_manager.as_ref(),
&[1],
StaticCompactionGroupId::StateDefault.into(),
)
.await;
for epoch in 1..3 {
let original_tables = generate_test_sstables_with_table_id(
epoch,
1,
get_sst_ids(hummock_manager.as_ref(), 2).await,
);
commit_from_meta_node(
hummock_manager.as_ref(),
epoch,
to_local_sstable_info(&original_tables),
)
.await
.unwrap();
}
add_compact_task(hummock_manager.as_ref(), context_id, 1).await;

// No compactor available.
assert!(compactor_manager.next_compactor().is_none());
Expand Down
20 changes: 2 additions & 18 deletions src/meta/src/hummock/compaction_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 +704,7 @@ mod tests {
use crate::hummock::compaction_scheduler::{
CompactionRequestChannel, CompactionRequestChannelItem, ScheduleStatus,
};
use crate::hummock::test_utils::{
add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
};
use crate::hummock::test_utils::{add_ssts, setup_compute_env};
use crate::hummock::CompactionScheduler;

#[tokio::test]
Expand Down Expand Up @@ -739,15 +737,7 @@ mod tests {
.await
);

register_table_ids_to_compaction_group(
hummock_manager.as_ref(),
&[1],
StaticCompactionGroupId::StateDefault.into(),
)
.await;
let _sst_infos = add_ssts(1, hummock_manager.as_ref(), context_id).await;
let _sst_infos = add_ssts(2, hummock_manager.as_ref(), context_id).await;
let _sst_infos = add_ssts(3, hummock_manager.as_ref(), context_id).await;

let compactor = hummock_manager.get_idle_compactor().await.unwrap();
// Cannot assign because of invalid compactor
Expand Down Expand Up @@ -781,7 +771,7 @@ mod tests {
);

// Add more SSTs for compaction.
let _sst_infos = add_ssts(4, hummock_manager.as_ref(), context_id).await;
let _sst_infos = add_ssts(2, hummock_manager.as_ref(), context_id).await;

// No idle compactor
assert_eq!(
Expand Down Expand Up @@ -836,12 +826,6 @@ mod tests {
tokio::sync::mpsc::unbounded_channel::<CompactionRequestChannelItem>();
let request_channel = Arc::new(CompactionRequestChannel::new(request_tx));

register_table_ids_to_compaction_group(
hummock_manager.as_ref(),
&[1],
StaticCompactionGroupId::StateDefault.into(),
)
.await;
let _sst_infos = add_ssts(1, hummock_manager.as_ref(), context_id).await;
let _receiver = compactor_manager.add_compactor(context_id, 1, 1);

Expand Down
10 changes: 1 addition & 9 deletions src/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,9 +469,7 @@ mod tests {
use risingwave_pb::hummock::CompactTaskProgress;

use crate::hummock::compaction::default_level_selector;
use crate::hummock::test_utils::{
add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
};
use crate::hummock::test_utils::{add_ssts, setup_compute_env};
use crate::hummock::CompactorManager;

#[tokio::test]
Expand All @@ -481,12 +479,6 @@ mod tests {
let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await;
let context_id = worker_node.id;
let compactor_manager = hummock_manager.compactor_manager_ref_for_test();
register_table_ids_to_compaction_group(
hummock_manager.as_ref(),
&[1],
StaticCompactionGroupId::StateDefault.into(),
)
.await;
let _sst_infos = add_ssts(1, hummock_manager.as_ref(), context_id).await;
let _receiver = compactor_manager.add_compactor(context_id, 1, 1);
let _compactor = hummock_manager.get_idle_compactor().await.unwrap();
Expand Down
21 changes: 18 additions & 3 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use core::panic;
use std::borrow::BorrowMut;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::ops::DerefMut;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};
Expand All @@ -29,7 +29,7 @@ use risingwave_common::monitor::rwlock::MonitoredRwLock;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_hummock_sdk::compact::{compact_task_to_string, estimate_state_for_compaction};
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
build_version_delta_after_version, get_compaction_group_ids,
build_version_delta_after_version, get_compaction_group_ids, insert_new_sub_level,
try_get_compaction_group_id_by_table_id, BranchedSstInfo, HummockVersionExt,
HummockVersionUpdateExt,
};
Expand Down Expand Up @@ -1598,6 +1598,11 @@ where
.entry(compaction_group_id)
.or_default()
.group_deltas;
let version_l0 = new_hummock_version
.get_compaction_group_levels_mut(compaction_group_id)
.l0
.as_mut()
.expect("Expect level 0 is not empty");
let l0_sub_level_id = epoch;
let group_delta = GroupDelta {
delta_type: Some(DeltaType::IntraLevel(IntraLevelDelta {
Expand All @@ -1608,8 +1613,18 @@ where
})),
};
group_deltas.push(group_delta);

insert_new_sub_level(
version_l0,
l0_sub_level_id,
LevelType::Overlapping,
group_sstables,
None,
);
}
new_hummock_version.apply_version_delta(new_version_delta.deref());

// Create a new_version, possibly merely to bump up the version id and max_committed_epoch.
new_hummock_version.max_committed_epoch = epoch;

// Apply stats changes.
let mut version_stats = VarTransaction::new(&mut versioning.version_stats);
Expand Down
41 changes: 10 additions & 31 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,7 @@ async fn test_hummock_compaction_task() {

// Add some sstables and commit.
let epoch: u64 = 1;
let original_tables = generate_test_sstables_with_table_id(
epoch,
1,
get_sst_ids(&hummock_manager, sst_num).await,
);
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await);
register_sstable_infos_to_compaction_group(
&hummock_manager,
&original_tables,
Expand Down Expand Up @@ -734,8 +730,7 @@ async fn test_print_compact_task() {
let (_, hummock_manager, _cluster_manager, _) = setup_compute_env(80).await;
// Add some sstables and commit.
let epoch: u64 = 1;
let original_tables =
generate_test_sstables_with_table_id(epoch, 1, get_sst_ids(&hummock_manager, 2).await);
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await);
register_sstable_infos_to_compaction_group(
&hummock_manager,
&original_tables,
Expand Down Expand Up @@ -949,14 +944,10 @@ async fn test_hummock_compaction_task_heartbeat() {

// Add some sstables and commit.
let epoch: u64 = 1;
let original_tables = generate_test_sstables_with_table_id(
epoch,
1,
get_sst_ids(&hummock_manager, sst_num).await,
);
register_table_ids_to_compaction_group(
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await);
register_sstable_infos_to_compaction_group(
&hummock_manager,
&[1],
&original_tables,
StaticCompactionGroupId::StateDefault.into(),
)
.await;
Expand Down Expand Up @@ -1081,14 +1072,10 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() {

// Add some sstables and commit.
let epoch: u64 = 1;
let original_tables = generate_test_sstables_with_table_id(
epoch,
1,
get_sst_ids(&hummock_manager, sst_num).await,
);
register_table_ids_to_compaction_group(
let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await);
register_sstable_infos_to_compaction_group(
&hummock_manager,
&[1],
&original_tables,
StaticCompactionGroupId::StateDefault.into(),
)
.await;
Expand Down Expand Up @@ -1466,11 +1453,7 @@ async fn test_split_compaction_group_on_demand_basic() {
sst_info: SstableInfo {
object_id: 10,
sst_id: 10,
key_range: Some(KeyRange {
left: iterator_test_key_of_epoch(100, 1, 20),
right: iterator_test_key_of_epoch(100, 100, 20),
right_exclusive: false,
}),
key_range: None,
table_ids: vec![100],
min_epoch: 20,
max_epoch: 20,
Expand All @@ -1483,11 +1466,7 @@ async fn test_split_compaction_group_on_demand_basic() {
sst_info: SstableInfo {
object_id: 11,
sst_id: 11,
key_range: Some(KeyRange {
left: iterator_test_key_of_epoch(100, 101, 20),
right: iterator_test_key_of_epoch(101, 100, 20),
right_exclusive: false,
}),
key_range: None,
table_ids: vec![100, 101],
min_epoch: 20,
max_epoch: 20,
Expand Down
50 changes: 10 additions & 40 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ where
{
// Increase version by 2.
let mut epoch: u64 = 1;
let sstable_ids = get_sst_ids(hummock_manager, 3).await;
let test_tables = generate_test_sstables_with_table_id(epoch, 1, sstable_ids);
let table_ids = get_sst_ids(hummock_manager, 3).await;
let test_tables = generate_test_tables(epoch, table_ids);
register_sstable_infos_to_compaction_group(
hummock_manager,
&test_tables,
Expand Down Expand Up @@ -147,41 +147,6 @@ where
vec![test_tables, test_tables_2, test_tables_3]
}

pub fn generate_test_sstables_with_table_id(
epoch: u64,
table_id: u32,
sst_ids: Vec<HummockSstableObjectId>,
) -> Vec<SstableInfo> {
let mut sst_info = vec![];
for (i, sst_id) in sst_ids.into_iter().enumerate() {
sst_info.push(SstableInfo {
object_id: sst_id,
sst_id,
key_range: Some(KeyRange {
left: key_with_epoch(
format!("{:03}\0\0_key_test_{:05}", table_id, i + 1)
.as_bytes()
.to_vec(),
epoch,
),
right: key_with_epoch(
format!("{:03}\0\0_key_test_{:05}", table_id, (i + 1) * 10)
.as_bytes()
.to_vec(),
epoch,
),
right_exclusive: false,
}),
file_size: 2,
table_ids: vec![table_id],
uncompressed_file_size: 2,
max_epoch: epoch,
..Default::default()
});
}
sst_info
}

pub fn generate_test_tables(epoch: u64, sst_ids: Vec<HummockSstableObjectId>) -> Vec<SstableInfo> {
let mut sst_info = vec![];
for (i, sst_id) in sst_ids.into_iter().enumerate() {
Expand All @@ -196,7 +161,6 @@ pub fn generate_test_tables(epoch: u64, sst_ids: Vec<HummockSstableObjectId>) ->
file_size: 2,
table_ids: vec![sst_id as u32, sst_id as u32 * 10000],
uncompressed_file_size: 2,
max_epoch: epoch,
..Default::default()
});
}
Expand Down Expand Up @@ -263,7 +227,7 @@ pub fn iterator_test_key_of_epoch(
) -> Vec<u8> {
// key format: {prefix_index}_version
key_with_epoch(
format!("{:03}\0\0_key_test_{:05}", table, idx)
format!("{:03}_key_test_{:05}", table, idx)
.as_bytes()
.to_vec(),
ts,
Expand Down Expand Up @@ -396,7 +360,13 @@ where
S: MetaStore,
{
let table_ids = get_sst_ids(hummock_manager, 3).await;
let test_tables = generate_test_sstables_with_table_id(epoch, 1, table_ids);
let test_tables = generate_test_tables(epoch, table_ids);
register_sstable_infos_to_compaction_group(
hummock_manager,
&test_tables,
StaticCompactionGroupId::StateDefault.into(),
)
.await;
let ssts = to_local_sstable_info(&test_tables);
let sst_to_worker = ssts
.iter()
Expand Down
Loading

0 comments on commit bd7cf4d

Please sign in to comment.