Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasm: add wasmEdge container process to cgroup #87

Merged
merged 1 commit into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions quark/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ impl Sandboxer for QuarkSandboxer {
}

async fn sandbox(&self, id: &str) -> Result<Arc<Mutex<Self::Sandbox>>> {
return Ok(self
Ok(self
.sandboxes
.read()
.await
.get(id)
.ok_or_else(|| Error::NotFound(id.to_string()))?
.clone());
.clone())
}

async fn stop(&self, id: &str, _force: bool) -> Result<()> {
Expand Down Expand Up @@ -210,10 +210,9 @@ impl Sandbox for QuarkSandbox {
}

async fn container(&self, id: &str) -> Result<&Self::Container> {
return self
.containers
self.containers
.get(id)
.ok_or(Error::NotFound(format!("no container id {} found", id)));
.ok_or(Error::NotFound(format!("no container id {} found", id)))
}

async fn append_container(&mut self, id: &str, option: ContainerOption) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion vmm/sandbox/src/cloud_hypervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl VM for CloudHypervisorVM {
}

async fn wait_channel(&self) -> Option<Receiver<(u32, i128)>> {
return self.wait_chan.clone();
self.wait_chan.clone()
}

async fn vcpus(&self) -> Result<VcpuThreads> {
Expand Down
6 changes: 3 additions & 3 deletions vmm/sandbox/src/qemu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl VM for QemuVM {
}

async fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<(BusType, String)> {
return match device_info {
match device_info {
DeviceInfo::Block(blk_info) => {
let device = VirtioBlockDevice::new(
"",
Expand Down Expand Up @@ -259,7 +259,7 @@ impl VM for QemuVM {
// address is not import for char devices as guest will find the device by the name
Ok((BusType::PCI, char_info.name.clone()))
}
};
}
}

async fn hot_detach(&mut self, id: &str) -> Result<()> {
Expand Down Expand Up @@ -300,7 +300,7 @@ impl VM for QemuVM {
}

async fn wait_channel(&self) -> Option<Receiver<(u32, i128)>> {
return self.wait_chan.clone();
self.wait_chan.clone()
}

async fn vcpus(&self) -> Result<VcpuThreads> {
Expand Down
6 changes: 3 additions & 3 deletions vmm/sandbox/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,13 @@ where
}

async fn sandbox(&self, id: &str) -> Result<Arc<Mutex<Self::Sandbox>>> {
return Ok(self
Ok(self
.sandboxes
.read()
.await
.get(id)
.ok_or_else(|| Error::NotFound(id.to_string()))?
.clone());
.clone())
}

async fn stop(&self, id: &str, force: bool) -> Result<()> {
Expand Down Expand Up @@ -384,7 +384,7 @@ where
}

async fn exit_signal(&self) -> Result<Arc<ExitSignal>> {
return Ok(self.exit_signal.clone());
Ok(self.exit_signal.clone())
}

fn get_data(&self) -> Result<SandboxData> {
Expand Down
6 changes: 3 additions & 3 deletions vmm/sandbox/src/stratovirt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl VM for StratoVirtVM {
}

async fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<(BusType, String)> {
return match device_info {
match device_info {
DeviceInfo::Block(blk_info) => {
let device = VirtioBlockDevice::new(
"",
Expand All @@ -234,7 +234,7 @@ impl VM for StratoVirtVM {
DeviceInfo::Char(_char_info) => Err(Error::Unimplemented(
"hot attach for char device".to_string(),
)),
};
}
}

async fn hot_detach(&mut self, _id: &str) -> Result<()> {
Expand All @@ -252,7 +252,7 @@ impl VM for StratoVirtVM {
}

async fn wait_channel(&self) -> Option<Receiver<(u32, i128)>> {
return self.wait_chan.clone();
self.wait_chan.clone()
}

async fn vcpus(&self) -> Result<VcpuThreads> {
Expand Down
16 changes: 15 additions & 1 deletion wasm/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ oci-spec = "0.5.4"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
log = { version = "0.4.17", features = ["std"] }
time = "0.3.5"
cgroups-rs = "0.3.2"

wasmedge-sdk = { version = "0.7.1", optional = true }

Expand Down
8 changes: 4 additions & 4 deletions wasm/src/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ impl Sandboxer for WasmSandboxer {
}

async fn sandbox(&self, id: &str) -> Result<Arc<Mutex<Self::Sandbox>>> {
return Ok(self
Ok(self
.sandboxes
.read()
.await
.get(id)
.ok_or_else(|| Error::NotFound(id.to_string()))?
.clone());
.clone())
}

async fn stop(&self, id: &str, _force: bool) -> Result<()> {
Expand Down Expand Up @@ -212,9 +212,9 @@ impl Sandbox for WasmSandbox {
}

async fn container(&self, id: &str) -> Result<&Self::Container> {
return self.containers.get(id).ok_or(Error::NotFound(format!(
self.containers.get(id).ok_or(Error::NotFound(format!(
"failed to find container by id {id}"
)));
)))
}

async fn append_container(&mut self, id: &str, option: ContainerOption) -> Result<()> {
Expand Down
14 changes: 14 additions & 0 deletions wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,17 @@ pub fn get_memory_limit(spec: &Spec) -> Option<i64> {
.and_then(|x| x.memory().as_ref())
.and_then(|x| x.limit())
}

pub(crate) fn get_rootfs(spec: &Spec) -> Option<String> {
spec.root()
.as_ref()
.map(|root| root.path().display().to_string())
}

#[cfg(feature = "wasmedge")]
pub(crate) fn get_cgroup_path(spec: &Spec) -> Option<String> {
spec.linux()
.as_ref()
.and_then(|linux| linux.cgroups_path().as_ref())
.map(|path| path.display().to_string())
}
65 changes: 40 additions & 25 deletions wasm/src/wasmedge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{
sync::Arc,
};

use cgroups_rs::{Cgroup, CgroupPid};
use containerd_shim::{
api::{CreateTaskRequest, ExecProcessRequest, Status},
asynchronous::{
Expand Down Expand Up @@ -55,7 +56,7 @@ use wasmedge_sdk::{
params, PluginManager, Vm,
};

use crate::utils;
use crate::utils::{get_args, get_cgroup_path, get_envs, get_preopens, get_rootfs};

pub type ExecProcess = ProcessTemplate<WasmEdgeExecLifecycle>;
pub type InitProcess = ProcessTemplate<WasmEdgeInitLifecycle>;
Expand Down Expand Up @@ -115,16 +116,12 @@ impl ContainerFactory<WasmEdgeContainer> for WasmEdgeContainerFactory {
let mut spec: Spec = read_spec(req.bundle()).await?;
spec.canonicalize_rootfs(req.bundle())
.map_err(|e| Error::InvalidArgument(format!("could not canonicalize rootfs: {e}")))?;
let rootfs = spec
.root()
.as_ref()
.ok_or(Error::InvalidArgument(
"rootfs is not set in runtime spec".to_string(),
))?
.path();
mkdir(rootfs, 0o711).await?;
let rootfs = get_rootfs(&spec).ok_or_else(|| {
Error::InvalidArgument("rootfs is not set in runtime spec".to_string())
})?;
mkdir(&rootfs, 0o711).await?;
for m in req.rootfs() {
mount_rootfs(m, rootfs.as_path()).await?
mount_rootfs(m, &rootfs).await?
}
let stdio = Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal);
let exit_signal = Arc::new(Default::default());
Expand Down Expand Up @@ -160,17 +157,13 @@ impl ProcessLifecycle<InitProcess> for WasmEdgeInitLifecycle {
async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
let spec = &p.lifecycle.spec;
let vm = p.lifecycle.prototype_vm.clone();
let args = utils::get_args(spec);
let envs = utils::get_envs(spec);
let rootfs = spec
.root()
.as_ref()
.ok_or(Error::InvalidArgument(
"rootfs is not set in runtime spec".to_string(),
))?
.path();
let mut preopens = vec![format!("/:{}", rootfs.display())];
preopens.append(&mut utils::get_preopens(spec));
let args = get_args(spec);
let envs = get_envs(spec);
let rootfs = get_rootfs(spec).ok_or_else(|| {
Error::InvalidArgument("rootfs is not set in runtime spec".to_string())
})?;
let mut preopens = vec![format!("/:{}", rootfs)];
preopens.append(&mut get_preopens(spec));

debug!(
"start wasm with args: {:?}, envs: {:?}, preopens: {:?}",
Expand All @@ -188,6 +181,18 @@ impl ProcessLifecycle<InitProcess> for WasmEdgeInitLifecycle {
p.pid = init_pid;
}
ForkResult::Child => {
if let Some(cgroup_path) = get_cgroup_path(spec) {
// Add child process to Cgroup
Cgroup::new(
cgroups_rs::hierarchies::auto(),
cgroup_path.trim_start_matches('/'),
)
.and_then(|cgroup| cgroup.add_task(CgroupPid::from(std::process::id() as u64)))
.map_err(other_error!(
e,
format!("failed to add task to cgroup: {}", cgroup_path)
))?;
}
match run_wasi_func(vm, args, envs, preopens, p) {
Ok(_) => exit(0),
// TODO add a pipe? to return detailed error message
Expand Down Expand Up @@ -216,7 +221,19 @@ impl ProcessLifecycle<InitProcess> for WasmEdgeInitLifecycle {
Ok(())
}

async fn delete(&self, _p: &mut InitProcess) -> containerd_shim::Result<()> {
async fn delete(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
if let Some(cgroup_path) = get_cgroup_path(&p.lifecycle.spec) {
// Add child process to Cgroup
Cgroup::load(
cgroups_rs::hierarchies::auto(),
cgroup_path.trim_start_matches('/'),
)
.delete()
.map_err(other_error!(
e,
format!("failed to delete cgroup: {}", cgroup_path)
))?;
}
Ok(())
}

Expand Down Expand Up @@ -263,9 +280,7 @@ impl ProcessLifecycle<ExecProcess> for WasmEdgeExecLifecycle {
}

async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> {
Err(Error::Unimplemented(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exec in wasm container is still not supported, so should return Err here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"exec not supported for wasm containers".to_string(),
))
Ok(())
}

async fn update(
Expand Down
22 changes: 9 additions & 13 deletions wasm/src/wasmtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::fs::read;
use wasmtime::{Config, Engine, Extern, Linker, Module, Store, StoreLimits, StoreLimitsBuilder};
use wasmtime_wasi::{tokio::WasiCtxBuilder, WasiCtx};

use crate::utils;
use crate::utils::{get_args, get_kv_envs, get_memory_limit, get_rootfs};

pub type ExecProcess = ProcessTemplate<WasmtimeExecLifecycle>;
pub type InitProcess = ProcessTemplate<WasmtimeInitLifecycle>;
Expand Down Expand Up @@ -59,16 +59,12 @@ impl ContainerFactory<WasmtimeContainer> for WasmtimeContainerFactory {
let mut spec: Spec = read_spec(req.bundle()).await?;
spec.canonicalize_rootfs(req.bundle())
.map_err(|e| Error::InvalidArgument(format!("could not canonicalize rootfs: {e}")))?;
let rootfs = spec
.root()
.as_ref()
.ok_or(Error::InvalidArgument(
"rootfs is not set in runtime spec".to_string(),
))?
.path();
mkdir(rootfs, 0o711).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this mkdir removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, will update later

let rootfs = get_rootfs(&spec).ok_or_else(|| {
Error::InvalidArgument("rootfs is not set in runtime spec".to_string())
})?;
mkdir(&rootfs, 0o711).await?;
for m in req.rootfs() {
mount_rootfs(m, rootfs.as_path()).await?
mount_rootfs(m, &rootfs).await?
}
let stdio = Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal);
let exit_signal = Arc::new(Default::default());
Expand Down Expand Up @@ -115,8 +111,8 @@ impl WasmtimeInitLifecycle {
#[async_trait::async_trait]
impl ProcessLifecycle<InitProcess> for WasmtimeInitLifecycle {
async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
let args = utils::get_args(&self.spec);
let envs = utils::get_kv_envs(&self.spec);
let args = get_args(&self.spec);
let envs = get_kv_envs(&self.spec);
let root = self
.spec
.root()
Expand Down Expand Up @@ -165,7 +161,7 @@ impl ProcessLifecycle<InitProcess> for WasmtimeInitLifecycle {
let ctx = builder.build();

let mut limits_builder = StoreLimitsBuilder::new();
if let Some(memory_size) = utils::get_memory_limit(&self.spec).map(|x| x as usize) {
if let Some(memory_size) = get_memory_limit(&self.spec).map(|x| x as usize) {
limits_builder = limits_builder.memory_size(memory_size);
}
let limiter = limits_builder.build();
Expand Down
Loading