Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaned up bastion-executor #97

Merged
merged 2 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bastion-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ unstable = ["numanji", "allocator-suite", "jemallocator"]
[dependencies]
crossbeam-utils = "0.6"
crossbeam-epoch = "0.7"
fxhash = "0.2"
lazy_static = "1.4"
libc = "0.2"
num_cpus = "1.10"
rustc-hash = "1.0.1"
pin-utils = "0.1.0-alpha.4"
lightproc = { version = "= 0.3.3-alpha.1", "path" = "../lightproc" }

Expand Down
15 changes: 6 additions & 9 deletions bastion-executor/src/distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,24 @@
//!
//! Distributor provides a fair distribution of threads and pinning them to cores for fair execution.
//! It assigns threads in round-robin fashion to all cores.
use super::placement;
use super::placement::CoreId;
use super::run_queue::{Stealer, Worker};

use lightproc::prelude::*;

use crate::placement::{self, CoreId};
use crate::run_queue::{Stealer, Worker};
use crate::worker;
use lightproc::prelude::*;
use std::thread;

pub(crate) struct Distributor {
pub cores: Vec<CoreId>,
pub(crate) cores: Vec<CoreId>,
}

impl Distributor {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Distributor {
cores: placement::get_core_ids().expect("Core mapping couldn't be fetched"),
}
}

pub fn assign(self) -> Vec<Stealer<LightProc>> {
pub(crate) fn assign(self) -> Vec<Stealer<LightProc>> {
let mut stealers = Vec::<Stealer<LightProc>>::new();

for core in self.cores {
Expand Down
15 changes: 6 additions & 9 deletions bastion-executor/src/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@
//! Load balancer calculates sampled mean to provide average process execution amount
//! to all runtime.
//!

use super::placement;
use crate::load_balancer;
use crate::placement;
use crossbeam_utils::sync::ShardedLock;
use fxhash::FxHashMap;
use lazy_static::*;

use std::thread;

use super::load_balancer;
use crossbeam_utils::sync::ShardedLock;
use rustc_hash::FxHashMap;
use std::time::Duration;

///
/// Loadbalancer struct which is just a convenience wrapper over the statistics calculations.
/// Load-balancer struct which is just a convenience wrapper over the statistics calculations.
#[derive(Debug)]
pub struct LoadBalancer();
pub struct LoadBalancer;

impl LoadBalancer {
///
Expand Down
18 changes: 8 additions & 10 deletions bastion-executor/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
//! Pool management and tracking belongs here.
//! We spawn futures onto the pool with [spawn] method of global run queue or
//! with corresponding [Worker]'s spawn method.

use super::distributor::Distributor;

use super::run_queue::{Injector, Stealer};
use super::sleepers::Sleepers;
use super::worker;
use lazy_static::*;
use crate::distributor::Distributor;
use crate::run_queue::{Injector, Stealer};
use crate::sleepers::Sleepers;
use crate::worker;
use lazy_static::lazy_static;
use lightproc::prelude::*;
use std::future::Future;

Expand Down Expand Up @@ -53,13 +51,13 @@ where
pub struct Pool {
///
/// Global run queue implementation
pub injector: Injector<LightProc>,
pub(crate) injector: Injector<LightProc>,
///
/// Stealers of the workers
pub stealers: Vec<Stealer<LightProc>>,
pub(crate) stealers: Vec<Stealer<LightProc>>,
///
/// Container of parked threads
pub sleepers: Sleepers,
pub(crate) sleepers: Sleepers,
}

impl Pool {
Expand Down
5 changes: 2 additions & 3 deletions bastion-executor/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
//! Blocking run of the async processes
//!
//!
use super::worker;
use crate::worker;
use crossbeam_utils::sync::Parker;
use lightproc::proc_stack::ProcStack;
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::cell::{Cell, UnsafeCell};
use std::future::Future;
use std::mem::ManuallyDrop;
use std::pin::Pin;
Expand Down
13 changes: 3 additions & 10 deletions bastion-executor/src/run_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,15 @@
//! [`steal()`]: struct.Stealer.html#method.steal
//! [`steal_batch()`]: struct.Stealer.html#method.steal_batch
//! [`steal_batch_and_pop()`]: struct.Stealer.html#method.steal_batch_and_pop

extern crate crossbeam_epoch as epoch;
extern crate crossbeam_utils as utils;

use crossbeam_epoch::{self as epoch, Atomic, Owned};
use crossbeam_utils::{Backoff, CachePadded};
use std::cell::{Cell, UnsafeCell};
use std::cmp;
use std::fmt;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop};
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;

use epoch::{Atomic, Owned};
use utils::{Backoff, CachePadded};
use std::{cmp, fmt, ptr};

// Minimum buffer capacity.
const MIN_CAP: usize = 64;
Expand Down
13 changes: 4 additions & 9 deletions bastion-executor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,12 @@
//!
//! This worker implementation relies on worker run queue statistics which are hold in the pinned global memory
//! where workload distribution calculated and amended to their own local queues.

use std::cell::{Cell, UnsafeCell};
use std::ptr;

use super::pool;
use super::run_queue::Worker;
use crate::load_balancer;
use crate::pool::Pool;
use crate::run_queue::Steal;
use core::iter;
use crate::pool::{self, Pool};
use crate::run_queue::{Steal, Worker};
use lightproc::prelude::*;
use std::cell::{Cell, UnsafeCell};
use std::{iter, ptr};

///
/// Get the current process's stack
Expand Down
1 change: 0 additions & 1 deletion lightproc/src/proc_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//!
//! If we want to make an analogy, stack abstraction is similar to actor lifecycle abstractions
//! in frameworks like Akka, but tailored version for Rust environment.

use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down