From 48a54d86f1acefee4a843753ed4255c17e4fe522 Mon Sep 17 00:00:00 2001 From: Poorunga <2744323@qq.com> Date: Fri, 8 Sep 2023 18:02:24 +0800 Subject: [PATCH] wasm: add wasmEdge container process to cgroup Signed-off-by: Poorunga <2744323@qq.com> --- quark/src/sandbox.rs | 9 ++-- vmm/sandbox/src/cloud_hypervisor/mod.rs | 2 +- vmm/sandbox/src/qemu/mod.rs | 6 +-- vmm/sandbox/src/sandbox.rs | 6 +-- vmm/sandbox/src/stratovirt/mod.rs | 6 +-- wasm/Cargo.lock | 16 +++++- wasm/Cargo.toml | 1 + wasm/src/sandbox.rs | 8 +-- wasm/src/utils.rs | 14 ++++++ wasm/src/wasmedge.rs | 65 +++++++++++++++---------- wasm/src/wasmtime.rs | 22 ++++----- 11 files changed, 97 insertions(+), 58 deletions(-) diff --git a/quark/src/sandbox.rs b/quark/src/sandbox.rs index d1ae7c8d..cf5b8c24 100644 --- a/quark/src/sandbox.rs +++ b/quark/src/sandbox.rs @@ -148,13 +148,13 @@ impl Sandboxer for QuarkSandboxer { } async fn sandbox(&self, id: &str) -> Result>> { - return Ok(self + Ok(self .sandboxes .read() .await .get(id) .ok_or_else(|| Error::NotFound(id.to_string()))? - .clone()); + .clone()) } async fn stop(&self, id: &str, _force: bool) -> Result<()> { @@ -210,10 +210,9 @@ impl Sandbox for QuarkSandbox { } async fn container(&self, id: &str) -> Result<&Self::Container> { - return self - .containers + self.containers .get(id) - .ok_or(Error::NotFound(format!("no container id {} found", id))); + .ok_or(Error::NotFound(format!("no container id {} found", id))) } async fn append_container(&mut self, id: &str, option: ContainerOption) -> Result<()> { diff --git a/vmm/sandbox/src/cloud_hypervisor/mod.rs b/vmm/sandbox/src/cloud_hypervisor/mod.rs index 7223b764..bd13ede7 100644 --- a/vmm/sandbox/src/cloud_hypervisor/mod.rs +++ b/vmm/sandbox/src/cloud_hypervisor/mod.rs @@ -253,7 +253,7 @@ impl VM for CloudHypervisorVM { } async fn wait_channel(&self) -> Option> { - return self.wait_chan.clone(); + self.wait_chan.clone() } async fn vcpus(&self) -> Result { diff --git a/vmm/sandbox/src/qemu/mod.rs b/vmm/sandbox/src/qemu/mod.rs index c42eb77e..d4ac72af 100644 --- a/vmm/sandbox/src/qemu/mod.rs +++ b/vmm/sandbox/src/qemu/mod.rs @@ -212,7 +212,7 @@ impl VM for QemuVM { } async fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<(BusType, String)> { - return match device_info { + match device_info { DeviceInfo::Block(blk_info) => { let device = VirtioBlockDevice::new( "", @@ -259,7 +259,7 @@ impl VM for QemuVM { // address is not import for char devices as guest will find the device by the name Ok((BusType::PCI, char_info.name.clone())) } - }; + } } async fn hot_detach(&mut self, id: &str) -> Result<()> { @@ -300,7 +300,7 @@ impl VM for QemuVM { } async fn wait_channel(&self) -> Option> { - return self.wait_chan.clone(); + self.wait_chan.clone() } async fn vcpus(&self) -> Result { diff --git a/vmm/sandbox/src/sandbox.rs b/vmm/sandbox/src/sandbox.rs index 4ee69de8..b4209080 100644 --- a/vmm/sandbox/src/sandbox.rs +++ b/vmm/sandbox/src/sandbox.rs @@ -284,13 +284,13 @@ where } async fn sandbox(&self, id: &str) -> Result>> { - return Ok(self + Ok(self .sandboxes .read() .await .get(id) .ok_or_else(|| Error::NotFound(id.to_string()))? - .clone()); + .clone()) } async fn stop(&self, id: &str, force: bool) -> Result<()> { @@ -384,7 +384,7 @@ where } async fn exit_signal(&self) -> Result> { - return Ok(self.exit_signal.clone()); + Ok(self.exit_signal.clone()) } fn get_data(&self) -> Result { diff --git a/vmm/sandbox/src/stratovirt/mod.rs b/vmm/sandbox/src/stratovirt/mod.rs index b91fd182..30bb2f29 100644 --- a/vmm/sandbox/src/stratovirt/mod.rs +++ b/vmm/sandbox/src/stratovirt/mod.rs @@ -209,7 +209,7 @@ impl VM for StratoVirtVM { } async fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<(BusType, String)> { - return match device_info { + match device_info { DeviceInfo::Block(blk_info) => { let device = VirtioBlockDevice::new( "", @@ -234,7 +234,7 @@ impl VM for StratoVirtVM { DeviceInfo::Char(_char_info) => Err(Error::Unimplemented( "hot attach for char device".to_string(), )), - }; + } } async fn hot_detach(&mut self, _id: &str) -> Result<()> { @@ -252,7 +252,7 @@ impl VM for StratoVirtVM { } async fn wait_channel(&self) -> Option> { - return self.wait_chan.clone(); + self.wait_chan.clone() } async fn vcpus(&self) -> Result { diff --git a/wasm/Cargo.lock b/wasm/Cargo.lock index f2c98ecb..117f7330 100644 --- a/wasm/Cargo.lock +++ b/wasm/Cargo.lock @@ -333,6 +333,19 @@ dependencies = [ "regex", ] +[[package]] +name = "cgroups-rs" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb3af90c8d48ad5f432d8afb521b5b40c2a2fce46dd60e05912de51c47fba64" +dependencies = [ + "libc", + "log", + "nix 0.25.1", + "regex", + "thiserror", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -397,7 +410,7 @@ version = "0.3.0" source = "git+https://github.com/kuasar-io/rust-extensions.git#6ae99540b754cd28c5389d5d6fdeff6ec7290ec5" dependencies = [ "async-trait", - "cgroups-rs", + "cgroups-rs 0.2.11", "command-fds", "containerd-shim-protos", "futures", @@ -2924,6 +2937,7 @@ dependencies = [ "anyhow", "async-trait", "cap-std", + "cgroups-rs 0.3.3", "containerd-sandbox", "containerd-shim", "env_logger 0.9.3", diff --git a/wasm/Cargo.toml b/wasm/Cargo.toml index a2ca7873..9ed6c230 100644 --- a/wasm/Cargo.toml +++ b/wasm/Cargo.toml @@ -22,6 +22,7 @@ oci-spec = "0.5.4" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } log = { version = "0.4.17", features = ["std"] } time = "0.3.5" +cgroups-rs = "0.3.2" wasmedge-sdk = { version = "0.7.1", optional = true } diff --git a/wasm/src/sandbox.rs b/wasm/src/sandbox.rs index 923d42cd..ddc13742 100644 --- a/wasm/src/sandbox.rs +++ b/wasm/src/sandbox.rs @@ -88,13 +88,13 @@ impl Sandboxer for WasmSandboxer { } async fn sandbox(&self, id: &str) -> Result>> { - return Ok(self + Ok(self .sandboxes .read() .await .get(id) .ok_or_else(|| Error::NotFound(id.to_string()))? - .clone()); + .clone()) } async fn stop(&self, id: &str, _force: bool) -> Result<()> { @@ -212,9 +212,9 @@ impl Sandbox for WasmSandbox { } async fn container(&self, id: &str) -> Result<&Self::Container> { - return self.containers.get(id).ok_or(Error::NotFound(format!( + self.containers.get(id).ok_or(Error::NotFound(format!( "failed to find container by id {id}" - ))); + ))) } async fn append_container(&mut self, id: &str, option: ContainerOption) -> Result<()> { diff --git a/wasm/src/utils.rs b/wasm/src/utils.rs index d4eff693..5686f787 100644 --- a/wasm/src/utils.rs +++ b/wasm/src/utils.rs @@ -75,3 +75,17 @@ pub fn get_memory_limit(spec: &Spec) -> Option { .and_then(|x| x.memory().as_ref()) .and_then(|x| x.limit()) } + +pub(crate) fn get_rootfs(spec: &Spec) -> Option { + spec.root() + .as_ref() + .map(|root| root.path().display().to_string()) +} + +#[cfg(feature = "wasmedge")] +pub(crate) fn get_cgroup_path(spec: &Spec) -> Option { + spec.linux() + .as_ref() + .and_then(|linux| linux.cgroups_path().as_ref()) + .map(|path| path.display().to_string()) +} diff --git a/wasm/src/wasmedge.rs b/wasm/src/wasmedge.rs index a5c97d14..ac14ad49 100644 --- a/wasm/src/wasmedge.rs +++ b/wasm/src/wasmedge.rs @@ -23,6 +23,7 @@ use std::{ sync::Arc, }; +use cgroups_rs::{Cgroup, CgroupPid}; use containerd_shim::{ api::{CreateTaskRequest, ExecProcessRequest, Status}, asynchronous::{ @@ -55,7 +56,7 @@ use wasmedge_sdk::{ params, PluginManager, Vm, }; -use crate::utils; +use crate::utils::{get_args, get_cgroup_path, get_envs, get_preopens, get_rootfs}; pub type ExecProcess = ProcessTemplate; pub type InitProcess = ProcessTemplate; @@ -115,16 +116,12 @@ impl ContainerFactory for WasmEdgeContainerFactory { let mut spec: Spec = read_spec(req.bundle()).await?; spec.canonicalize_rootfs(req.bundle()) .map_err(|e| Error::InvalidArgument(format!("could not canonicalize rootfs: {e}")))?; - let rootfs = spec - .root() - .as_ref() - .ok_or(Error::InvalidArgument( - "rootfs is not set in runtime spec".to_string(), - ))? - .path(); - mkdir(rootfs, 0o711).await?; + let rootfs = get_rootfs(&spec).ok_or_else(|| { + Error::InvalidArgument("rootfs is not set in runtime spec".to_string()) + })?; + mkdir(&rootfs, 0o711).await?; for m in req.rootfs() { - mount_rootfs(m, rootfs.as_path()).await? + mount_rootfs(m, &rootfs).await? } let stdio = Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal); let exit_signal = Arc::new(Default::default()); @@ -160,17 +157,13 @@ impl ProcessLifecycle for WasmEdgeInitLifecycle { async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> { let spec = &p.lifecycle.spec; let vm = p.lifecycle.prototype_vm.clone(); - let args = utils::get_args(spec); - let envs = utils::get_envs(spec); - let rootfs = spec - .root() - .as_ref() - .ok_or(Error::InvalidArgument( - "rootfs is not set in runtime spec".to_string(), - ))? - .path(); - let mut preopens = vec![format!("/:{}", rootfs.display())]; - preopens.append(&mut utils::get_preopens(spec)); + let args = get_args(spec); + let envs = get_envs(spec); + let rootfs = get_rootfs(spec).ok_or_else(|| { + Error::InvalidArgument("rootfs is not set in runtime spec".to_string()) + })?; + let mut preopens = vec![format!("/:{}", rootfs)]; + preopens.append(&mut get_preopens(spec)); debug!( "start wasm with args: {:?}, envs: {:?}, preopens: {:?}", @@ -188,6 +181,18 @@ impl ProcessLifecycle for WasmEdgeInitLifecycle { p.pid = init_pid; } ForkResult::Child => { + if let Some(cgroup_path) = get_cgroup_path(spec) { + // Add child process to Cgroup + Cgroup::new( + cgroups_rs::hierarchies::auto(), + cgroup_path.trim_start_matches('/'), + ) + .and_then(|cgroup| cgroup.add_task(CgroupPid::from(std::process::id() as u64))) + .map_err(other_error!( + e, + format!("failed to add task to cgroup: {}", cgroup_path) + ))?; + } match run_wasi_func(vm, args, envs, preopens, p) { Ok(_) => exit(0), // TODO add a pipe? to return detailed error message @@ -216,7 +221,19 @@ impl ProcessLifecycle for WasmEdgeInitLifecycle { Ok(()) } - async fn delete(&self, _p: &mut InitProcess) -> containerd_shim::Result<()> { + async fn delete(&self, p: &mut InitProcess) -> containerd_shim::Result<()> { + if let Some(cgroup_path) = get_cgroup_path(&p.lifecycle.spec) { + // Add child process to Cgroup + Cgroup::load( + cgroups_rs::hierarchies::auto(), + cgroup_path.trim_start_matches('/'), + ) + .delete() + .map_err(other_error!( + e, + format!("failed to delete cgroup: {}", cgroup_path) + ))?; + } Ok(()) } @@ -263,9 +280,7 @@ impl ProcessLifecycle for WasmEdgeExecLifecycle { } async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> { - Err(Error::Unimplemented( - "exec not supported for wasm containers".to_string(), - )) + Ok(()) } async fn update( diff --git a/wasm/src/wasmtime.rs b/wasm/src/wasmtime.rs index 1b95ffc1..3713da34 100644 --- a/wasm/src/wasmtime.rs +++ b/wasm/src/wasmtime.rs @@ -20,7 +20,7 @@ use tokio::fs::read; use wasmtime::{Config, Engine, Extern, Linker, Module, Store, StoreLimits, StoreLimitsBuilder}; use wasmtime_wasi::{tokio::WasiCtxBuilder, WasiCtx}; -use crate::utils; +use crate::utils::{get_args, get_kv_envs, get_memory_limit, get_rootfs}; pub type ExecProcess = ProcessTemplate; pub type InitProcess = ProcessTemplate; @@ -59,16 +59,12 @@ impl ContainerFactory for WasmtimeContainerFactory { let mut spec: Spec = read_spec(req.bundle()).await?; spec.canonicalize_rootfs(req.bundle()) .map_err(|e| Error::InvalidArgument(format!("could not canonicalize rootfs: {e}")))?; - let rootfs = spec - .root() - .as_ref() - .ok_or(Error::InvalidArgument( - "rootfs is not set in runtime spec".to_string(), - ))? - .path(); - mkdir(rootfs, 0o711).await?; + let rootfs = get_rootfs(&spec).ok_or_else(|| { + Error::InvalidArgument("rootfs is not set in runtime spec".to_string()) + })?; + mkdir(&rootfs, 0o711).await?; for m in req.rootfs() { - mount_rootfs(m, rootfs.as_path()).await? + mount_rootfs(m, &rootfs).await? } let stdio = Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal); let exit_signal = Arc::new(Default::default()); @@ -115,8 +111,8 @@ impl WasmtimeInitLifecycle { #[async_trait::async_trait] impl ProcessLifecycle for WasmtimeInitLifecycle { async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> { - let args = utils::get_args(&self.spec); - let envs = utils::get_kv_envs(&self.spec); + let args = get_args(&self.spec); + let envs = get_kv_envs(&self.spec); let root = self .spec .root() @@ -165,7 +161,7 @@ impl ProcessLifecycle for WasmtimeInitLifecycle { let ctx = builder.build(); let mut limits_builder = StoreLimitsBuilder::new(); - if let Some(memory_size) = utils::get_memory_limit(&self.spec).map(|x| x as usize) { + if let Some(memory_size) = get_memory_limit(&self.spec).map(|x| x as usize) { limits_builder = limits_builder.memory_size(memory_size); } let limiter = limits_builder.build();