Skip to content

Commit

Permalink
combine into one commit
Browse files Browse the repository at this point in the history
  • Loading branch information
tanruixiang committed Jun 21, 2023
1 parent 032c448 commit a3391bd
Show file tree
Hide file tree
Showing 33 changed files with 1,195 additions and 323 deletions.
42 changes: 27 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.4"
ceresdbproto = { git = "https://github.com/tanruixiang/ceresdbproto.git", rev = "53f5c74a54d8a08ebb08c41e8b862b2369df4a02" }
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
Expand Down
108 changes: 72 additions & 36 deletions analytic_engine/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,26 @@ pub struct ExpiredFiles {

#[derive(Default, Clone)]
pub struct CompactionTask {
pub compaction_inputs: Vec<CompactionInputFiles>,
pub expired: Vec<ExpiredFiles>,
inputs: Vec<CompactionInputFiles>,
expired: Vec<ExpiredFiles>,
}

impl Drop for CompactionTask {
fn drop(&mut self) {
// When a CompactionTask is dropped, it means
// 1. the task finished successfully, or
// 2. the task is cancelled for some reason, like memory limit
//
// In case 2, we need to mark files as not compacted in order for them to be
// scheduled again. In case 1, the files will be moved out of level controller,
// so it doesn't care what the flag is, so it's safe to set false here.
self.mark_files_being_compacted(false);
}
}

impl CompactionTask {
pub fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.compaction_inputs {
fn mark_files_being_compacted(&self, being_compacted: bool) {
for input in &self.inputs {
for file in &input.files {
file.set_being_compacted(being_compacted);
}
Expand All @@ -337,29 +350,76 @@ impl CompactionTask {
}

// Estimate the size of the total input files.
#[inline]
pub fn estimated_total_input_file_size(&self) -> usize {
let total_input_size: u64 = self
.compaction_inputs
.inputs
.iter()
.map(|v| v.files.iter().map(|f| f.size()).sum::<u64>())
.sum();

total_input_size as usize
}

#[inline]
pub fn num_compact_files(&self) -> usize {
self.compaction_inputs.iter().map(|v| v.files.len()).sum()
self.inputs.iter().map(|v| v.files.len()).sum()
}

pub fn num_expired_files(&self) -> usize {
self.expired.iter().map(|v| v.files.len()).sum()
#[inline]
pub fn is_empty(&self) -> bool {
self.is_input_empty() && self.expired.is_empty()
}

#[inline]
pub fn is_input_empty(&self) -> bool {
self.inputs.is_empty()
}

#[inline]
pub fn expired(&self) -> &[ExpiredFiles] {
&self.expired
}

#[inline]
pub fn inputs(&self) -> &[CompactionInputFiles] {
&self.inputs
}
}

pub struct CompactionTaskBuilder {
expired: Vec<ExpiredFiles>,
inputs: Vec<CompactionInputFiles>,
}

impl CompactionTaskBuilder {
pub fn with_expired(expired: Vec<ExpiredFiles>) -> Self {
Self {
expired,
inputs: Vec::new(),
}
}

pub fn add_inputs(&mut self, files: CompactionInputFiles) {
self.inputs.push(files);
}

pub fn build(self) -> CompactionTask {
let task = CompactionTask {
expired: self.expired,
inputs: self.inputs,
};

task.mark_files_being_compacted(true);

task
}
}

impl fmt::Debug for CompactionTask {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CompactionTask")
.field("inputs", &self.compaction_inputs)
.field("inputs", &self.inputs)
.field(
"expired",
&self
Expand All @@ -380,36 +440,12 @@ impl fmt::Debug for CompactionTask {
}
}

pub struct PickerManager {
default_picker: CompactionPickerRef,
time_window_picker: CompactionPickerRef,
size_tiered_picker: CompactionPickerRef,
}

impl Default for PickerManager {
fn default() -> Self {
let size_tiered_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()),
));
let time_window_picker = Arc::new(CommonCompactionPicker::new(
CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()),
));

Self {
default_picker: time_window_picker.clone(),
size_tiered_picker,
time_window_picker,
}
}
}
#[derive(Default)]
pub struct PickerManager;

impl PickerManager {
pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef {
match strategy {
CompactionStrategy::Default => self.default_picker.clone(),
CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(),
CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(),
}
Arc::new(CommonCompactionPicker::new(strategy))
}
}

Expand Down
Loading

0 comments on commit a3391bd

Please sign in to comment.