
Commit 8c5489e

fix(meta): correctly resolve update of vnode mapping after scaling (risingwavelabs#8652)

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao authored Mar 21, 2023
1 parent cc6e687 commit 8c5489e
Showing 4 changed files with 47 additions and 33 deletions.
8 changes: 4 additions & 4 deletions src/common/src/hash/consistent_hash/mapping.rs
```diff
@@ -268,10 +268,10 @@ pub type ExpandedParallelUnitMapping = ExpandedMapping<marker::ParallelUnit>;
 
 impl ActorMapping {
     /// Transform this actor mapping to a parallel unit mapping, essentially `transform`.
-    pub fn to_parallel_unit(
-        &self,
-        to_map: &HashMap<ActorId, ParallelUnitId>,
-    ) -> ParallelUnitMapping {
+    pub fn to_parallel_unit<M>(&self, to_map: &M) -> ParallelUnitMapping
+    where
+        M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
+    {
         self.transform(to_map)
     }
```
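The relaxed bound means callers are no longer tied to `HashMap`: any type that can be indexed by `&ActorId` to yield a `ParallelUnitId` now works, and the higher-ranked `for<'a>` lifetime lets the implementation index with references of any lifetime. A minimal sketch of the same pattern, with local `u32` aliases standing in for the risingwave id types:

```rust
use std::collections::HashMap;
use std::ops::Index;

// Stand-ins for the risingwave id types, for illustration only.
type ActorId = u32;
type ParallelUnitId = u32;

// Same shape as the new `to_parallel_unit<M>` bound: accept anything
// indexable by `&ActorId`, not just `HashMap<ActorId, ParallelUnitId>`.
fn lookup_all<M>(actors: &[ActorId], to_map: &M) -> Vec<ParallelUnitId>
where
    M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
{
    actors.iter().map(|actor| to_map[actor]).collect()
}

fn main() {
    // `HashMap<K, V>` implements `Index<&K, Output = V>`, so existing call
    // sites that pass a `HashMap` keep compiling unchanged.
    let to_map: HashMap<ActorId, ParallelUnitId> = HashMap::from([(1, 10), (2, 20)]);
    assert_eq!(lookup_all(&[1, 2], &to_map), vec![10, 20]);
}
```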
3 changes: 3 additions & 0 deletions src/meta/src/barrier/command.rs
```diff
@@ -58,6 +58,9 @@ pub struct Reschedule {
     /// The upstream fragments of this fragment, and the dispatchers that should be updated.
     pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
     /// New hash mapping of the upstream dispatcher to be updated.
+    ///
+    /// This field exists only when there's an upstream fragment and the current fragment is
+    /// hash-sharded.
     pub upstream_dispatcher_mapping: Option<ActorMapping>,
 
     /// The downstream fragments of this fragment.
```
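Since the field is `None` for singleton fragments or fragments with no upstream, consumers have to branch on it rather than assume a mapping exists. A sketch of the consuming pattern, with stand-in types rather than the actual risingwave definitions:

```rust
// Stand-in types for illustration; not the actual risingwave definitions.
struct ActorMapping;

struct Reschedule {
    upstream_dispatcher_mapping: Option<ActorMapping>,
    // ... other fields elided
}

fn update_upstream_dispatchers(reschedule: &Reschedule) {
    match &reschedule.upstream_dispatcher_mapping {
        // Hash-sharded with an upstream fragment: push the new mapping out.
        Some(_mapping) => { /* send the dispatcher update */ }
        // Singleton fragment or no upstream: nothing to update.
        None => {}
    }
}
```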
67 changes: 39 additions & 28 deletions src/meta/src/manager/catalog/fragment.rs
```diff
@@ -18,8 +18,9 @@ use std::sync::Arc;
 
 use anyhow::{anyhow, Context};
 use itertools::Itertools;
+use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::ParallelUnitId;
+use risingwave_common::hash::{ActorMapping, ParallelUnitMapping};
 use risingwave_common::{bail, try_match_expand};
 use risingwave_connector::source::SplitImpl;
 use risingwave_pb::common::{ParallelUnit, WorkerNode};
```
```diff
@@ -736,44 +737,54 @@ where
                 let actor_status = table_fragment.actor_status.clone();
                 let fragment = table_fragment.fragments.get_mut(&fragment_id).unwrap();
 
+                fragment
+                    .actors
+                    .retain(|a| !removed_actor_ids.contains(&a.actor_id));
+
                 // update vnode mapping for actors.
                 for actor in &mut fragment.actors {
                     if let Some(bitmap) = vnode_bitmap_updates.get(&actor.actor_id) {
                         actor.vnode_bitmap = Some(bitmap.to_protobuf());
                     }
                 }
 
-                fragment
-                    .actors
-                    .retain(|a| !removed_actor_ids.contains(&a.actor_id));
-
                 // update fragment's vnode mapping
-                if let Some(vnode_mapping) = fragment.vnode_mapping.as_mut() {
-                    let mut actor_to_parallel_unit = HashMap::with_capacity(fragment.actors.len());
-                    for actor in &fragment.actors {
-                        if let Some(actor_status) = actor_status.get(&actor.actor_id) {
-                            if let Some(parallel_unit) = actor_status.parallel_unit.as_ref() {
-                                actor_to_parallel_unit.insert(
-                                    actor.actor_id as ActorId,
-                                    parallel_unit.id as ParallelUnitId,
-                                );
-                            }
-                        }
-                    }
-
-                    if let Some(actor_mapping) = upstream_dispatcher_mapping.as_ref() {
-                        *vnode_mapping = actor_mapping
-                            .to_parallel_unit(&actor_to_parallel_unit)
-                            .to_protobuf();
-                    }
-
-                    if !fragment.state_table_ids.is_empty() {
-                        let fragment_mapping = FragmentParallelUnitMapping {
-                            fragment_id: fragment_id as FragmentId,
-                            mapping: Some(vnode_mapping.clone()),
-                        };
-                        fragment_mapping_to_notify.push(fragment_mapping);
-                    }
-                }
+                let mut actor_to_parallel_unit = HashMap::with_capacity(fragment.actors.len());
+                let mut actor_to_vnode_bitmap = HashMap::with_capacity(fragment.actors.len());
+                for actor in &fragment.actors {
+                    let actor_status = &actor_status[&actor.actor_id];
+                    let parallel_unit_id = actor_status.parallel_unit.as_ref().unwrap().id;
+                    actor_to_parallel_unit.insert(actor.actor_id, parallel_unit_id);
+
+                    if let Some(vnode_bitmap) = &actor.vnode_bitmap {
+                        let bitmap = Bitmap::from(vnode_bitmap);
+                        actor_to_vnode_bitmap.insert(actor.actor_id, bitmap);
+                    }
+                }
+
+                let vnode_mapping = if actor_to_vnode_bitmap.is_empty() {
+                    // If there's no `vnode_bitmap`, then the fragment must be a singleton fragment.
+                    // We directly use the single parallel unit to construct the mapping.
+                    // TODO: also fill `vnode_bitmap` for the actor of singleton fragment so that we
+                    // don't need this branch.
+                    let parallel_unit = *actor_to_parallel_unit.values().exactly_one().unwrap();
+                    ParallelUnitMapping::new_single(parallel_unit)
+                } else {
+                    // Generate the parallel unit mapping from the fragment's actor bitmaps.
+                    assert_eq!(actor_to_vnode_bitmap.len(), actor_to_parallel_unit.len());
+                    ActorMapping::from_bitmaps(&actor_to_vnode_bitmap)
+                        .to_parallel_unit(&actor_to_parallel_unit)
+                }
+                .to_protobuf();
+
+                *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone();
+
+                if !fragment.state_table_ids.is_empty() {
+                    let fragment_mapping = FragmentParallelUnitMapping {
+                        fragment_id: fragment_id as FragmentId,
+                        mapping: Some(vnode_mapping),
+                    };
+                    fragment_mapping_to_notify.push(fragment_mapping);
+                }
 
                 // Second step, update upstream fragments
```
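This hunk is the actual fix. Previously, the fragment's own `vnode_mapping` was rewritten only inside `if let Some(actor_mapping) = upstream_dispatcher_mapping`, so any fragment where that field was `None` (a singleton, or a fragment with no upstream dispatcher to update) kept a stale mapping after scaling. The new code instead derives the mapping from the surviving actors' own vnode bitmaps. A self-contained sketch of what `ActorMapping::from_bitmaps(..).to_parallel_unit(..)` computes, with `Vec<bool>` bitmaps and `u32` ids standing in for the `Bitmap` and mapping types (and a toy vnode count; the real constant is much larger):

```rust
use std::collections::HashMap;

// Toy vnode count for the sketch; risingwave's constant is much larger.
const VNODE_COUNT: usize = 8;

/// Compose `vnode -> parallel unit` from `actor -> vnodes` and `actor -> parallel unit`.
fn derive_vnode_mapping(
    actor_bitmaps: &HashMap<u32, Vec<bool>>,    // actor id -> vnodes it owns
    actor_to_parallel_unit: &HashMap<u32, u32>, // actor id -> parallel unit id
) -> Vec<u32> {
    let mut mapping = vec![u32::MAX; VNODE_COUNT];
    for (actor, bitmap) in actor_bitmaps {
        let parallel_unit = actor_to_parallel_unit[actor];
        for (vnode, owned) in bitmap.iter().enumerate() {
            if *owned {
                // Each vnode must be owned by exactly one actor.
                assert_eq!(mapping[vnode], u32::MAX);
                mapping[vnode] = parallel_unit;
            }
        }
    }
    // The bitmaps of a hash-sharded fragment must cover every vnode.
    assert!(mapping.iter().all(|pu| *pu != u32::MAX));
    mapping
}

fn main() {
    // Actor 1 (on parallel unit 10) owns vnodes 0..4; actor 2 (on 20) owns 4..8.
    let bitmaps = HashMap::from([
        (1, vec![true, true, true, true, false, false, false, false]),
        (2, vec![false, false, false, false, true, true, true, true]),
    ]);
    let to_parallel_unit = HashMap::from([(1, 10), (2, 20)]);
    assert_eq!(
        derive_vnode_mapping(&bitmaps, &to_parallel_unit),
        [10, 10, 10, 10, 20, 20, 20, 20]
    );
}
```

The singleton branch in the diff covers the one case this derivation has no input for: an actor without a `vnode_bitmap`, whose fragment must then map everything to its single parallel unit (hence `exactly_one`).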
2 changes: 1 addition & 1 deletion src/meta/src/stream/test_scale.rs
```diff
@@ -143,7 +143,7 @@ mod tests {
         for parallel_unit_num in simulated_parallel_unit_nums(None, None) {
             let (actor_mapping, _) = generate_actor_mapping(parallel_unit_num);
 
-            let actor_to_parallel_unit_map = (0..parallel_unit_num)
+            let actor_to_parallel_unit_map: HashMap<_, _> = (0..parallel_unit_num)
                 .map(|i| (i as ActorId, i as ParallelUnitId))
                 .collect();
             let parallel_unit_mapping = actor_mapping.to_parallel_unit(&actor_to_parallel_unit_map);
```
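The test change follows directly from the new generic signature: now that `to_parallel_unit<M>` accepts any `Index`-able map, `collect()` no longer has a unique target type to infer, so the test must name one. A minimal reproduction of the inference failure, using a stand-in generic function:

```rust
use std::collections::HashMap;

// Stand-in for any function that is generic over its map argument.
fn takes_any_map<M>(_map: &M) {}

fn main() {
    // Without the `HashMap<_, _>` annotation this fails with error[E0282]:
    // `collect()` could build any collection of `(u32, u32)` pairs, and the
    // generic callee no longer pins the choice down.
    let map: HashMap<_, _> = (0..4u32).map(|i| (i, i)).collect();
    takes_any_map(&map);
}
```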
