Skip to content

Commit

Permalink
feat(workers): separate AVM pools [fixes NET-753] (#2125)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu authored Mar 8, 2024
1 parent f3a4d62 commit fd2552f
Show file tree
Hide file tree
Showing 45 changed files with 959 additions and 616 deletions.
1 change: 0 additions & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ jobs:
uses: fluencelabs/aqua/.github/workflows/tests.yml@main
with:
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"
ref: renovate/fluencelabs-js-client-0.x

# registry:
# needs:
Expand Down
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ members = [
"crates/chain-data",
"crates/chain-types",
"crates/types",
"crates/core-manager"
]
"crates/core-manager",
]
exclude = [
"nox/tests/tetraplets",
]
Expand Down
40 changes: 36 additions & 4 deletions aquamarine/src/aqua_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
* limitations under the License.
*/

use std::str::FromStr;
use std::{error::Error, task::Waker};

use avm_server::avm_runner::{AVMRunner, RawAVMOutcome};
use avm_server::{AVMMemoryStats, AVMRuntimeLimits, CallResults, ParticleParameters, RunnerError};
use avm_server::{
AVMMemoryStats, AVMRuntimeLimits, CallRequests, CallResults, ParticleParameters, RunnerError,
};
use fluence_keypair::KeyPair;
use log::LevelFilter;
use libp2p::PeerId;
use tracing::Level;

use crate::config::VmConfig;
use crate::invoke::{parse_outcome, ExecutionError};
use crate::error::{ExecutionError, FieldError};
use crate::particle_effects::ParticleEffects;

pub trait AquaRuntime: Sized + Send + 'static {
Expand Down Expand Up @@ -98,10 +102,12 @@ impl AquaRuntime for AVMRunner {
particle_id,
"Executed particle, next_peer_pks is empty, no call requests. Nothing to do.",
);
if log::max_level() >= LevelFilter::Debug {

if tracing::enabled!(Level::DEBUG) {
let data = String::from_utf8_lossy(data.as_slice());
tracing::debug!(particle_id, "particle next_peer_pks = [], data: {}", data);
}

ParticleEffects::empty()
}
Err(ExecutionError::AquamarineError(err)) => {
Expand Down Expand Up @@ -144,3 +150,29 @@ impl AquaRuntime for AVMRunner {
self.memory_stats()
}
}

pub fn parse_outcome(
outcome: Result<RawAVMOutcome, RunnerError>,
) -> Result<(Vec<u8>, Vec<PeerId>, CallRequests), ExecutionError> {
let outcome = outcome.map_err(ExecutionError::AquamarineError)?;

let peer_ids = outcome
.next_peer_pks
.into_iter()
.map(|id| {
parse_peer_id(id.as_str()).map_err(|error| ExecutionError::InvalidResultField {
field: "next_peer_pks[..]",
error,
})
})
.collect::<Result<_, ExecutionError>>()?;

Ok((outcome.data, peer_ids, outcome.call_requests))
}

fn parse_peer_id(s: &str) -> Result<PeerId, FieldError> {
PeerId::from_str(s).map_err(|err| FieldError::InvalidPeerId {
peer_id: s.to_string(),
err: err.to_string(),
})
}
41 changes: 35 additions & 6 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::ExtendedParticle;
use particle_services::PeerScope;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};
use workers::{KeyStorage, PeerScopes, Workers};
use workers::{Event, KeyStorage, PeerScopes, Receiver, Workers};

use crate::aqua_runtime::AquaRuntime;
use crate::command::Command;
use crate::command::Command::{AddService, Ingest, RemoveService};
use crate::error::AquamarineApiError;
use crate::particle_effects::RemoteRoutingEffects;
use crate::vm_pool::VmPool;
use crate::{DataStoreConfig, ParticleDataStore, Plumber, VmPoolConfig};
use crate::{
AquaRuntime, DataStoreConfig, ParticleDataStore, Plumber, RemoteRoutingEffects, VmPoolConfig,
};

pub type EffectsChannel = mpsc::Sender<Result<RemoteRoutingEffects, AquamarineApiError>>;

pub struct AquamarineBackend<RT: AquaRuntime, F> {
inlet: mpsc::Receiver<Command>,
worker_events: Receiver<Event>,
plumber: Plumber<RT, F>,
out: EffectsChannel,
data_store: Arc<ParticleDataStore>,
Expand All @@ -62,6 +63,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
workers: Arc<Workers>,
key_storage: Arc<KeyStorage>,
scopes: PeerScopes,
worker_events: Receiver<Event>,
) -> eyre::Result<(Self, AquamarineApi)> {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
Expand All @@ -75,11 +77,12 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
let data_store: Arc<ParticleDataStore> = Arc::new(data_store);
let vm_pool = VmPool::new(
config.pool_size,
runtime_config,
runtime_config.clone(),
vm_pool_metrics,
health_registry,
);
let plumber = Plumber::new(
runtime_config,
vm_pool,
data_store.clone(),
builtins,
Expand All @@ -90,6 +93,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
);
let this = Self {
inlet,
worker_events,
plumber,
out,
data_store,
Expand All @@ -99,7 +103,7 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
}

pub fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut wake = false;
let mut wake = self.process_worker_events();

// check if there are new particles
loop {
Expand Down Expand Up @@ -143,6 +147,31 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
}
}

fn process_worker_events(&mut self) -> bool {
let mut wake = false;
loop {
let res = self.worker_events.try_recv();
match res {
Ok(event) => match event {
Event::WorkerCreated {
worker_id,
thread_count,
} => {
wake = true;
self.plumber.create_worker_pool(worker_id, thread_count);
}
Event::WorkerRemoved { worker_id } => {
self.plumber.remove_worker_pool(worker_id);
}
},
Err(_) => {
break;
}
}
}
wake
}

pub fn start(mut self) -> JoinHandle<()> {
let data_store = self.data_store.clone();
let mut stream = futures::stream::poll_fn(move |cx| self.poll(cx).map(|_| Some(()))).fuse();
Expand Down
30 changes: 15 additions & 15 deletions aquamarine/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ use libp2p::PeerId;
use std::path::PathBuf;
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct VmPoolConfig {
/// Number of VMs to create
pub pool_size: usize,
/// Timeout of a particle execution
pub execution_timeout: Duration,
}

#[derive(Debug, Clone)]
pub struct VmConfig {
pub current_peer_id: PeerId,
Expand All @@ -44,13 +36,12 @@ pub struct VmConfig {
pub hard_limit_enabled: bool,
}

impl VmPoolConfig {
pub fn new(pool_size: usize, execution_timeout: Duration) -> Self {
Self {
pool_size,
execution_timeout,
}
}
#[derive(Debug, Clone)]
pub struct VmPoolConfig {
/// Number of VMs to create
pub pool_size: usize,
/// Timeout of a particle execution
pub execution_timeout: Duration,
}

impl VmConfig {
Expand All @@ -75,6 +66,15 @@ impl VmConfig {
}
}

impl VmPoolConfig {
pub fn new(pool_size: usize, execution_timeout: Duration) -> Self {
Self {
pool_size,
execution_timeout,
}
}
}

#[derive(Debug, Clone)]
pub struct DataStoreConfig {
/// Dir for the interpreter to persist particle data
Expand Down
54 changes: 51 additions & 3 deletions aquamarine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* limitations under the License.
*/

use avm_server::RunnerError;
use humantime::FormattedDuration;
use std::error::Error;
use std::fmt::{Display, Formatter};
use thiserror::Error;

use particle_protocol::ParticleError;
Expand Down Expand Up @@ -42,9 +45,7 @@ pub enum AquamarineApiError {
particle_id: String,
timeout: FormattedDuration,
},
#[error(
"AquamarineApiError::AquamarineQueueFull: can't send particle {particle_id:?} to Aquamarine"
)]
#[error("AquamarineApiError::AquamarineQueueFull: can't send particle {particle_id:?} to Aquamarine")]
AquamarineQueueFull { particle_id: Option<String> },
#[error("AquamarineApiError::SignatureVerificationFailed: particle_id = {particle_id}, error = {err}")]
SignatureVerificationFailed {
Expand Down Expand Up @@ -75,3 +76,50 @@ impl AquamarineApiError {
}
}
}

impl std::error::Error for ExecutionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self {
ExecutionError::InvalidResultField { error, .. } => Some(error),
ExecutionError::AquamarineError(err) => Some(err),
}
}
}

impl Display for ExecutionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ExecutionError::InvalidResultField { field, error } => {
write!(f, "Execution error: invalid result field {field}: {error}")
}
ExecutionError::AquamarineError(err) => {
write!(f, "Execution error: aquamarine error: {err}")
}
}
}
}

#[derive(Debug)]
pub enum FieldError {
InvalidPeerId { peer_id: String, err: String },
}

impl std::error::Error for FieldError {}
impl Display for FieldError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
FieldError::InvalidPeerId { peer_id, err } => {
write!(f, "invalid PeerId '{peer_id}': {err}")
}
}
}
}

#[derive(Debug)]
pub enum ExecutionError {
InvalidResultField {
field: &'static str,
error: FieldError,
},
AquamarineError(RunnerError),
}
1 change: 1 addition & 0 deletions aquamarine/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl HealthCheck for VMPoolHealth {
#[cfg(test)]
mod tests {
use super::*;
use crate::health::VMPoolHealth;
use std::thread;

#[test]
Expand Down
Loading

0 comments on commit fd2552f

Please sign in to comment.