Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: round-robin selector #4024

Merged
merged 2 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ impl TableMetadataAllocator {

pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;

/// [PeerAllocator] allocates [Peer]s for creating regions.
/// [`PeerAllocator`] allocates [`Peer`]s for creating regions.
#[async_trait]
pub trait PeerAllocator: Send + Sync {
/// Allocates `regions` size [Peer]s.
/// Allocates `regions` size [`Peer`]s.
async fn alloc(&self, ctx: &TableMetadataAllocatorContext, regions: usize)
-> Result<Vec<Peer>>;
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
use crate::selector::SelectorType;
use crate::service::admin;
use crate::{error, Result};
Expand Down Expand Up @@ -228,6 +229,7 @@ pub async fn metasrv_builder(
let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef,
};

Ok(MetasrvBuilder::new()
Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ impl MetaPeerClient {
.map(|election| election.is_leader())
.unwrap_or(true)
}

/// Returns a handle to the in-memory KV backend so tests can seed or
/// inspect cluster state directly (e.g. writing datanode lease entries).
#[cfg(test)]
pub(crate) fn memory_backend(&self) -> ResettableKvBackendRef {
    // Cheap handle clone of the shared in-memory backend.
    self.in_memory.clone()
}
}

fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<StatKey, StatValue>> {
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod common;
pub mod lease_based;
pub mod load_based;
pub mod round_robin;
mod weight_compute;
mod weighted_choose;

Expand Down Expand Up @@ -61,6 +62,7 @@ pub enum SelectorType {
#[default]
LoadBased,
LeaseBased,
RoundRobin,
}

impl TryFrom<&str> for SelectorType {
Expand All @@ -70,6 +72,7 @@ impl TryFrom<&str> for SelectorType {
match value {
"load_based" | "LoadBased" => Ok(SelectorType::LoadBased),
"lease_based" | "LeaseBased" => Ok(SelectorType::LeaseBased),
"round_robin" | "RoundRobin" => Ok(SelectorType::RoundRobin),
other => error::UnsupportedSelectorTypeSnafu {
selector_type: other,
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/selector/lease_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::selector::common::choose_peers;
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::{Namespace, Selector, SelectorOptions};

/// Selects from all alive datanodes using a random weighted choose.
pub struct LeaseBasedSelector;

#[async_trait::async_trait]
Expand Down
138 changes: 138 additions & 0 deletions src/meta-srv/src/selector/round_robin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicUsize;

use common_meta::peer::Peer;
use snafu::ensure;

use crate::error::{NoEnoughAvailableDatanodeSnafu, Result};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::{Namespace, Selector, SelectorOptions};

/// Round-robin selector that returns the next peer in the list in sequence.
/// Datanodes are ordered by their node_id.
///
/// This selector is useful when you want to distribute the load evenly across
/// all datanodes. But **it's not recommended** to use this selector in serious
/// production environments because it doesn't take into account the load of
/// each datanode.
#[derive(Default)]
pub struct RoundRobinSelector {
    // Monotonically increasing position, taken modulo the number of alive
    // datanodes on each pick. Atomic so concurrent `select` calls advance
    // the rotation without handing out the same slot twice.
    counter: AtomicUsize,
}

#[async_trait::async_trait]
impl Selector for RoundRobinSelector {
    type Context = SelectorContext;
    type Output = Vec<Peer>;

    /// Selects `opts.min_required_items` peers from the alive datanodes in
    /// round-robin order (datanodes sorted by node id).
    ///
    /// # Errors
    /// Returns [`NoEnoughAvailableDatanodeSnafu`] when no datanode is alive,
    /// or when duplication is disallowed and fewer distinct datanodes are
    /// alive than requested.
    async fn select(
        &self,
        ns: Namespace,
        ctx: &Self::Context,
        opts: SelectorOptions,
    ) -> Result<Vec<Peer>> {
        // 1. Get all alive datanodes in the namespace.
        let lease_kvs =
            lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?;

        // 2. Map the lease entries into peers and sort by node id so that
        //    every caller walks the datanodes in the same, stable order.
        let mut peers: Vec<Peer> = lease_kvs
            .into_iter()
            .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
            .collect();
        peers.sort_by_key(|p| p.id);

        // At least one datanode must be alive; and when duplicates are not
        // allowed there must be enough distinct peers to satisfy the request.
        // (Previously `allow_duplication` was ignored and `available` was
        // hard-coded to 0 in the error.)
        ensure!(
            !peers.is_empty()
                && (opts.allow_duplication || peers.len() >= opts.min_required_items),
            NoEnoughAvailableDatanodeSnafu {
                required: opts.min_required_items,
                available: peers.len(),
            }
        );

        // 3. Reserve a contiguous range of positions with a single atomic
        //    add, so concurrent `select` calls cannot interleave and return
        //    the same peer twice within one call. `Relaxed` suffices: we
        //    only need uniqueness of the counter values, not ordering of
        //    other memory operations.
        let start = self
            .counter
            .fetch_add(opts.min_required_items, std::sync::atomic::Ordering::Relaxed);
        let selected: Vec<Peer> = (0..opts.min_required_items)
            .map(|i| peers[start.wrapping_add(i) % peers.len()].clone())
            .collect();

        Ok(selected)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::{create_selector_context, put_datanodes};

    /// Verifies the selector walks datanodes in node-id order and keeps its
    /// rotation position across consecutive `select` calls.
    #[tokio::test]
    async fn test_round_robin_selector() {
        let selector = RoundRobinSelector::default();
        let ctx = create_selector_context();
        let ns = 0;

        // Register three alive datanodes with deliberately non-contiguous ids
        // to check ordering is by id, not by insertion.
        let peer1 = Peer {
            id: 2,
            addr: "node1".to_string(),
        };
        let peer2 = Peer {
            id: 5,
            addr: "node2".to_string(),
        };
        let peer3 = Peer {
            id: 8,
            addr: "node3".to_string(),
        };
        let peers = vec![peer1.clone(), peer2.clone(), peer3.clone()];
        put_datanodes(ns, &ctx.meta_peer_client, peers).await;

        // Asking for 4 peers out of 3 wraps around: 1, 2, 3, then 1 again.
        let peers = selector
            .select(
                ns,
                &ctx,
                SelectorOptions {
                    min_required_items: 4,
                    allow_duplication: true,
                },
            )
            .await
            .unwrap();
        assert_eq!(peers.len(), 4);
        assert_eq!(
            peers,
            vec![peer1.clone(), peer2.clone(), peer3.clone(), peer1.clone()]
        );

        // The rotation continues from where the previous call stopped
        // (counter is at 4, so 4 % 3 = peer2, 5 % 3 = peer3).
        let peers = selector
            .select(
                ns,
                &ctx,
                SelectorOptions {
                    min_required_items: 2,
                    allow_duplication: true,
                },
            )
            .await
            .unwrap();
        assert_eq!(peers.len(), 2);
        assert_eq!(peers, vec![peer2.clone(), peer3.clone()]);
    }
}
8 changes: 8 additions & 0 deletions src/meta-srv/src/table_meta_alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ pub struct MetasrvPeerAllocator {
}

impl MetasrvPeerAllocator {
/// Creates a new [`MetasrvPeerAllocator`] from the given [`SelectorContext`]
/// and [`SelectorRef`]; the selector decides which peers get allocated.
pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
    Self { ctx, selector }
}

/// Allocates a specified number (by `regions`) of [`Peer`] instances based on the given
/// [`TableMetadataAllocatorContext`] and number of regions. The returned peers will have
/// the same length as the number of regions.
///
/// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
/// no guarantee about how the returned peers will be used — e.g. whether they serve the
/// same table or not — so this method isn't idempotent.
async fn alloc(
&self,
ctx: &TableMetadataAllocatorContext,
Expand Down
68 changes: 52 additions & 16 deletions src/meta-srv/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::ClusterId;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_time::util as time_util;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;

use crate::cluster::MetaPeerClientBuilder;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::keys::{LeaseKey, LeaseValue};
use crate::lock::memory::MemLock;
use crate::metasrv::SelectorContext;
use crate::procedure::region_failover::RegionFailoverManager;
Expand All @@ -54,17 +57,9 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64)
}
}

pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());

let pushers = Pushers::default();
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence);

let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));

/// Builds and returns a [`SelectorContext`]. To access its inner state,
/// use `memory_backend` on [`MetaPeerClientRef`].
pub(crate) fn create_selector_context() -> SelectorContext {
let in_memory = Arc::new(MemoryKvBackend::new());
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
Expand All @@ -74,15 +69,30 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
// Safety: all required fields set at initialization
.unwrap();

let selector = Arc::new(LeaseBasedSelector);
let selector_ctx = SelectorContext {
SelectorContext {
datanode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_backend: kv_backend.clone(),
kv_backend: in_memory,
meta_peer_client,
table_id: None,
};
}
}

pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());

let pushers = Pushers::default();
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence);

let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));

let selector = Arc::new(LeaseBasedSelector);
let selector_ctx = create_selector_context();

let in_memory = Arc::new(MemoryKvBackend::new());
Arc::new(RegionFailoverManager::new(
10,
in_memory,
Expand Down Expand Up @@ -157,3 +167,29 @@ pub(crate) async fn prepare_table_region_and_info_value(
.await
.unwrap();
}

/// Marks the given datanodes as alive by writing a fresh lease entry for
/// each of them into the meta peer client's in-memory backend.
pub(crate) async fn put_datanodes(
    cluster_id: ClusterId,
    meta_peer_client: &MetaPeerClientRef,
    datanodes: Vec<Peer>,
) {
    let kv_backend = meta_peer_client.memory_backend();
    for peer in datanodes {
        // Build the lease key/value pair for this datanode; the timestamp is
        // "now" so the lease is considered alive.
        let key = LeaseKey {
            cluster_id,
            node_id: peer.id,
        };
        let value = LeaseValue {
            timestamp_millis: time_util::current_time_millis(),
            node_addr: peer.addr,
        };
        let req = common_meta::rpc::store::PutRequest {
            key: key.try_into().unwrap(),
            value: value.try_into().unwrap(),
            ..Default::default()
        };
        kv_backend.put(req).await.unwrap();
    }
}