Skip to content

Commit

Permalink
refactor(storage): refactor hummock timer loop (#10164)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jun 14, 2023
1 parent 353da76 commit ff91a4a
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 246 deletions.
165 changes: 2 additions & 163 deletions src/meta/src/hummock/compaction_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -22,7 +22,6 @@ use futures::{FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use risingwave_common::util::select_all;
use risingwave_hummock_sdk::compact::compact_task_to_string;
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::compact_task::{self, TaskStatus};
use risingwave_pb::hummock::subscribe_compact_tasks_response::Task;
Expand All @@ -31,7 +30,6 @@ use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Receiver;
use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};
use tracing::log::info;

use super::Compactor;
use crate::hummock::compaction::{
Expand All @@ -47,8 +45,6 @@ pub type CompactionRequestChannelRef = Arc<CompactionRequestChannel>;

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;

/// [`CompactionRequestChannel`] wrappers a mpsc channel and deduplicate requests from same
/// compaction groups.
pub struct CompactionRequestChannel {
Expand Down Expand Up @@ -140,7 +136,6 @@ where
self.env.opts.periodic_space_reclaim_compaction_interval_sec,
self.env.opts.periodic_ttl_reclaim_compaction_interval_sec,
self.env.opts.periodic_compaction_interval_sec,
self.env.opts.periodic_split_compact_group_interval_sec,
);
self.schedule_loop(
sched_channel,
Expand Down Expand Up @@ -324,7 +319,6 @@ where
use futures::pin_mut;
pin_mut!(event_stream);

let mut group_infos = HashMap::default();
loop {
let item = futures::future::select(event_stream.next(), shutdown_rx.clone()).await;
match item {
Expand Down Expand Up @@ -385,17 +379,6 @@ where
)
.await;
}
SchedulerEvent::GroupSplitTrigger => {
// Disable periodic trigger for compaction_deterministic_test.
if self.env.opts.compaction_deterministic_test {
continue;
}
self.on_handle_check_split_multi_group(&mut group_infos)
.await;
}
SchedulerEvent::CheckDeadTaskTrigger => {
self.hummock_manager.check_dead_task().await;
}
}
}
}
Expand Down Expand Up @@ -461,126 +444,6 @@ where

true
}

/// Periodically inspects per-compaction-group statistics and tries to move a
/// fast-growing state table out of an oversized group, either into an existing
/// small split-by-table group or (by passing `target_compact_group_id = None`)
/// into a newly created one via `move_state_table_to_compaction_group`.
///
/// `history_table_infos` is a caller-owned sliding window (capped at
/// `HISTORY_TABLE_INFO_WINDOW_SIZE` samples) of recent sizes per state table,
/// used to judge whether a table's size is monotonically increasing and how
/// fast it grew. Sizes are compared against `split_group_size_limit` and
/// `move_table_size_limit` from the env options — presumably byte counts;
/// TODO(review): confirm the unit against `calculate_compaction_group_statistic`.
///
/// At most one table is moved per invocation: on the first successful move the
/// function returns immediately; on failure it logs and tries the next candidate.
async fn on_handle_check_split_multi_group(
&self,
// Sliding window of recent sizes per state table, keyed by table id.
// Mutated in place so history accumulates across periodic invocations.
history_table_infos: &mut HashMap<StateTableId, VecDeque<u64>>,
) {
// Number of recent size samples retained per table; also the minimum
// window length required before a sub-limit table may be considered.
const HISTORY_TABLE_INFO_WINDOW_SIZE: usize = 4;
let mut group_infos = self
.hummock_manager
.calculate_compaction_group_statistic()
.await;
// Order groups largest-first (sort ascending by size, then reverse).
group_infos.sort_by_key(|group| group.group_size);
group_infos.reverse();
let group_size_limit = self.env.opts.split_group_size_limit;
let table_split_limit = self.env.opts.move_table_size_limit;
// Candidate tuples of (table_id, parent group id, parent group size).
let mut table_infos = vec![];
// TODO: support move small state-table back to default-group to reduce IOPS.
for group in &group_infos {
// Skip groups that cannot usefully be split: single-table groups and
// groups still under the split size limit.
if group.table_statistic.len() == 1 || group.group_size < group_size_limit {
continue;
}
for (table_id, table_size) in &group.table_statistic {
// Record the latest size sample, evicting the oldest once the
// window exceeds HISTORY_TABLE_INFO_WINDOW_SIZE entries.
let last_table_infos = history_table_infos
.entry(*table_id)
.or_insert_with(VecDeque::new);
last_table_infos.push_back(*table_size);
if last_table_infos.len() > HISTORY_TABLE_INFO_WINDOW_SIZE {
last_table_infos.pop_front();
}
table_infos.push((*table_id, group.group_id, group.group_size));
}
}
// Process candidates in descending order of parent-group size.
table_infos.sort_by(|a, b| b.2.cmp(&a.2));
let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
// May be overridden to 1 below when the table is merely moved into an
// existing split-by-table group rather than given its own partitioning.
let mut partition_vnode_count = self.env.opts.partition_vnode_count;
for (table_id, parent_group_id, parent_group_size) in table_infos {
// Entry is guaranteed present: it was inserted in the loop above.
let table_info = history_table_infos.get(&table_id).unwrap();
let table_size = *table_info.back().unwrap();
// Too small to move, or sub-limit but without a full history window
// yet — wait for more samples before deciding.
if table_size < table_split_limit
|| (table_size < group_size_limit
&& table_info.len() < HISTORY_TABLE_INFO_WINDOW_SIZE)
{
continue;
}

// None means "create a new group for this table" downstream.
let mut target_compact_group_id = None;
let mut allow_split_by_table = false;
// Extra checks apply only to tables still under the group size
// limit; tables at or above it are split out unconditionally.
if table_size < group_size_limit {
// Require the windowed sizes to be non-decreasing; a shrinking
// table is not worth moving.
let mut increase = true;
for idx in 1..table_info.len() {
if table_info[idx] < table_info[idx - 1] {
increase = false;
break;
}
}

if !increase {
continue;
}

// do not split a large table and a small table because it would increase IOPS of
// small table.
if parent_group_id != default_group_id && parent_group_id != mv_group_id {
let rest_group_size = parent_group_size - table_size;
if rest_group_size < table_size && rest_group_size < table_split_limit {
continue;
}
}

// Growth over the whole window; saturating_sub guards against a
// front sample larger than the latest one.
let increase_data_size = table_size.saturating_sub(*table_info.front().unwrap());
let increase_slow = increase_data_size < table_split_limit;

// if the size of this table increases too fast, we shall create one group for it.
// Slow growers leaving a static group may instead join an existing
// split-by-table group; the scan below keeps the LAST group that
// passes every filter (groups are ordered largest-first here).
if increase_slow
&& (parent_group_id == mv_group_id || parent_group_id == default_group_id)
{
for group in &group_infos {
// do not move to mv group or state group
if !group.split_by_table || group.group_id == mv_group_id
|| group.group_id == default_group_id
|| group.group_id == parent_group_id
// do not move state-table to a large group.
|| group.group_size + table_size > group_size_limit
// do not move state-table from group A to group B if this operation would make group B becomes larger than A.
|| group.group_size + table_size > parent_group_size - table_size
{
continue;
}
target_compact_group_id = Some(group.group_id);
}
allow_split_by_table = true;
partition_vnode_count = 1;
}
}

let ret = self
.hummock_manager
.move_state_table_to_compaction_group(
parent_group_id,
&[table_id],
target_compact_group_id,
allow_split_by_table,
partition_vnode_count,
)
.await;
match ret {
Ok(_) => {
info!(
"move state table [{}] from group-{} to group-{:?} success, Allow split by table: {}",
table_id, parent_group_id, target_compact_group_id, allow_split_by_table
);
// Move at most one table per periodic trigger.
return;
}
// Best-effort: log the failure and try the next candidate.
Err(e) => info!(
"failed to move state table [{}] from group-{} to group-{:?} because {:?}",
table_id, parent_group_id, target_compact_group_id, e
),
}
}
}
}

#[derive(Clone)]
Expand All @@ -589,8 +452,6 @@ pub enum SchedulerEvent {
DynamicTrigger,
SpaceReclaimTrigger,
TtlReclaimTrigger,
GroupSplitTrigger,
CheckDeadTaskTrigger,
}

impl<S> CompactionScheduler<S>
Expand All @@ -602,7 +463,6 @@ where
periodic_space_reclaim_compaction_interval_sec: u64,
periodic_ttl_reclaim_compaction_interval_sec: u64,
periodic_compaction_interval_sec: u64,
periodic_check_split_group_interval_sec: u64,
) -> impl Stream<Item = SchedulerEvent> {
let dynamic_channel_trigger =
UnboundedReceiverStream::new(sched_rx).map(SchedulerEvent::Channel);
Expand Down Expand Up @@ -632,33 +492,12 @@ where
let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
.map(|_| SchedulerEvent::TtlReclaimTrigger);

let mut check_compact_trigger_interval =
tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
check_compact_trigger_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
check_compact_trigger_interval.reset();
let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
.map(|_| SchedulerEvent::CheckDeadTaskTrigger);

let mut triggers: Vec<BoxStream<'static, SchedulerEvent>> = vec![
let triggers: Vec<BoxStream<'static, SchedulerEvent>> = vec![
Box::pin(dynamic_channel_trigger),
Box::pin(dynamic_tick_trigger),
Box::pin(space_reclaim_trigger),
Box::pin(ttl_reclaim_trigger),
Box::pin(check_compact_trigger),
];

if periodic_check_split_group_interval_sec > 0 {
let mut split_group_trigger_interval =
tokio::time::interval(Duration::from_secs(periodic_check_split_group_interval_sec));
split_group_trigger_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
split_group_trigger_interval.reset();

let split_group_trigger = IntervalStream::new(split_group_trigger_interval)
.map(|_| SchedulerEvent::GroupSplitTrigger);
triggers.push(Box::pin(split_group_trigger));
}
select_all(triggers)
}
}
Expand Down
Loading

0 comments on commit ff91a4a

Please sign in to comment.