Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): add file number limits to TWCS compaction #4481

Merged
merged 4 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mito2/src/compaction/picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ pub fn new_picker(
match compaction_options {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_runs,
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_runs,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
}
Expand Down
26 changes: 26 additions & 0 deletions src/mito2/src/compaction/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,29 @@ pub fn new_file_handle(
file_purger,
)
}

/// Builds one [`FileHandle`] per `(start, end, size)` entry of `file_specs`.
///
/// `start` and `end` are used as millisecond timestamps for the file's time range and
/// `size` becomes its `file_size`. Every handle gets a random file id, level `0`, zeroed
/// row/index statistics and a clone of the same noop file purger.
pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
    let purger = new_noop_file_purger();
    let mut handles = Vec::with_capacity(file_specs.len());

    for &(start_ms, end_ms, file_size) in file_specs {
        let time_range = (
            Timestamp::new_millisecond(start_ms),
            Timestamp::new_millisecond(end_ms),
        );
        let meta = FileMeta {
            region_id: 0.into(),
            file_id: FileId::random(),
            time_range,
            level: 0,
            file_size,
            available_indexes: Default::default(),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
        };
        handles.push(FileHandle::new(meta, purger.clone()));
    }

    handles
}
98 changes: 90 additions & 8 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_time::Timestamp;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, reduce_runs};
use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, FileId, Level};
use crate::sst::version::LevelMeta;
Expand All @@ -35,7 +35,9 @@ const LEVEL_COMPACTED: Level = 1;
/// candidates.
pub struct TwcsPicker {
/// Run limit applied to the active window: when more sorted runs than this are found
/// there, the runs are reduced by compaction.
max_active_window_runs: usize,
/// File-count limit applied to the active window; only checked when the run limit is
/// not exceeded.
max_active_window_files: usize,
/// Run limit applied to every window that is not the active one.
max_inactive_window_runs: usize,
/// File-count limit applied to every window that is not the active one; only checked
/// when the run limit is not exceeded.
max_inactive_window_files: usize,
/// Time window size in seconds as supplied by the caller.
/// NOTE(review): how `None` is resolved is not visible in this hunk — presumably the
/// window is inferred from the files; confirm against `pick`.
time_window_seconds: Option<i64>,
}

Expand All @@ -51,13 +53,17 @@ impl Debug for TwcsPicker {
impl TwcsPicker {
/// Creates a picker from the sorted-run and file-count limits of the active and
/// inactive windows, plus an optional time window size in seconds.
///
/// The arguments are stored as-is; no validation is performed here.
pub fn new(
    max_active_window_runs: usize,
    max_active_window_files: usize,
    max_inactive_window_runs: usize,
    max_inactive_window_files: usize,
    time_window_seconds: Option<i64>,
) -> Self {
    // Fields are listed in the same order as the struct declaration and the parameters.
    Self {
        max_active_window_runs,
        max_active_window_files,
        max_inactive_window_runs,
        max_inactive_window_files,
        time_window_seconds,
    }
}

Expand All @@ -73,12 +79,15 @@ impl TwcsPicker {
for (window, files) in time_windows {
let sorted_runs = find_sorted_runs(&mut files.files);

let max_runs = if let Some(active_window) = active_window
let (max_runs, max_files) = if let Some(active_window) = active_window
&& *window == active_window
{
self.max_active_window_runs
(self.max_active_window_runs, self.max_active_window_files)
} else {
self.max_inactive_window_runs
(
self.max_inactive_window_runs,
self.max_inactive_window_files,
)
};

// we only remove deletion markers once no file in current window overlaps with any other window.
Expand All @@ -87,16 +96,26 @@ impl TwcsPicker {

if found_runs > max_runs {
let files_to_compact = reduce_runs(sorted_runs, max_runs);
info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}", active_window, *window,max_runs, found_runs, files_to_compact.len());
info!("Building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, output size: {}, remove deletion markers: {}", active_window, *window,max_runs, found_runs, files_to_compact.len(), filter_deleted);
for inputs in files_to_compact {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED, // always compact to l1
inputs,
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.});
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
}
} else if files.files.len() > max_files {
// Files in window exceeds file num limit
let to_merge = enforce_file_num(&files.files, max_files);
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: to_merge,
filter_deleted,
output_time_range: None,
});
} else {
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
}
Expand All @@ -105,6 +124,31 @@ impl TwcsPicker {
}
}

/// Picks the consecutive files to merge so that the number of files no longer exceeds
/// `max_file_num`.
///
/// Merging `k` consecutive files into one removes `k - 1` files, so exactly
/// `files.len() - max_file_num + 1` neighbouring files are selected. Among all such
/// candidates the one with the smallest total size (i.e. the cheapest rewrite) is chosen;
/// on ties the earliest candidate wins. Only consecutive files are merged so that the
/// merged output won't overlap with the files that are left untouched.
///
/// `files` must be sorted according to time ranges.
///
/// A `max_file_num` of `0` can never be satisfied for a non-empty input, so it is treated
/// as `1` (all files are merged into one) instead of underflowing. If `files` already
/// satisfies the limit, an empty `Vec` is returned.
fn enforce_file_num<T: Item>(files: &[T], max_file_num: usize) -> Vec<T> {
    debug_assert!(files.len() > max_file_num);

    // At least one file always remains after merging, so a limit of 0 means "merge all".
    let target_file_num = max_file_num.max(1);
    if files.len() <= target_file_num {
        // Nothing to merge; also keeps release builds from wrapping on the subtraction.
        return Vec::new();
    }
    let to_merge = files.len() - target_file_num + 1;

    // Sliding window over `to_merge` consecutive files: keep a running sum instead of
    // re-summing every candidate from scratch.
    let mut penalty: usize = files[..to_merge].iter().map(|f| f.size()).sum();
    let mut min_penalty = penalty;
    let mut min_idx = 0;
    for (idx, (leaving, entering)) in files.iter().zip(&files[to_merge..]).enumerate() {
        // `leaving` is part of the current sum, so the subtraction cannot underflow.
        penalty = penalty - leaving.size() + entering.size();
        // After the shift the window starts at `idx + 1`. Strict `<` keeps the earliest
        // candidate on ties.
        if penalty < min_penalty {
            min_penalty = penalty;
            min_idx = idx + 1;
        }
    }

    files[min_idx..min_idx + to_merge].to_vec()
}

impl Picker for TwcsPicker {
fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
let region_id = compaction_region.region_id;
Expand Down Expand Up @@ -264,7 +308,7 @@ mod tests {
use std::collections::HashSet;

use super::*;
use crate::compaction::test_util::new_file_handle;
use crate::compaction::test_util::{new_file_handle, new_file_handles};
use crate::sst::file::Level;

#[test]
Expand Down Expand Up @@ -482,7 +526,8 @@ mod tests {
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output = TwcsPicker::new(4, 1, None).build_output(&mut windows, active_window);
let output = TwcsPicker::new(4, usize::MAX, 1, usize::MAX, None)
.build_output(&mut windows, active_window);

let output = output
.iter()
Expand Down Expand Up @@ -514,6 +559,43 @@ mod tests {
output_level: Level,
}

/// Builds file handles from `input_files`, runs `enforce_file_num` with `max_file_num`
/// and asserts that the `(start, end)` time ranges of the selected files, ordered by
/// start time, equal `files_to_merge`.
fn check_enforce_file_num(
    input_files: &[(i64, i64, u64)],
    max_file_num: usize,
    files_to_merge: &[(i64, i64)],
) {
    let mut handles = new_file_handles(input_files);
    // `enforce_file_num` expects sorted input; this call is relied on to order `handles`.
    find_sorted_runs(&mut handles);

    let mut picked = enforce_file_num(&handles, max_file_num);
    picked.sort_unstable_by_key(|handle| handle.time_range().0);

    let mut picked_ranges = Vec::with_capacity(picked.len());
    for handle in &picked {
        let (start, end) = handle.time_range();
        picked_ranges.push((start.value(), end.value()));
    }

    assert_eq!(files_to_merge.to_vec(), picked_ranges);
}

#[test]
fn test_enforce_file_num() {
    // Each case: (input files as (start, end, size), file limit, expected merged ranges).
    let cases: &[(&[(i64, i64, u64)], usize, &[(i64, i64)])] = &[
        // Limit 2 with three files: two neighbours are merged.
        (
            &[(0, 300, 2), (100, 200, 1), (200, 400, 1)],
            2,
            &[(100, 200), (200, 400)],
        ),
        // Limit 1 with three files: every file has to be merged.
        (
            &[(0, 300, 200), (100, 200, 100), (200, 400, 100)],
            1,
            &[(0, 300), (100, 200), (200, 400)],
        ),
    ];

    for &(input_files, max_file_num, files_to_merge) in cases {
        check_enforce_file_num(input_files, max_file_num, files_to_merge);
    }
}

#[test]
fn test_build_twcs_output() {
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();

Expand Down
20 changes: 19 additions & 1 deletion src/mito2/src/region/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,15 @@ pub struct TwcsOptions {
/// Max num of sorted runs that can be kept in active writing time window.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_runs: usize,
/// Max num of files that can be kept in active writing time window.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_files: usize,
/// Max num of sorted runs that can be kept in inactive time windows.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_runs: usize,
/// Max num of files in inactive time windows.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_files: usize,
/// Compaction time window defined when creating tables.
#[serde(with = "humantime_serde")]
pub time_window: Option<Duration>,
Expand Down Expand Up @@ -217,7 +223,9 @@ impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_runs: 4,
max_active_window_files: 4,
max_inactive_window_runs: 1,
max_inactive_window_files: 1,
time_window: None,
remote_compaction: false,
}
Expand Down Expand Up @@ -576,7 +584,9 @@ mod tests {
let map = make_map(&[
("ttl", "7d"),
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.max_active_window_files", "11"),
("compaction.twcs.max_inactive_window_runs", "2"),
("compaction.twcs.max_inactive_window_files", "3"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("compaction.twcs.remote_compaction", "false"),
Expand All @@ -599,7 +609,9 @@ mod tests {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,
max_inactive_window_runs: 2,
max_inactive_window_files: 3,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
}),
Expand Down Expand Up @@ -628,7 +640,9 @@ mod tests {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: usize::MAX,
max_inactive_window_runs: 2,
max_inactive_window_files: usize::MAX,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
}),
Expand Down Expand Up @@ -663,7 +677,9 @@ mod tests {
"compaction": {
"compaction.type": "twcs",
"compaction.twcs.max_active_window_runs": "8",
"compaction.twcs.max_active_window_files": "11",
"compaction.twcs.max_inactive_window_runs": "2",
"compaction.twcs.max_inactive_window_files": "7",
"compaction.twcs.time_window": "2h"
},
"storage": "S3",
Expand All @@ -689,7 +705,9 @@ mod tests {
ttl: Some(Duration::from_secs(3600 * 24 * 7)),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,
max_inactive_window_runs: 2,
max_inactive_window_files: 7,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
}),
Expand Down
2 changes: 2 additions & 0 deletions src/store-api/src/mito_engine_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
"ttl",
"compaction.type",
"compaction.twcs.max_active_window_runs",
"compaction.twcs.max_active_window_files",
"compaction.twcs.max_inactive_window_runs",
"compaction.twcs.max_inactive_window_files",
"compaction.twcs.time_window",
"compaction.twcs.remote_compaction",
"storage",
Expand Down