Skip to content

Commit

Permalink
fix: avoid flush too many small sst file (#1003)
Browse files Browse the repository at this point in the history
## Rationale
Currently, we attempt to flush the table that consumes the maximum
memory when the system memory usage limit is reached for either
`space_write_buffer_size` or `db_write_buffer_size`. However, if the
target table is currently undergoing flushing, its memory usage will not
be released, causing the `preprocess_flush` (freeze small memtables)
function to be repeatedly triggered. This can result in the creation of
many small SST files, potentially causing query issues.

## Detailed Changes
* Move `preprocess_flush` into `flush_job`
* Split `swith_memtables_or_suggest_duration` into 2 methods, and make
`swith_memtables` return maxium sequence number.

## Test Plan
  • Loading branch information
MichaelLeeHZ authored Jun 27, 2023
1 parent f8471d2 commit cd2b688
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 112 deletions.
13 changes: 9 additions & 4 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,11 @@ impl ScheduleWorker {
let ongoing = self.limit.ongoing_tasks();
match schedule_task {
ScheduleTask::Request(compact_req) => {
debug!("Ongoing compaction tasks:{}", ongoing);
debug!("Ongoing compaction tasks:{ongoing}");
if ongoing >= self.max_ongoing_tasks {
self.limit.add_request(compact_req);
warn!(
"Too many compaction ongoing tasks:{}, max:{}, buf_len:{}",
ongoing,
"Too many compaction ongoing tasks:{ongoing}, max:{}, buf_len:{}",
self.max_ongoing_tasks,
self.limit.request_buf_len()
);
Expand All @@ -448,7 +447,13 @@ impl ScheduleWorker {
for compact_req in pending {
self.handle_table_compaction_request(compact_req).await;
}
debug!("Scheduled {} pending compaction tasks.", len);
debug!("Scheduled {len} pending compaction tasks.");
} else {
warn!(
"Too many compaction ongoing tasks:{ongoing}, max:{}, buf_len:{}",
self.max_ongoing_tasks,
self.limit.request_buf_len()
);
}
}
ScheduleTask::Exit => (),
Expand Down
123 changes: 57 additions & 66 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ pub struct Flusher {
struct FlushTask {
space_store: SpaceStoreRef,
table_data: TableDataRef,
max_sequence: SequenceNumber,

runtime: RuntimeRef,
write_sst_max_buffer_size: usize,
}
Expand All @@ -205,9 +203,7 @@ impl Flusher {
table_data, opts
);

let flush_req = self.preprocess_flush(table_data).await?;

self.schedule_table_flush(flush_scheduler, flush_req, opts, false)
self.schedule_table_flush(flush_scheduler, table_data.clone(), opts, false)
.await
}

Expand All @@ -223,74 +219,20 @@ impl Flusher {
table_data, opts
);

let flush_req = self.preprocess_flush(table_data).await?;

self.schedule_table_flush(flush_scheduler, flush_req, opts, true)
self.schedule_table_flush(flush_scheduler, table_data.clone(), opts, true)
.await
}

async fn preprocess_flush(&self, table_data: &TableDataRef) -> Result<TableFlushRequest> {
let current_version = table_data.current_version();
let last_sequence = table_data.last_sequence();
// Switch (freeze) all mutable memtables. And update segment duration if
// suggestion is returned.
if let Some(suggest_segment_duration) =
current_version.switch_memtables_or_suggest_duration()
{
info!(
"Update segment duration, table:{}, table_id:{}, segment_duration:{:?}",
table_data.name, table_data.id, suggest_segment_duration
);
assert!(!suggest_segment_duration.is_zero());

let mut new_table_opts = (*table_data.table_options()).clone();
new_table_opts.segment_duration = Some(ReadableDuration(suggest_segment_duration));

let edit_req = {
let meta_update = MetaUpdate::AlterOptions(AlterOptionsMeta {
space_id: table_data.space_id,
table_id: table_data.id,
options: new_table_opts.clone(),
});
MetaEditRequest {
shard_info: table_data.shard_info,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.space_store
.manifest
.apply_edit(edit_req)
.await
.context(StoreVersionEdit)?;

// Now the segment duration is applied, we can stop sampling and freeze the
// sampling memtable.
current_version.freeze_sampling();
}

info!("Try to trigger memtable flush of table, table:{}, table_id:{}, max_memtable_id:{}, last_sequence:{}",
table_data.name, table_data.id, table_data.last_memtable_id(), last_sequence);

// Try to flush all memtables of current table
Ok(TableFlushRequest {
table_data: table_data.clone(),
max_sequence: last_sequence,
})
}

/// Schedule table flush request to background workers
async fn schedule_table_flush(
&self,
flush_scheduler: &mut TableFlushScheduler,
flush_req: TableFlushRequest,
table_data: TableDataRef,
opts: TableFlushOptions,
block_on: bool,
) -> Result<()> {
let table_data = flush_req.table_data.clone();

let flush_task = FlushTask {
table_data: table_data.clone(),
max_sequence: flush_req.max_sequence,
space_store: self.space_store.clone(),
runtime: self.runtime.clone(),
write_sst_max_buffer_size: self.write_sst_max_buffer_size,
Expand All @@ -308,18 +250,16 @@ impl FlushTask {
/// should be ensured by the caller.
async fn run(&self) -> Result<()> {
let instant = Instant::now();
let flush_req = self.preprocess_flush(&self.table_data).await?;

let current_version = self.table_data.current_version();
let mems_to_flush = current_version.pick_memtables_to_flush(self.max_sequence);
let mems_to_flush = current_version.pick_memtables_to_flush(flush_req.max_sequence);

if mems_to_flush.is_empty() {
return Ok(());
}

let request_id = RequestId::next_id();
info!(
"Instance try to flush memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}",
self.table_data.name, self.table_data.id, request_id, mems_to_flush
);

// Start flush duration timer.
let local_metrics = self.table_data.metrics.local_flush_metrics();
Expand Down Expand Up @@ -348,6 +288,57 @@ impl FlushTask {
Ok(())
}

async fn preprocess_flush(&self, table_data: &TableDataRef) -> Result<TableFlushRequest> {
let current_version = table_data.current_version();
let mut last_sequence = table_data.last_sequence();
// Switch (freeze) all mutable memtables. And update segment duration if
// suggestion is returned.
if let Some(suggest_segment_duration) = current_version.suggest_duration() {
info!(
"Update segment duration, table:{}, table_id:{}, segment_duration:{:?}",
table_data.name, table_data.id, suggest_segment_duration
);
assert!(!suggest_segment_duration.is_zero());

let mut new_table_opts = (*table_data.table_options()).clone();
new_table_opts.segment_duration = Some(ReadableDuration(suggest_segment_duration));

let edit_req = {
let meta_update = MetaUpdate::AlterOptions(AlterOptionsMeta {
space_id: table_data.space_id,
table_id: table_data.id,
options: new_table_opts.clone(),
});
MetaEditRequest {
shard_info: table_data.shard_info,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.space_store
.manifest
.apply_edit(edit_req)
.await
.context(StoreVersionEdit)?;

// Now the segment duration is applied, we can stop sampling and freeze the
// sampling memtable.
if let Some(seq) = current_version.freeze_sampling_memtable() {
last_sequence = seq.max(last_sequence);
}
} else if let Some(seq) = current_version.switch_memtables() {
last_sequence = seq.max(last_sequence);
}

info!("Try to trigger memtable flush of table, table:{}, table_id:{}, max_memtable_id:{}, last_sequence:{last_sequence}",
table_data.name, table_data.id, table_data.last_memtable_id());

// Try to flush all memtables of current table
Ok(TableFlushRequest {
table_data: table_data.clone(),
max_sequence: last_sequence,
})
}

/// This will write picked memtables [FlushableMemTables] to level 0 sst
/// files. Sampling memtable may be dumped into multiple sst file according
/// to the sampled segment duration.
Expand Down
6 changes: 5 additions & 1 deletion analytic_engine/src/instance/serial_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ impl TableFlushScheduler {
*flush_state = FlushState::Flushing;
break;
}
FlushState::Flushing => (),
FlushState::Flushing => {
if !block_on_write_thread {
return Ok(());
}
}
FlushState::Failed { err_msg } => {
if self
.schedule_sync
Expand Down
16 changes: 15 additions & 1 deletion analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ impl TableData {
self.current_version.total_memory_usage()
}

/// Returns mutable memtable memory usage in bytes.
#[inline]
pub fn mutable_memory_usage(&self) -> usize {
self.current_version.mutable_memory_usage()
}

/// Find memtable for given timestamp to insert, create if not exists
///
/// If the memtable schema is outdated, switch all memtables and create the
Expand Down Expand Up @@ -592,6 +598,14 @@ impl TableDataSet {
.cloned()
}

pub fn find_maximum_mutable_memory_usage_table(&self) -> Option<TableDataRef> {
// TODO: Possible performance issue here when there are too many tables.
self.table_datas
.values()
.max_by_key(|t| t.mutable_memory_usage())
.cloned()
}

/// List all tables to `tables`
pub fn list_all_tables(&self, tables: &mut Vec<TableDataRef>) {
for table_data in self.table_datas.values().cloned() {
Expand Down Expand Up @@ -766,7 +780,7 @@ pub mod tests {
Some(ReadableDuration(table_options::DEFAULT_SEGMENT_DURATION));
table_data.set_table_options(table_opts);
// Freeze sampling memtable.
current_version.freeze_sampling();
current_version.freeze_sampling_memtable();

// A new mutable memtable should be created.
let mutable = table_data.find_or_create_mutable(now_ts, &schema).unwrap();
Expand Down
Loading

0 comments on commit cd2b688

Please sign in to comment.