refactor: add some comments for scaling
xxchan committed Jul 31, 2024
1 parent 0b71744 commit 46b90b9
Showing 4 changed files with 44 additions and 11 deletions.
1 change: 1 addition & 0 deletions proto/meta.proto
@@ -584,6 +584,7 @@ message GetServerlessStreamingJobsStatusResponse {
repeated Status streaming_job_statuses = 1;
}

+ // This is used by `risectl`
service ScaleService {
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
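The new comment notes that `ScaleService` is consumed by `risectl`. As orientation only, here is a hypothetical sketch of calling the service through a tonic-generated client; the import paths and the assumption that the meta node serves this over tonic are mine, not part of this commit.

use risingwave_pb::meta::scale_service_client::ScaleServiceClient; // assumed generated-module path
use risingwave_pb::meta::GetClusterInfoRequest; // assumed path; the message is declared in meta.proto

async fn print_cluster_info(meta_addr: String) -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the meta node and issue the GetClusterInfo RPC declared above.
    let mut client = ScaleServiceClient::connect(meta_addr).await?;
    let info = client
        .get_cluster_info(GetClusterInfoRequest::default())
        .await?
        .into_inner();
    println!("{info:?}");
    Ok(())
}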
3 changes: 2 additions & 1 deletion src/meta/src/barrier/command.rs
@@ -75,7 +75,8 @@ pub struct Reschedule {
/// The downstream fragments of this fragment.
pub downstream_fragment_ids: Vec<FragmentId>,

- /// Reassigned splits for source actors
+ /// Reassigned splits for source actors.
+ /// It becomes the `actor_splits` in [`UpdateMutation`].
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

/// Whether this fragment is injectable. The injectable means whether the fragment contains
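The added comment says these per-fragment splits end up as the `actor_splits` of the barrier's update mutation. A minimal, self-contained sketch of that aggregation with stand-in types (the real `SplitImpl` and update-mutation proto differ; this is illustrative only, not code from this commit):

use std::collections::HashMap;

type ActorId = u32;
type SplitImpl = String; // stand-in for the real connector split type

struct Reschedule {
    // Reassigned splits for source actors, as in the struct above.
    actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
}

// Merge every fragment's reassigned splits into the single actor -> splits map
// carried by the update mutation (the aggregation shape is an assumption).
fn collect_actor_splits<'a>(
    reschedules: impl IntoIterator<Item = &'a Reschedule>,
) -> HashMap<ActorId, Vec<SplitImpl>> {
    reschedules
        .into_iter()
        .flat_map(|r| r.actor_splits.iter().map(|(actor, splits)| (*actor, splits.clone())))
        .collect()
}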
4 changes: 2 additions & 2 deletions src/meta/src/barrier/recovery.rs
@@ -721,7 +721,7 @@ impl GlobalBarrierManagerContext {
} else {
let (reschedule_fragment, _) = self
.scale_controller
- .prepare_reschedule_command(
+ .analyze_reschedule_plan(
plan,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
@@ -871,7 +871,7 @@ impl GlobalBarrierManagerContext {

let (reschedule_fragment, applied_reschedules) = self
.scale_controller
- .prepare_reschedule_command(
+ .analyze_reschedule_plan(
plan,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
47 changes: 39 additions & 8 deletions src/meta/src/stream/scale.rs
@@ -799,8 +799,18 @@ impl ScaleController {
Ok(())
}

- // Results are the generated reschedule plan and the changes that need to be updated to the meta store.
- pub(crate) async fn prepare_reschedule_command(
+ /// Converts the high-level [`WorkerReschedule`] into the low-level reschedule plan [`Reschedule`].
+ ///
+ /// Returns `(reschedule_fragment, applied_reschedules)`:
+ /// - `reschedule_fragment`: the generated reschedule plan
+ /// - `applied_reschedules`: the changes that need to be written to the meta store (`pre_apply_reschedules`, only for V1).
+ ///
+ /// In the [normal process of scaling](`GlobalStreamManager::reschedule_actors_impl`), we use the returned values to
+ /// build a [`Command::RescheduleFragment`], which then flows through the barrier mechanism to perform scaling.
+ /// The meta store is updated after the barrier is collected.
+ ///
+ /// During recovery, we don't need the barrier mechanism and can directly use the returned values to update the meta store.
+ pub(crate) async fn analyze_reschedule_plan(
&self,
mut reschedules: HashMap<FragmentId, WorkerReschedule>,
options: RescheduleOptions,
@@ -812,6 +822,7 @@ impl ScaleController {
let ctx = self
.build_reschedule_context(&mut reschedules, options, table_parallelisms)
.await?;
+ let reschedules = reschedules;

// Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated.

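To make the two call sites in the new doc comment concrete, a simplified sketch of both paths with stand-in types (the real `Reschedule`, `Command`, and meta-store handling differ; only the shape described above is shown):

use std::collections::HashMap;

type FragmentId = u32;
struct Reschedule;          // stand-in for the low-level per-fragment plan
struct AppliedReschedules;  // stand-in for the meta-store changes (V1 only)

enum Command {
    // Simplified stand-in for `Command::RescheduleFragment`.
    RescheduleFragment { reschedules: HashMap<FragmentId, Reschedule> },
}

// Normal scaling: wrap the plan in a command and let the barrier mechanism carry it;
// the meta store is updated only after the barrier is collected.
fn normal_scaling(
    reschedule_fragment: HashMap<FragmentId, Reschedule>,
    _applied_reschedules: AppliedReschedules,
) -> Command {
    Command::RescheduleFragment {
        reschedules: reschedule_fragment,
    }
}

// Recovery: skip the barrier and apply the returned values to the meta store directly.
fn recovery_scaling(
    _reschedule_fragment: HashMap<FragmentId, Reschedule>,
    _applied_reschedules: AppliedReschedules,
) {
    // ...write to the meta store directly (elided)...
}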
@@ -2136,7 +2147,7 @@ impl ScaleController {
WorkerReschedule { worker_actor_diff }
}

- pub fn build_no_shuffle_relation_index(
+ fn build_no_shuffle_relation_index(
actor_map: &HashMap<ActorId, CustomActorInfo>,
no_shuffle_source_fragment_ids: &mut HashSet<FragmentId>,
no_shuffle_target_fragment_ids: &mut HashSet<FragmentId>,
@@ -2164,7 +2175,7 @@ impl ScaleController {
}
}

- pub fn build_fragment_dispatcher_index(
+ fn build_fragment_dispatcher_index(
actor_map: &HashMap<ActorId, CustomActorInfo>,
fragment_dispatcher_map: &mut HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
) {
@@ -2303,8 +2314,8 @@ impl ScaleController {
}
}

- // At present, for table level scaling, we use the strategy TableResizePolicy.
- // Currently, this is used as an internal interface, so it won’t be included in Protobuf for the time being.
+ /// At present, for table-level scaling, we use the strategy `TableResizePolicy`.
+ /// Currently, this is used as an internal interface, so it won’t be included in Protobuf.
pub struct TableResizePolicy {
pub(crate) worker_ids: BTreeSet<WorkerId>,
pub(crate) table_parallelisms: HashMap<u32, TableParallelism>,
@@ -2319,6 +2330,15 @@ impl GlobalStreamManager {
self.scale_controller.reschedule_lock.write().await
}

+ /// The entrypoint for rescheduling actors.
+ ///
+ /// Used by:
+ /// - The directly exposed low-level API `risingwave_meta_service::scale_service::ScaleService`
+ /// * `risectl scale resize` (high-level)
+ /// * `risectl meta reschedule` (low-level)
+ /// - High-level parallelism control API
+ /// * manual `ALTER [TABLE | INDEX | MATERIALIZED VIEW | SINK] SET PARALLELISM`
+ /// * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes change
pub async fn reschedule_actors(
&self,
reschedules: HashMap<FragmentId, WorkerReschedule>,
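For orientation, a self-contained sketch of the input this entrypoint consumes, using simplified stand-in types (the real `WorkerReschedule` lives in this crate; reading `worker_actor_diff` as a per-worker actor-count delta is an assumption, not something this commit states):

use std::collections::{BTreeMap, HashMap};

type FragmentId = u32;
type WorkerId = u32;

// Simplified stand-in for the crate's `WorkerReschedule`.
struct WorkerReschedule {
    // Assumed meaning: how many actors to add (+) or remove (-) on each worker.
    worker_actor_diff: BTreeMap<WorkerId, i64>,
}

// Example plan: move one actor of fragment 7 from worker 1 to worker 2.
fn example_reschedules() -> HashMap<FragmentId, WorkerReschedule> {
    HashMap::from([(
        7,
        WorkerReschedule {
            worker_actor_diff: BTreeMap::from([(1, -1), (2, 1)]),
        },
    )])
}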
Expand Down Expand Up @@ -2350,7 +2370,7 @@ impl GlobalStreamManager {

let (reschedule_fragment, applied_reschedules) = self
.scale_controller
- .prepare_reschedule_command(reschedules, options, table_parallelism.as_mut())
+ .analyze_reschedule_plan(reschedules, options, table_parallelism.as_mut())
.await?;

tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
@@ -2411,6 +2431,14 @@ impl GlobalStreamManager {
Ok(())
}

+ /// When new worker nodes join, or the parallelism of existing worker nodes changes,
+ /// this examines whether any jobs can be scaled, and scales them if any are found.
+ ///
+ /// This method iterates over all `CREATED` jobs and can be called repeatedly.
+ ///
+ /// Returns
+ /// - `Ok(false)` if no jobs can be scaled;
+ /// - `Ok(true)` if some jobs were scaled, and there may be more jobs that can be scaled.
async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
let background_streaming_jobs = self
.metadata_manager
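Because the method can be called repeatedly and `Ok(true)` means more jobs may still be scalable, a caller can simply loop until it returns `Ok(false)`. A generic sketch of that loop shape (the helper below is illustrative and not code from this commit):

use std::future::Future;

// Repeatedly invoke a step that reports whether it made progress, until it reports
// that nothing is left to scale (mirrors the Ok(true) / Ok(false) contract above).
async fn drive_to_fixed_point<F, Fut, E>(mut step: F) -> Result<(), E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<bool, E>>,
{
    while step().await? {}
    Ok(())
}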
@@ -2532,7 +2560,9 @@ impl GlobalStreamManager {

for batch in batches {
let parallelisms: HashMap<_, _> = batch.into_iter().collect();
-
+ // `table_parallelisms` contains ALL created jobs.
+ // We rely on `generate_table_resize_plan` to check if there are
+ // any jobs that can be scaled.
let plan = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
@@ -2569,6 +2599,7 @@ impl GlobalStreamManager {
Ok(true)
}

+ /// Handles notifications of worker node activation and deletion, and triggers parallelism control.
async fn run(&self, mut shutdown_rx: Receiver<()>) {
tracing::info!("starting automatic parallelism control monitor");

