From b6c18c49d06d5b26d6affa18e25fa90a749b5ef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luiz=20Felipe=20Gon=C3=A7alves?= Date: Fri, 21 Jun 2024 11:44:26 -0300 Subject: [PATCH] feat(deployer): add round robin node allocator --- ctl/src/deployer/alloc.rs | 24 ++++++++++++++++++++++-- ctl/src/deployer/mod.rs | 2 +- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/ctl/src/deployer/alloc.rs b/ctl/src/deployer/alloc.rs index 3a1c085..1aebf78 100644 --- a/ctl/src/deployer/alloc.rs +++ b/ctl/src/deployer/alloc.rs @@ -1,6 +1,9 @@ //! Worker allocation algorithms. -use std::net::IpAddr; +use std::{ + net::IpAddr, + sync::atomic::{AtomicUsize, Ordering}, +}; use proto::common::instance::InstanceId; use rand::seq::SliceRandom; @@ -10,6 +13,7 @@ use crate::worker_mgr::WorkerDetails; /// Randomly allocates, using an uniform distribution, instances for the give /// amount of instances and the provided pool of `workers`. +#[allow(dead_code)] pub fn rand_many( workers: &[WorkerDetails], instances: u32, @@ -22,6 +26,22 @@ pub fn rand_many( } /// Randomly allocates a single instance from the provided pool of `workers`. -pub fn _rand_single(workers: &[WorkerDetails]) -> (InstanceId, IpAddr) { +#[allow(dead_code)] +pub fn rand_single(workers: &[WorkerDetails]) -> (InstanceId, IpAddr) { rand_many(workers, 1).next().unwrap() } + +pub static COUNTER: AtomicUsize = AtomicUsize::new(0); + +#[allow(dead_code)] +pub fn rr_alloc_many( + workers: &[WorkerDetails], + instances: u32, +) -> impl Iterator + '_ { + (0..instances) + .map(move |_| { + let i = COUNTER.fetch_add(1, Ordering::Relaxed); + &workers[i % workers.len()] + }) + .map(|w| (InstanceId(Uuid::now_v7()), w.addr)) +} diff --git a/ctl/src/deployer/mod.rs b/ctl/src/deployer/mod.rs index 970b35a..0bc4be3 100644 --- a/ctl/src/deployer/mod.rs +++ b/ctl/src/deployer/mod.rs @@ -123,7 +123,7 @@ impl Deployer { if workers.is_empty() { bail!("no workers on cluster pool"); } - let instances = alloc::rand_many(&workers, spec.concurrency); + let instances = alloc::rr_alloc_many(&workers, spec.concurrency); let deployment_id = DeploymentId(Uuid::now_v7()); let service_id = Arc::new(spec.service_id.clone());