Commit 2e5a907: merge conflict

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Little-Wallace committed Jun 16, 2023
1 parent 1af4ea1 commit 2e5a907

Showing 4 changed files with 129 additions and 311 deletions.
194 changes: 24 additions & 170 deletions src/meta/src/hummock/compaction_scheduler.rs
@@ -22,8 +22,6 @@ use futures::{FutureExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use risingwave_common::util::select_all;
 use risingwave_hummock_sdk::compact::compact_task_to_string;
-use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
-use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
 use risingwave_hummock_sdk::CompactionGroupId;
 use risingwave_pb::hummock::compact_task::{self, TaskStatus};
 use risingwave_pb::hummock::subscribe_compact_tasks_response::Task;
@@ -44,21 +42,13 @@ use crate::storage::MetaStore;
 
 pub type CompactionSchedulerRef<S> = Arc<CompactionScheduler<S>>;
 pub type CompactionRequestChannelRef = Arc<CompactionRequestChannel>;
-const HISTORY_TABLE_INFO_WINDOW_SIZE: usize = 16;
 
-#[derive(Clone, Debug)]
-pub enum CompactionRequestItem {
-    Compact {
-        compaction_group: CompactionGroupId,
-        task_type: compact_task::TaskType,
-    },
-    SplitLargeGroup(PbTableStatsMap),
-}
+type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);
 
 /// [`CompactionRequestChannel`] wrappers a mpsc channel and deduplicate requests from same
 /// compaction groups.
 pub struct CompactionRequestChannel {
-    request_tx: UnboundedSender<CompactionRequestItem>,
+    request_tx: UnboundedSender<CompactionRequestChannelItem>,
     scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
 }
 
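Aside: the enum payload above is replaced by a plain tuple alias. A (CompactionGroupId, compact_task::TaskType) tuple can serve directly as the dedup key in the `scheduled` set, since tuples of Hash + Eq components are themselves Hash + Eq. A tiny self-contained illustration — the integer stand-ins below are assumptions, not the real SDK types:

use std::collections::HashSet;

type GroupId = u64; // stand-in for CompactionGroupId
type TaskType = u32; // stand-in for compact_task::TaskType
type Item = (GroupId, TaskType);

fn main() {
    let mut scheduled: HashSet<Item> = HashSet::new();
    assert!(scheduled.insert((2, 0))); // first request for this key is accepted
    assert!(!scheduled.insert((2, 0))); // duplicate key is rejected
}
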
@@ -72,7 +62,7 @@ pub enum ScheduleStatus {
 }
 
 impl CompactionRequestChannel {
-    pub fn new(request_tx: UnboundedSender<CompactionRequestItem>) -> Self {
+    pub fn new(request_tx: UnboundedSender<CompactionRequestChannelItem>) -> Self {
         Self {
             request_tx,
             scheduled: Default::default(),
@@ -84,29 +74,17 @@ impl CompactionRequestChannel {
         &self,
         compaction_group: CompactionGroupId,
         task_type: compact_task::TaskType,
-    ) -> Result<bool, SendError<CompactionRequestItem>> {
+    ) -> Result<bool, SendError<CompactionRequestChannelItem>> {
         let mut guard = self.scheduled.lock();
         let key = (compaction_group, task_type);
         if guard.contains(&key) {
             return Ok(false);
         }
-        self.request_tx.send(CompactionRequestItem::Compact {
-            compaction_group,
-            task_type,
-        })?;
+        self.request_tx.send(key)?;
         guard.insert(key);
         Ok(true)
     }
 
-    /// Enqueues only if the target is not yet in queue.
-    pub fn try_split_groups(
-        &self,
-        stats: PbTableStatsMap,
-    ) -> Result<(), SendError<CompactionRequestItem>> {
-        self.request_tx
-            .send(CompactionRequestItem::SplitLargeGroup(stats))
-    }
-
     pub fn unschedule(
         &self,
         compaction_group: CompactionGroupId,
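The retained body above is the whole dedup protocol: a key is forwarded only if it is not already pending, and the consumer calls unschedule once it picks a key up so the same group can be scheduled again. A minimal runnable sketch of the same pattern, assuming a std Mutex instead of parking_lot and integer stand-ins for the key components:

use std::collections::HashSet;
use std::sync::Mutex;
use tokio::sync::mpsc::{self, error::SendError, UnboundedSender};

type Key = (u64, u32); // (compaction group, task type) stand-ins

struct DedupChannel {
    tx: UnboundedSender<Key>,
    scheduled: Mutex<HashSet<Key>>,
}

impl DedupChannel {
    /// Sends `key` only if it is not already queued; Ok(false) means duplicate.
    fn try_send(&self, key: Key) -> Result<bool, SendError<Key>> {
        let mut guard = self.scheduled.lock().unwrap();
        if guard.contains(&key) {
            return Ok(false);
        }
        self.tx.send(key)?;
        guard.insert(key);
        Ok(true)
    }

    /// Called by the consumer after receiving `key`, so it can be queued again.
    fn unschedule(&self, key: &Key) {
        self.scheduled.lock().unwrap().remove(key);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let chan = DedupChannel { tx, scheduled: Mutex::new(HashSet::new()) };
    assert!(chan.try_send((2, 0)).unwrap());
    assert!(!chan.try_send((2, 0)).unwrap()); // deduplicated while pending
    let key = rx.recv().await.unwrap();
    chan.unschedule(&key);
    assert!(chan.try_send((2, 0)).unwrap()); // schedulable again after pickup
}
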
@@ -143,7 +121,8 @@ where
     }
 
     pub async fn start(&self, shutdown_rx: Receiver<()>) {
-        let (sched_tx, sched_rx) = tokio::sync::mpsc::unbounded_channel::<CompactionRequestItem>();
+        let (sched_tx, sched_rx) =
+            tokio::sync::mpsc::unbounded_channel::<CompactionRequestChannelItem>();
         let sched_channel = Arc::new(CompactionRequestChannel::new(sched_tx));
 
         self.hummock_manager
Expand Down Expand Up @@ -346,35 +325,22 @@ where
Either::Left((event, _)) => {
if let Some(event) = event {
match event {
SchedulerEvent::Channel(item) => {
SchedulerEvent::Channel((compaction_group, task_type)) => {
// recv
match item {
CompactionRequestItem::Compact {
if !self
.on_handle_compact(
compaction_group,
&mut compaction_selectors,
task_type,
} => {
if !self
.on_handle_compact(
compaction_group,
&mut compaction_selectors,
task_type,
sched_channel.clone(),
)
.await
{
self.hummock_manager
.metrics
.compact_skip_frequency
.with_label_values(&["total", "no-compactor"])
.inc();
}
}
CompactionRequestItem::SplitLargeGroup(stats) => {
self.collect_table_write_throughput(
stats,
&mut group_infos,
);
}
sched_channel.clone(),
)
.await
{
self.hummock_manager
.metrics
.compact_skip_frequency
.with_label_values(&["total", "no-compactor"])
.inc();
}
}
SchedulerEvent::DynamicTrigger => {
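For context, the loop around this arm selects between the merged scheduler-event stream and a shutdown receiver, which is why events arrive as Either::Left. A condensed runnable sketch of that select/Either shape — the event type and channels here are simplified stand-ins, not the scheduler's actual API:

use futures::future::{select, Either};
use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[derive(Debug)]
enum Event {
    Channel((u64, u32)), // (compaction group, task type) stand-ins
    Tick,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let mut events = UnboundedReceiverStream::new(rx);

    tx.send(Event::Channel((2, 0))).unwrap();
    tx.send(Event::Tick).unwrap();
    drop(shutdown_tx); // queued events still drain first: select polls the stream side first

    loop {
        match select(events.next(), &mut shutdown_rx).await {
            Either::Left((Some(Event::Channel((group, task))), _)) => {
                println!("compact group {group}, task type {task}");
            }
            Either::Left((Some(Event::Tick), _)) => println!("periodic trigger"),
            // stream drained, or shutdown fired / sender dropped
            Either::Left((None, _)) | Either::Right(_) => break,
        }
    }
}
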
@@ -478,123 +444,11 @@ where
 
         true
     }
-
-    fn collect_table_write_throughput(
-        &self,
-        table_stats: PbTableStatsMap,
-        table_infos: &mut HashMap<u32, VecDeque<u64>>,
-    ) {
-        for (table_id, stat) in table_stats {
-            let throughput = (stat.total_value_size + stat.total_key_size) as u64;
-            let entry = table_infos.entry(table_id).or_default();
-            entry.push_back(throughput);
-            if entry.len() > HISTORY_TABLE_INFO_WINDOW_SIZE {
-                entry.pop_front();
-            }
-        }
-    }
-
-    async fn on_handle_check_split_multi_group(
-        &self,
-        table_write_throughput: &HashMap<u32, VecDeque<u64>>,
-    ) {
-        let mut group_infos = self
-            .hummock_manager
-            .calculate_compaction_group_statistic()
-            .await;
-        group_infos.sort_by_key(|group| group.group_size);
-        group_infos.reverse();
-        let group_size_limit = self.env.opts.split_group_size_limit;
-        let table_split_limit = self.env.opts.move_table_size_limit;
-        let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
-        let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
-        let mut partition_vnode_count = self.env.opts.partition_vnode_count;
-        for group in &group_infos {
-            if group.table_statistic.len() == 1 {
-                continue;
-            }
-
-            for (table_id, table_size) in &group.table_statistic {
-                let mut is_high_write_throughput = false;
-                let mut is_low_write_throughput = true;
-                if let Some(history) = table_write_throughput.get(table_id) {
-                    if history.len() >= HISTORY_TABLE_INFO_WINDOW_SIZE {
-                        let window_total_size = history.iter().sum::<u64>();
-                        is_high_write_throughput = history.iter().all(|throughput| {
-                            *throughput > self.env.opts.table_write_throughput_threshold
-                        });
-                        is_low_write_throughput = window_total_size
-                            < (HISTORY_TABLE_INFO_WINDOW_SIZE as u64)
-                                * self.env.opts.min_table_split_write_throughput;
-                    }
-                }
-
-                if *table_size < group_size_limit && !is_high_write_throughput {
-                    continue;
-                }
-
-                let parent_group_id = group.group_id;
-                let mut target_compact_group_id = None;
-                let mut allow_split_by_table = false;
-                if *table_size > group_size_limit && is_low_write_throughput {
-                    // do not split a large table and a small table because it would increase IOPS
-                    // of small table.
-                    if parent_group_id != default_group_id && parent_group_id != mv_group_id {
-                        let rest_group_size = group.group_size - *table_size;
-                        if rest_group_size < *table_size && rest_group_size < table_split_limit {
-                            continue;
-                        }
-                    } else {
-                        for group in &group_infos {
-                            // do not move to mv group or state group
-                            if !group.split_by_table || group.group_id == mv_group_id
-                                || group.group_id == default_group_id
-                                || group.group_id == parent_group_id
-                                // do not move state-table to a large group.
-                                || group.group_size + *table_size > group_size_limit
-                                // do not move state-table from group A to group B if this operation would make group B becomes larger than A.
-                                || group.group_size + *table_size > group.group_size - table_size
-                            {
-                                continue;
-                            }
-                            target_compact_group_id = Some(group.group_id);
-                        }
-                        allow_split_by_table = true;
-                        partition_vnode_count = 1;
-                    }
-                }
-
-                let ret = self
-                    .hummock_manager
-                    .move_state_table_to_compaction_group(
-                        parent_group_id,
-                        &[*table_id],
-                        target_compact_group_id,
-                        allow_split_by_table,
-                        partition_vnode_count,
-                    )
-                    .await;
-                match ret {
-                    Ok(_) => {
-                        info!(
-                            "move state table [{}] from group-{} to group-{:?} success, Allow split by table: {}",
-                            table_id, parent_group_id, target_compact_group_id, allow_split_by_table
-                        );
-                        return;
-                    }
-                    Err(e) => info!(
-                        "failed to move state table [{}] from group-{} to group-{:?} because {:?}",
-                        table_id, parent_group_id, target_compact_group_id, e
-                    ),
-                }
-            }
-        }
-    }
 }
 
 #[derive(Clone)]
 pub enum SchedulerEvent {
-    Channel(CompactionRequestItem),
+    Channel((CompactionGroupId, compact_task::TaskType)),
     DynamicTrigger,
     SpaceReclaimTrigger,
     TtlReclaimTrigger,
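The two deleted methods implemented the scheduler-side split heuristic: collect_table_write_throughput folded per-table write stats into a bounded history (a VecDeque capped at HISTORY_TABLE_INFO_WINDOW_SIZE = 16 samples), and on_handle_check_split_multi_group classified tables as high- or low-throughput over that window before moving them between compaction groups. A stripped-down sketch of just the windowing and classification idea — the thresholds and names below are illustrative assumptions, not the meta-service options:

use std::collections::{HashMap, VecDeque};

const WINDOW: usize = 16; // mirrors the removed HISTORY_TABLE_INFO_WINDOW_SIZE

/// Record one throughput sample for a table, keeping only the last WINDOW samples.
fn record(history: &mut HashMap<u32, VecDeque<u64>>, table_id: u32, throughput: u64) {
    let entry = history.entry(table_id).or_default();
    entry.push_back(throughput);
    if entry.len() > WINDOW {
        entry.pop_front();
    }
}

/// Classify a table once a full window exists; returns (is_high, is_low).
fn classify(history: &HashMap<u32, VecDeque<u64>>, table_id: u32) -> Option<(bool, bool)> {
    let h = history.get(&table_id).filter(|h| h.len() >= WINDOW)?;
    let high_threshold: u64 = 16 << 20; // per-sample floor for "high", illustrative only
    let low_threshold: u64 = 4 << 20; // per-sample bound for "low", illustrative only
    let is_high = h.iter().all(|t| *t > high_threshold);
    let is_low = h.iter().sum::<u64>() < (WINDOW as u64) * low_threshold;
    Some((is_high, is_low))
}

fn main() {
    let mut history = HashMap::new();
    for i in 0..20u64 {
        record(&mut history, 1, 32 << 20); // consistently hot table
        record(&mut history, 2, i); // nearly idle table
    }
    assert_eq!(classify(&history, 1), Some((true, false)));
    assert_eq!(classify(&history, 2), Some((false, true)));
}
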
@@ -605,7 +459,7 @@
     S: MetaStore,
 {
     fn scheduler_event_stream(
-        sched_rx: UnboundedReceiver<CompactionRequestItem>,
+        sched_rx: UnboundedReceiver<(CompactionGroupId, compact_task::TaskType)>,
         periodic_space_reclaim_compaction_interval_sec: u64,
         periodic_ttl_reclaim_compaction_interval_sec: u64,
         periodic_compaction_interval_sec: u64,
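scheduler_event_stream merges the request receiver with several periodic timers into a single stream of SchedulerEvent, which the loop shown earlier drains. A hedged sketch of that merging — it uses futures' select_all and the tokio-stream wrappers, whereas the real code goes through risingwave_common::util::select_all, and the types are simplified:

use futures::{Stream, StreamExt};
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};

#[derive(Debug)]
enum SchedulerEvent {
    Channel((u64, u32)), // (compaction group, task type) stand-ins
    DynamicTrigger,
    SpaceReclaimTrigger,
}

fn scheduler_event_stream(
    sched_rx: tokio::sync::mpsc::UnboundedReceiver<(u64, u32)>,
    compaction_secs: u64,
    space_reclaim_secs: u64,
) -> impl Stream<Item = SchedulerEvent> {
    let requests = UnboundedReceiverStream::new(sched_rx).map(SchedulerEvent::Channel);
    let dynamic = IntervalStream::new(interval(Duration::from_secs(compaction_secs)))
        .map(|_| SchedulerEvent::DynamicTrigger);
    let space = IntervalStream::new(interval(Duration::from_secs(space_reclaim_secs)))
        .map(|_| SchedulerEvent::SpaceReclaimTrigger);
    // Poll all branches fairly; yield whichever event is ready first.
    futures::stream::select_all(vec![requests.boxed(), dynamic.boxed(), space.boxed()])
}

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    tx.send((2, 0)).unwrap();
    let mut events = scheduler_event_stream(rx, 60, 3600);
    // The first interval tick fires immediately, so either a tick or the
    // queued request may arrive first.
    println!("{:?}", events.next().await.unwrap());
}
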
@@ -657,7 +511,7 @@ mod tests {
 
     use crate::hummock::compaction::default_level_selector;
    use crate::hummock::compaction_scheduler::{
-        CompactionRequestChannel, CompactionRequestItem, ScheduleStatus,
+        CompactionRequestChannel, CompactionRequestChannelItem, ScheduleStatus,
     };
     use crate::hummock::test_utils::{add_ssts, setup_compute_env};
     use crate::hummock::CompactionScheduler;
@@ -671,7 +525,7 @@ mod tests {
             CompactionScheduler::new(env, hummock_manager.clone(), compactor_manager.clone());
 
         let (request_tx, _request_rx) =
-            tokio::sync::mpsc::unbounded_channel::<CompactionRequestItem>();
+            tokio::sync::mpsc::unbounded_channel::<CompactionRequestChannelItem>();
         let request_channel = Arc::new(CompactionRequestChannel::new(request_tx));
 
         // Add a compactor with invalid context_id.