Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] async cgroups #65

Closed
wants to merge 15 commits into from
396 changes: 333 additions & 63 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ chrono = "0.4"
once_cell = "1.6.0"
futures = { version = "0.3", features = ["thread-pool"] }
regex = "1.5"
smol = "1.2.5"
async-trait = "0.1.50"
oci_spec = { version = "0.1.0", path = "./oci_spec" }
168 changes: 93 additions & 75 deletions src/cgroups/blkio.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{
fs::{self, OpenOptions},
io::Write,
path::Path,
};
use std::path::Path;


use async_trait::async_trait;
use smol::{fs::{OpenOptions, create_dir_all}, io::AsyncWriteExt};

use crate::{
cgroups::Controller,
Expand All @@ -16,71 +16,76 @@ const CGROUP_BLKIO_THROTTLE_WRITE_IOPS: &str = "blkio.throttle.write_iops_device

pub struct Blkio {}

#[async_trait]
impl Controller for Blkio {
fn apply(
async fn apply(
linux_resources: &LinuxResources,
cgroup_root: &Path,
pid: nix::unistd::Pid,
) -> anyhow::Result<()> {
match &linux_resources.block_io {
None => return Ok(()),
Some(block_io) => {
fs::create_dir_all(cgroup_root)?;
Self::apply(cgroup_root, block_io)?;
create_dir_all(cgroup_root).await?;
Self::apply(cgroup_root, block_io).await?;
}
}

OpenOptions::new()
let mut file = OpenOptions::new()
.create(false)
.write(true)
.truncate(false)
.open(cgroup_root.join("cgroup.procs"))?
.write_all(pid.to_string().as_bytes())?;
.open(cgroup_root.join("cgroup.procs")).await?;

file.write_all(pid.to_string().as_bytes()).await?;
file.sync_data().await?;

Ok(())
}
}

impl Blkio {
fn apply(root_path: &Path, blkio: &LinuxBlockIo) -> anyhow::Result<()> {
async fn apply(root_path: &Path, blkio: &LinuxBlockIo) -> anyhow::Result<()> {
for trbd in &blkio.blkio_throttle_read_bps_device {
Self::write_file(
&root_path.join(CGROUP_BLKIO_THROTTLE_READ_BPS),
&format!("{}:{} {}", trbd.major, trbd.minor, trbd.rate),
)?;
).await?;
}

for twbd in &blkio.blkio_throttle_write_bps_device {
Self::write_file(
&root_path.join(CGROUP_BLKIO_THROTTLE_WRITE_BPS),
&format!("{}:{} {}", twbd.major, twbd.minor, twbd.rate),
)?;
).await?;
}

for trid in &blkio.blkio_throttle_read_iops_device {
Self::write_file(
&root_path.join(CGROUP_BLKIO_THROTTLE_READ_IOPS),
&format!("{}:{} {}", trid.major, trid.minor, trid.rate),
)?;
).await?;
}

for twid in &blkio.blkio_throttle_write_iops_device {
Self::write_file(
&root_path.join(CGROUP_BLKIO_THROTTLE_WRITE_IOPS),
&format!("{}:{} {}", twid.major, twid.minor, twid.rate),
)?;
).await?;
}

Ok(())
}

fn write_file(file_path: &Path, data: &str) -> anyhow::Result<()> {
fs::OpenOptions::new()
async fn write_file(file_path: &Path, data: &str) -> anyhow::Result<()> {
let mut file = OpenOptions::new()
.create(false)
.write(true)
.truncate(false)
.open(file_path)?
.write_all(data.as_bytes())?;
.open(file_path).await?;

file.write_all(data.as_bytes()).await?;
file.sync_data().await?;

Ok(())
}
Expand All @@ -92,6 +97,7 @@ mod tests {

use super::*;
use oci_spec::{LinuxBlockIo, LinuxThrottleDevice};
use std::io::Write;

struct BlockIoBuilder {
block_io: LinuxBlockIo,
Expand Down Expand Up @@ -152,12 +158,14 @@ mod tests {
) -> anyhow::Result<PathBuf> {
let full_path = temp_dir.join(filename);

std::fs::OpenOptions::new()
let mut file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&full_path)?
.write_all(val.as_bytes())?;
.open(&full_path)?;

file.write_all(val.as_bytes())?;
file.sync_data()?;

Ok(full_path)
}
Expand All @@ -172,59 +180,65 @@ mod tests {
let (test_root, throttle) =
setup("test_set_blkio_read_bps", CGROUP_BLKIO_THROTTLE_READ_BPS);

let blkio = BlockIoBuilder::new()
.with_read_bps(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).expect("apply blkio");
let content = fs::read_to_string(throttle)
.unwrap_or_else(|_| panic!("read {} content", CGROUP_BLKIO_THROTTLE_READ_BPS));

assert_eq!("8:0 102400", content);
smol::block_on(async {
let blkio = BlockIoBuilder::new()
.with_read_bps(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).await.expect("apply blkio");
let content = std::fs::read_to_string(throttle)
.expect(&format!("read {} content", CGROUP_BLKIO_THROTTLE_READ_BPS));

assert_eq!("8:0 102400", content);
});
}

#[test]
fn test_set_blkio_write_bps() {
let (test_root, throttle) =
setup("test_set_blkio_write_bps", CGROUP_BLKIO_THROTTLE_WRITE_BPS);

let blkio = BlockIoBuilder::new()
.with_write_bps(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).expect("apply blkio");
let content = fs::read_to_string(throttle)
.unwrap_or_else(|_| panic!("read {} content", CGROUP_BLKIO_THROTTLE_WRITE_BPS));

assert_eq!("8:0 102400", content);
smol::block_on(async {
let blkio = BlockIoBuilder::new()
.with_write_bps(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).await.expect("apply blkio");
let content = std::fs::read_to_string(throttle)
.expect(&format!("read {} content", CGROUP_BLKIO_THROTTLE_WRITE_BPS));

assert_eq!("8:0 102400", content);
});
}

#[test]
fn test_set_blkio_read_iops() {
let (test_root, throttle) =
setup("test_set_blkio_read_iops", CGROUP_BLKIO_THROTTLE_READ_IOPS);

let blkio = BlockIoBuilder::new()
.with_read_iops(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).expect("apply blkio");
let content = fs::read_to_string(throttle)
.unwrap_or_else(|_| panic!("read {} content", CGROUP_BLKIO_THROTTLE_READ_IOPS));

assert_eq!("8:0 102400", content);
smol::block_on(async {
let blkio = BlockIoBuilder::new()
.with_read_iops(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).await.expect("apply blkio");
let content = std::fs::read_to_string(throttle)
.expect(&format!("read {} content", CGROUP_BLKIO_THROTTLE_READ_IOPS));

assert_eq!("8:0 102400", content);
});
}

#[test]
Expand All @@ -234,18 +248,22 @@ mod tests {
CGROUP_BLKIO_THROTTLE_WRITE_IOPS,
);

let blkio = BlockIoBuilder::new()
.with_write_iops(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).expect("apply blkio");
let content = fs::read_to_string(throttle)
.unwrap_or_else(|_| panic!("read {} content", CGROUP_BLKIO_THROTTLE_WRITE_IOPS));

assert_eq!("8:0 102400", content);
smol::block_on(async {
let blkio = BlockIoBuilder::new()
.with_write_iops(vec![LinuxThrottleDevice {
major: 8,
minor: 0,
rate: 102400,
}])
.build();

Blkio::apply(&test_root, &blkio).await.expect("apply blkio");
let content = std::fs::read_to_string(throttle).expect(&format!(
"read {} content",
CGROUP_BLKIO_THROTTLE_WRITE_IOPS
));

assert_eq!("8:0 102400", content);
});
}
}
4 changes: 3 additions & 1 deletion src/cgroups/controller.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::path::Path;

use anyhow::Result;
use async_trait::async_trait;
use nix::unistd::Pid;

use oci_spec::LinuxResources;

#[async_trait]
pub trait Controller {
fn apply(linux_resources: &LinuxResources, cgroup_root: &Path, pid: Pid) -> Result<()>;
async fn apply(linux_resources: &LinuxResources, cgroup_root: &Path, pid: Pid) -> Result<()>;
}
51 changes: 32 additions & 19 deletions src/cgroups/devices.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::io::Write;
use std::{
fs::{create_dir_all, OpenOptions},
path::Path,
};

use anyhow::Result;
use async_trait::async_trait;
use nix::unistd::Pid;
use smol::{fs::{OpenOptions, create_dir_all}, io::AsyncWriteExt};

use crate::{
cgroups::Controller,
Expand All @@ -15,13 +15,21 @@ use oci_spec::{LinuxDeviceCgroup, LinuxDeviceType, LinuxResources};

pub struct Devices {}

#[async_trait]
impl Controller for Devices {
fn apply(linux_resources: &LinuxResources, cgroup_root: &Path, pid: Pid) -> Result<()> {
async fn apply(linux_resources: &LinuxResources, cgroup_root: &Path, pid: Pid) -> Result<()> {
log::debug!("Apply Devices cgroup config");
create_dir_all(&cgroup_root)?;
create_dir_all(&cgroup_root).await?;

let mut allowed: Vec<String> = Vec::new();
let mut denied: Vec<String> = Vec::new();

for d in &linux_resources.devices {
Self::apply_device(d, cgroup_root)?;
if d.allow {
allowed.push(d.to_string())
} else {
denied.push(d.to_string())
}
}

for d in [
Expand All @@ -30,33 +38,38 @@ impl Controller for Devices {
]
.concat()
{
Self::apply_device(&d, &cgroup_root)?;
if d.allow {
allowed.push(d.to_string())
} else {
denied.push(d.to_string())
}
}

OpenOptions::new()
Self::write_file(&allowed.join("\n"), &cgroup_root.join("devices.allow")).await?;
Self::write_file(&denied.join("\n"), &cgroup_root.join("devices.deny")).await?;

let mut file = OpenOptions::new()
.create(false)
.write(true)
.truncate(false)
.open(cgroup_root.join("cgroup.procs"))?
.write_all(pid.to_string().as_bytes())?;
.open(cgroup_root.join("cgroup.procs")).await?;

file.write_all(pid.to_string().as_bytes()).await?;
file.sync_data().await?;
Ok(())
}
}

impl Devices {
fn apply_device(device: &LinuxDeviceCgroup, cgroup_root: &Path) -> Result<()> {
let path = if device.allow {
cgroup_root.join("devices.allow")
} else {
cgroup_root.join("devices.deny")
};

OpenOptions::new()
async fn write_file(data: &str, path: &Path) -> Result<()> {
let mut file = OpenOptions::new()
.create(false)
.write(true)
.truncate(false)
.open(path)?
.write_all(device.to_string().as_bytes())?;
.open(path).await?;

file.write_all(data.as_bytes()).await?;
file.sync_data().await?;
Ok(())
}

Expand Down
Loading