Skip to content

Commit

Permalink
Thread throttle configuration through project
Browse files Browse the repository at this point in the history
This commit builds on #573, #574, #575 and threads the configuration of the
throttle through all of our generators. This commit finishes up our work
introducing new throttles into the project.

Signed-off-by: Brian L. Troutwine <brian.troutwine@datadoghq.com>
  • Loading branch information
blt committed Apr 29, 2023
1 parent a1c2859 commit fee8507
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 46 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Add 'stable' throttle
- Add 'all-out' throttle
- Allow users to configure generators to use the various throttles

## [0.15.1-rc1]
### Added
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ mod tests {

use http::HeaderMap;

use crate::throttle;

use super::*;

#[test]
Expand Down Expand Up @@ -115,6 +117,7 @@ blackhole:
.unwrap(),
block_sizes: Option::default(),
parallel_connections: 5,
throttle: throttle::Config::default(),
})],
blackhole: Some(vec![
blackhole::Config::Tcp(blackhole::tcp::Config {
Expand Down
7 changes: 5 additions & 2 deletions src/generator/file_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
block::{self, chunk_bytes, construct_block_cache, Block},
payload,
signals::Shutdown,
throttle::Throttle,
throttle::{self, Throttle},
};

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -86,6 +86,9 @@ pub struct Config {
/// tailing software to remove old files.
#[serde(default = "default_rotation")]
rotate: bool,
/// The load throttle configuration
#[serde(default)]
pub throttle: throttle::Config,
}

#[derive(Debug)]
Expand Down Expand Up @@ -154,7 +157,7 @@ impl FileGen {
let mut handles = Vec::new();
let file_index = Arc::new(AtomicU32::new(0));
for _ in 0..config.duplicates {
let throttle = Throttle::new(bytes_per_second);
let throttle = Throttle::new_with_config(config.throttle, bytes_per_second);

let block_cache =
construct_block_cache(&mut rng, &config.variant, &block_chunks, &labels);
Expand Down
27 changes: 16 additions & 11 deletions src/generator/file_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use serde::Deserialize;
use tokio::{fs::create_dir, fs::rename, fs::File};
use tracing::info;

use crate::{signals::Shutdown, throttle::Throttle};
use crate::{
signals::Shutdown,
throttle::{self, Throttle},
};

static FILE_EXTENSION: &str = "txt";

Expand Down Expand Up @@ -68,7 +71,7 @@ fn default_rename_per_name() -> NonZeroU32 {
NonZeroU32::new(1).unwrap()
}

#[derive(Debug, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Deserialize, PartialEq, Clone)]
/// Configuration of [`FileTree`]
pub struct Config {
/// The seed for random operations against this target
Expand Down Expand Up @@ -96,6 +99,9 @@ pub struct Config {
#[serde(default = "default_rename_per_name")]
/// The number of rename per second
pub rename_per_second: NonZeroU32,
/// The load throttle configuration
#[serde(default)]
pub throttle: throttle::Config,
}

#[derive(Debug)]
Expand All @@ -105,8 +111,8 @@ pub struct Config {
/// this without coordination to the target.
pub struct FileTree {
name_len: NonZeroUsize,
open_per_second: NonZeroU32,
rename_per_second: NonZeroU32,
open_throttle: Throttle,
rename_throttle: Throttle,
total_folder: usize,
nodes: VecDeque<PathBuf>,
rng: StdRng,
Expand All @@ -124,10 +130,12 @@ impl FileTree {
let mut rng = StdRng::from_seed(config.seed);
let (nodes, _total_files, total_folder) = generate_tree(&mut rng, config);

let open_throttle = Throttle::new_with_config(config.throttle, config.open_per_second);
let rename_throttle = Throttle::new_with_config(config.throttle, config.rename_per_second);
Ok(Self {
name_len: config.name_len,
open_per_second: config.open_per_second,
rename_per_second: config.rename_per_second,
open_throttle,
rename_throttle,
total_folder,
nodes,
rng,
Expand All @@ -149,15 +157,12 @@ impl FileTree {
///
/// Function will panic if one node is not path is not populated properly
pub async fn spin(mut self) -> Result<(), Error> {
let mut open_throttle = Throttle::new(self.open_per_second);
let mut rename_throttle = Throttle::new(self.rename_per_second);

let mut iter = self.nodes.iter().cycle();
let mut folders = Vec::with_capacity(self.total_folder);

loop {
tokio::select! {
_ = open_throttle.wait() => {
_ = self.open_throttle.wait() => {
let node = iter.next().unwrap();
if node.exists() {
File::open(node.as_path()).await?;
Expand All @@ -169,7 +174,7 @@ impl FileTree {
}
}
},
_ = rename_throttle.wait() => {
_ = self.rename_throttle.wait() => {
if let Some(folder) = folders.choose_mut(&mut self.rng) {
rename_folder(&mut self.rng, folder, self.name_len.get()).await?;
}
Expand Down
8 changes: 6 additions & 2 deletions src/generator/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
block::{self, chunk_bytes, construct_block_cache, Block},
payload,
signals::Shutdown,
throttle::Throttle,
throttle::{self, Throttle},
};

/// Errors produced by [`Grpc`]
Expand Down Expand Up @@ -57,6 +57,9 @@ pub struct Config {
pub maximum_prebuild_cache_size_bytes: byte_unit::Byte,
/// The total number of parallel connections to maintain
pub parallel_connections: u16,
/// The load throttle configuration
#[serde(default)]
pub throttle: throttle::Config,
}

/// No-op tonic codec. Sends raw bytes and returns the number of bytes received.
Expand Down Expand Up @@ -182,13 +185,14 @@ impl Grpc {
.cloned()
.expect("target_uri should have an RPC path");

let throttle = Throttle::new_with_config(config.throttle, bytes_per_second);
Ok(Self {
target_uri,
rpc_path,
config,
shutdown,
block_cache,
throttle: Throttle::new(bytes_per_second),
throttle,
metric_labels: labels,
})
}
Expand Down
7 changes: 5 additions & 2 deletions src/generator/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
block::{self, chunk_bytes, construct_block_cache, Block},
payload,
signals::Shutdown,
throttle::Throttle,
throttle::{self, Throttle},
};

static CONNECTION_SEMAPHORE: OnceCell<Semaphore> = OnceCell::new();
Expand Down Expand Up @@ -56,6 +56,9 @@ pub struct Config {
pub block_sizes: Option<Vec<byte_unit::Byte>>,
/// The total number of parallel connections to maintain
pub parallel_connections: u16,
/// The load throttle configuration
#[serde(default)]
pub throttle: throttle::Config,
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -155,7 +158,7 @@ impl Http {
method: hyper::Method::POST,
headers: config.headers,
block_cache,
throttle: Throttle::new(bytes_per_second),
throttle: Throttle::new_with_config(config.throttle, bytes_per_second),
metric_labels: labels,
shutdown,
})
Expand Down
32 changes: 19 additions & 13 deletions src/generator/process_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
//! losely to the target but instead, without coordination, merely generates
//! a process tree.
use crate::{signals::Shutdown, throttle::Throttle};
use crate::{
signals::Shutdown,
throttle::{self, Throttle},
};
use is_executable::IsExecutable;
use nix::{
sys::wait::{waitpid, WaitPidFlag, WaitStatus},
Expand Down Expand Up @@ -107,7 +110,7 @@ fn default_envs_count() -> NonZeroU32 {
NonZeroU32::new(10).unwrap()
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Configuration of [`ProcessTree`]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum Args {
Expand All @@ -117,14 +120,14 @@ pub enum Args {
Generate(GenerateArgs),
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Configuration of [`ProcessTree`]
pub struct StaticArgs {
/// Argumments used with the `static` mode
pub values: Vec<String>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
/// Configuration of [`ProcessTree`]
pub struct GenerateArgs {
/// The maximum number argument per Process. Used by the `generate` mode
Expand All @@ -135,7 +138,7 @@ pub struct GenerateArgs {
pub count: NonZeroU32,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Configuration of [`ProcessTree`]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum Envs {
Expand All @@ -145,14 +148,14 @@ pub enum Envs {
Generate(GenerateEnvs),
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Configuration of [`ProcessTree`]
pub struct StaticEnvs {
/// Environment variables used with the `static` mode
pub values: Vec<String>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
/// Configuration of [`ProcessTree`]
pub struct GenerateEnvs {
/// The maximum number environment variable per Process. Used by the `generate` mode
Expand All @@ -179,7 +182,7 @@ impl StaticEnvs {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Configuration of [`ProcessTree`]
pub struct Executable {
/// Path of the executable
Expand All @@ -190,7 +193,7 @@ pub struct Executable {
pub envs: Envs,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
/// Configuration of [`ProcessTree`]
pub struct Config {
/// The seed for random operations against this target
Expand All @@ -209,6 +212,9 @@ pub struct Config {
pub process_sleep_ns: NonZeroU32,
/// List of executables
pub executables: Vec<Executable>,
/// The load throttle configuration
#[serde(default)]
pub throttle: throttle::Config,
}

impl Config {
Expand Down Expand Up @@ -239,7 +245,7 @@ impl Config {
pub struct ProcessTree {
lading_path: PathBuf,
config_content: String,
max_tree_per_second: NonZeroU32,
throttle: Throttle,
shutdown: Shutdown,
}

Expand All @@ -256,11 +262,12 @@ impl ProcessTree {
Err(e) => return Err(Error::from(e)),
};

let throttle = Throttle::new_with_config(config.throttle, config.max_tree_per_second);
match serde_yaml::to_string(config) {
Ok(serialized) => Ok(Self {
lading_path,
config_content: serialized,
max_tree_per_second: config.max_tree_per_second,
throttle,
shutdown,
}),
Err(e) => Err(Error::from(e)),
Expand All @@ -280,12 +287,11 @@ impl ProcessTree {
/// Panic if the lading path can't determine.
///
pub async fn spin(mut self) -> Result<(), Error> {
let mut process_throttle = Throttle::new(self.max_tree_per_second);
let lading_path = self.lading_path.to_str().unwrap();

loop {
tokio::select! {
_ = process_throttle.wait() => {
_ = self.throttle.wait() => {
// using pid as target pid just to pass laging clap constraints
let output = Command::new(lading_path)
.args(["--target-pid", "1"])
Expand Down
11 changes: 7 additions & 4 deletions src/generator/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
payload,
payload::SplunkHecEncoding,
signals::Shutdown,
throttle::Throttle,
throttle::{self, Throttle},
};

static CONNECTION_SEMAPHORE: OnceCell<Semaphore> = OnceCell::new();
Expand All @@ -40,7 +40,7 @@ const SPLUNK_HEC_TEXT_PATH: &str = "/services/collector/raw";
const SPLUNK_HEC_CHANNEL_HEADER: &str = "x-splunk-request-channel";

/// Optional Splunk HEC indexer acknowledgements configuration
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct AckSettings {
/// The time in seconds between queries to /services/collector/ack
pub ack_query_interval_seconds: u64,
Expand All @@ -50,7 +50,7 @@ pub struct AckSettings {
}

/// Configuration for [`SplunkHec`]
#[derive(Deserialize, Debug, PartialEq, Eq)]
#[derive(Deserialize, Debug, PartialEq)]
pub struct Config {
/// The seed for random operations against this target
pub seed: [u8; 32],
Expand All @@ -71,6 +71,9 @@ pub struct Config {
pub block_sizes: Option<Vec<byte_unit::Byte>>,
/// The total number of parallel connections to maintain
pub parallel_connections: u16,
/// The load throttle configuration
#[serde(default)]
pub throttle: throttle::Config,
}

#[derive(thiserror::Error, Debug, Clone, Copy)]
Expand Down Expand Up @@ -192,7 +195,7 @@ impl SplunkHec {
uri,
token: config.token,
block_cache,
throttle: Throttle::new(bytes_per_second),
throttle: Throttle::new_with_config(config.throttle, bytes_per_second),
metric_labels: labels,
shutdown,
})
Expand Down
Loading

0 comments on commit fee8507

Please sign in to comment.