Skip to content

Commit

Permalink
wasm: add wasmEdge container process to cgroup
Browse files Browse the repository at this point in the history
Signed-off-by: Poorunga <2744323@qq.com>
  • Loading branch information
Poorunga committed Sep 9, 2023
1 parent 117a271 commit 48a54d8
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 58 deletions.
9 changes: 4 additions & 5 deletions quark/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ impl Sandboxer for QuarkSandboxer {
}

async fn sandbox(&self, id: &str) -> Result<Arc<Mutex<Self::Sandbox>>> {
return Ok(self
Ok(self
.sandboxes
.read()
.await
.get(id)
.ok_or_else(|| Error::NotFound(id.to_string()))?
.clone());
.clone())
}

async fn stop(&self, id: &str, _force: bool) -> Result<()> {
Expand Down Expand Up @@ -210,10 +210,9 @@ impl Sandbox for QuarkSandbox {
}

async fn container(&self, id: &str) -> Result<&Self::Container> {
return self
.containers
self.containers
.get(id)
.ok_or(Error::NotFound(format!("no container id {} found", id)));
.ok_or(Error::NotFound(format!("no container id {} found", id)))
}

async fn append_container(&mut self, id: &str, option: ContainerOption) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion vmm/sandbox/src/cloud_hypervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl VM for CloudHypervisorVM {
}

async fn wait_channel(&self) -> Option<Receiver<(u32, i128)>> {
return self.wait_chan.clone();
self.wait_chan.clone()
}

async fn vcpus(&self) -> Result<VcpuThreads> {
Expand Down
6 changes: 3 additions & 3 deletions vmm/sandbox/src/qemu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl VM for QemuVM {
}

async fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<(BusType, String)> {
return match device_info {
match device_info {
DeviceInfo::Block(blk_info) => {
let device = VirtioBlockDevice::new(
"",
Expand Down Expand Up @@ -259,7 +259,7 @@ impl VM for QemuVM {
// address is not import for char devices as guest will find the device by the name
Ok((BusType::PCI, char_info.name.clone()))
}
};
}
}

async fn hot_detach(&mut self, id: &str) -> Result<()> {
Expand Down Expand Up @@ -300,7 +300,7 @@ impl VM for QemuVM {
}

async fn wait_channel(&self) -> Option<Receiver<(u32, i128)>> {
return self.wait_chan.clone();
self.wait_chan.clone()
}

async fn vcpus(&self) -> Result<VcpuThreads> {
Expand Down
6 changes: 3 additions & 3 deletions vmm/sandbox/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,13 @@ where
}

async fn sandbox(&self, id: &str) -> Result<Arc<Mutex<Self::Sandbox>>> {
return Ok(self
Ok(self
.sandboxes
.read()
.await
.get(id)
.ok_or_else(|| Error::NotFound(id.to_string()))?
.clone());
.clone())
}

async fn stop(&self, id: &str, force: bool) -> Result<()> {
Expand Down Expand Up @@ -384,7 +384,7 @@ where
}

async fn exit_signal(&self) -> Result<Arc<ExitSignal>> {
return Ok(self.exit_signal.clone());
Ok(self.exit_signal.clone())
}

fn get_data(&self) -> Result<SandboxData> {
Expand Down
6 changes: 3 additions & 3 deletions vmm/sandbox/src/stratovirt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl VM for StratoVirtVM {
}

async fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<(BusType, String)> {
return match device_info {
match device_info {
DeviceInfo::Block(blk_info) => {
let device = VirtioBlockDevice::new(
"",
Expand All @@ -234,7 +234,7 @@ impl VM for StratoVirtVM {
DeviceInfo::Char(_char_info) => Err(Error::Unimplemented(
"hot attach for char device".to_string(),
)),
};
}
}

async fn hot_detach(&mut self, _id: &str) -> Result<()> {
Expand All @@ -252,7 +252,7 @@ impl VM for StratoVirtVM {
}

async fn wait_channel(&self) -> Option<Receiver<(u32, i128)>> {
return self.wait_chan.clone();
self.wait_chan.clone()
}

async fn vcpus(&self) -> Result<VcpuThreads> {
Expand Down
16 changes: 15 additions & 1 deletion wasm/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ oci-spec = "0.5.4"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
log = { version = "0.4.17", features = ["std"] }
time = "0.3.5"
cgroups-rs = "0.3.2"

wasmedge-sdk = { version = "0.7.1", optional = true }

Expand Down
8 changes: 4 additions & 4 deletions wasm/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ impl Sandboxer for WasmSandboxer {
}

async fn sandbox(&self, id: &str) -> Result<Arc<Mutex<Self::Sandbox>>> {
return Ok(self
Ok(self
.sandboxes
.read()
.await
.get(id)
.ok_or_else(|| Error::NotFound(id.to_string()))?
.clone());
.clone())
}

async fn stop(&self, id: &str, _force: bool) -> Result<()> {
Expand Down Expand Up @@ -212,9 +212,9 @@ impl Sandbox for WasmSandbox {
}

async fn container(&self, id: &str) -> Result<&Self::Container> {
return self.containers.get(id).ok_or(Error::NotFound(format!(
self.containers.get(id).ok_or(Error::NotFound(format!(
"failed to find container by id {id}"
)));
)))
}

async fn append_container(&mut self, id: &str, option: ContainerOption) -> Result<()> {
Expand Down
14 changes: 14 additions & 0 deletions wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,17 @@ pub fn get_memory_limit(spec: &Spec) -> Option<i64> {
.and_then(|x| x.memory().as_ref())
.and_then(|x| x.limit())
}

pub(crate) fn get_rootfs(spec: &Spec) -> Option<String> {
spec.root()
.as_ref()
.map(|root| root.path().display().to_string())
}

#[cfg(feature = "wasmedge")]
pub(crate) fn get_cgroup_path(spec: &Spec) -> Option<String> {
spec.linux()
.as_ref()
.and_then(|linux| linux.cgroups_path().as_ref())
.map(|path| path.display().to_string())
}
65 changes: 40 additions & 25 deletions wasm/src/wasmedge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
sync::Arc,
};

use cgroups_rs::{Cgroup, CgroupPid};
use containerd_shim::{
api::{CreateTaskRequest, ExecProcessRequest, Status},
asynchronous::{
Expand Down Expand Up @@ -55,7 +56,7 @@ use wasmedge_sdk::{
params, PluginManager, Vm,
};

use crate::utils;
use crate::utils::{get_args, get_cgroup_path, get_envs, get_preopens, get_rootfs};

pub type ExecProcess = ProcessTemplate<WasmEdgeExecLifecycle>;
pub type InitProcess = ProcessTemplate<WasmEdgeInitLifecycle>;
Expand Down Expand Up @@ -115,16 +116,12 @@ impl ContainerFactory<WasmEdgeContainer> for WasmEdgeContainerFactory {
let mut spec: Spec = read_spec(req.bundle()).await?;
spec.canonicalize_rootfs(req.bundle())
.map_err(|e| Error::InvalidArgument(format!("could not canonicalize rootfs: {e}")))?;
let rootfs = spec
.root()
.as_ref()
.ok_or(Error::InvalidArgument(
"rootfs is not set in runtime spec".to_string(),
))?
.path();
mkdir(rootfs, 0o711).await?;
let rootfs = get_rootfs(&spec).ok_or_else(|| {
Error::InvalidArgument("rootfs is not set in runtime spec".to_string())
})?;
mkdir(&rootfs, 0o711).await?;
for m in req.rootfs() {
mount_rootfs(m, rootfs.as_path()).await?
mount_rootfs(m, &rootfs).await?
}
let stdio = Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal);
let exit_signal = Arc::new(Default::default());
Expand Down Expand Up @@ -160,17 +157,13 @@ impl ProcessLifecycle<InitProcess> for WasmEdgeInitLifecycle {
async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
let spec = &p.lifecycle.spec;
let vm = p.lifecycle.prototype_vm.clone();
let args = utils::get_args(spec);
let envs = utils::get_envs(spec);
let rootfs = spec
.root()
.as_ref()
.ok_or(Error::InvalidArgument(
"rootfs is not set in runtime spec".to_string(),
))?
.path();
let mut preopens = vec![format!("/:{}", rootfs.display())];
preopens.append(&mut utils::get_preopens(spec));
let args = get_args(spec);
let envs = get_envs(spec);
let rootfs = get_rootfs(spec).ok_or_else(|| {
Error::InvalidArgument("rootfs is not set in runtime spec".to_string())
})?;
let mut preopens = vec![format!("/:{}", rootfs)];
preopens.append(&mut get_preopens(spec));

debug!(
"start wasm with args: {:?}, envs: {:?}, preopens: {:?}",
Expand All @@ -188,6 +181,18 @@ impl ProcessLifecycle<InitProcess> for WasmEdgeInitLifecycle {
p.pid = init_pid;
}
ForkResult::Child => {
if let Some(cgroup_path) = get_cgroup_path(spec) {
// Add child process to Cgroup
Cgroup::new(
cgroups_rs::hierarchies::auto(),
cgroup_path.trim_start_matches('/'),
)
.and_then(|cgroup| cgroup.add_task(CgroupPid::from(std::process::id() as u64)))
.map_err(other_error!(
e,
format!("failed to add task to cgroup: {}", cgroup_path)
))?;
}
match run_wasi_func(vm, args, envs, preopens, p) {
Ok(_) => exit(0),
// TODO add a pipe? to return detailed error message
Expand Down Expand Up @@ -216,7 +221,19 @@ impl ProcessLifecycle<InitProcess> for WasmEdgeInitLifecycle {
Ok(())
}

async fn delete(&self, _p: &mut InitProcess) -> containerd_shim::Result<()> {
async fn delete(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
if let Some(cgroup_path) = get_cgroup_path(&p.lifecycle.spec) {
// Add child process to Cgroup
Cgroup::load(
cgroups_rs::hierarchies::auto(),
cgroup_path.trim_start_matches('/'),
)
.delete()
.map_err(other_error!(
e,
format!("failed to delete cgroup: {}", cgroup_path)
))?;
}
Ok(())
}

Expand Down Expand Up @@ -263,9 +280,7 @@ impl ProcessLifecycle<ExecProcess> for WasmEdgeExecLifecycle {
}

async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> {
Err(Error::Unimplemented(
"exec not supported for wasm containers".to_string(),
))
Ok(())
}

async fn update(
Expand Down
22 changes: 9 additions & 13 deletions wasm/src/wasmtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::fs::read;
use wasmtime::{Config, Engine, Extern, Linker, Module, Store, StoreLimits, StoreLimitsBuilder};
use wasmtime_wasi::{tokio::WasiCtxBuilder, WasiCtx};

use crate::utils;
use crate::utils::{get_args, get_kv_envs, get_memory_limit, get_rootfs};

pub type ExecProcess = ProcessTemplate<WasmtimeExecLifecycle>;
pub type InitProcess = ProcessTemplate<WasmtimeInitLifecycle>;
Expand Down Expand Up @@ -59,16 +59,12 @@ impl ContainerFactory<WasmtimeContainer> for WasmtimeContainerFactory {
let mut spec: Spec = read_spec(req.bundle()).await?;
spec.canonicalize_rootfs(req.bundle())
.map_err(|e| Error::InvalidArgument(format!("could not canonicalize rootfs: {e}")))?;
let rootfs = spec
.root()
.as_ref()
.ok_or(Error::InvalidArgument(
"rootfs is not set in runtime spec".to_string(),
))?
.path();
mkdir(rootfs, 0o711).await?;
let rootfs = get_rootfs(&spec).ok_or_else(|| {
Error::InvalidArgument("rootfs is not set in runtime spec".to_string())
})?;
mkdir(&rootfs, 0o711).await?;
for m in req.rootfs() {
mount_rootfs(m, rootfs.as_path()).await?
mount_rootfs(m, &rootfs).await?
}
let stdio = Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal);
let exit_signal = Arc::new(Default::default());
Expand Down Expand Up @@ -115,8 +111,8 @@ impl WasmtimeInitLifecycle {
#[async_trait::async_trait]
impl ProcessLifecycle<InitProcess> for WasmtimeInitLifecycle {
async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
let args = utils::get_args(&self.spec);
let envs = utils::get_kv_envs(&self.spec);
let args = get_args(&self.spec);
let envs = get_kv_envs(&self.spec);
let root = self
.spec
.root()
Expand Down Expand Up @@ -165,7 +161,7 @@ impl ProcessLifecycle<InitProcess> for WasmtimeInitLifecycle {
let ctx = builder.build();

let mut limits_builder = StoreLimitsBuilder::new();
if let Some(memory_size) = utils::get_memory_limit(&self.spec).map(|x| x as usize) {
if let Some(memory_size) = get_memory_limit(&self.spec).map(|x| x as usize) {
limits_builder = limits_builder.memory_size(memory_size);
}
let limiter = limits_builder.build();
Expand Down

0 comments on commit 48a54d8

Please sign in to comment.