Skip to content

Commit

Permalink
feat(wadm)!: support configuring max stream bytes
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooksmtownsend@gmail.com>
  • Loading branch information
brooksmtownsend committed Sep 17, 2024
1 parent 78343c2 commit 5719f0e
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 3 deletions.
86 changes: 84 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,75 @@ struct Args {
/// account. See the deployment guide for more information
#[arg(long = "multitenant", env = "WADM_MULTITENANT", hide = true)]
multitenant: bool,

//
// Max bytes configuration for streams. Primarily configurable to enable deployment on NATS infra
// with limited resources.
//
/// Maximum bytes to keep for the state bucket
#[arg(
long = "state-bucket-max-bytes",
env = "WADM_STATE_BUCKET_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_state_bucket_bytes: i64,
/// Maximum bytes to keep for the manifest bucket
#[arg(
long = "manifest-bucket-max-bytes",
env = "WADM_MANIFEST_BUCKET_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_manifest_bucket_bytes: i64,
/// Maximum bytes to keep for the command stream
#[arg(
long = "command-stream-max-bytes",
env = "WADM_COMMAND_STREAM_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_command_stream_bytes: i64,
/// Maximum bytes to keep for the event stream
#[arg(
long = "event-stream-max-bytes",
env = "WADM_EVENT_STREAM_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_event_stream_bytes: i64,
/// Maximum bytes to keep for the event consumer stream
#[arg(
long = "event-consumer-stream-max-bytes",
env = "WADM_EVENT_CONSUMER_STREAM_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_event_consumer_stream_bytes: i64,
/// Maximum bytes to keep for the status stream
#[arg(
long = "status-stream-max-bytes",
env = "WADM_STATUS_STREAM_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_status_stream_bytes: i64,
/// Maximum bytes to keep for the notify stream
#[arg(
long = "notify-stream-max-bytes",
env = "WADM_NOTIFY_STREAM_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_notify_stream_bytes: i64,
/// Maximum bytes to keep for the wasmbus event stream
#[arg(
long = "wasmbus-event-stream-max-bytes",
env = "WADM_WASMBUS_EVENT_STREAM_MAX_BYTES",
default_value_t = -1,
hide = true
)]
max_wasmbus_event_stream_bytes: i64,
}

#[tokio::main]
Expand Down Expand Up @@ -189,11 +258,18 @@ async fn main() -> anyhow::Result<()> {

let trimmer: &[_] = &['.', '>', '*'];

let store = nats::ensure_kv_bucket(&context, args.state_bucket, 1).await?;
let store =
nats::ensure_kv_bucket(&context, args.state_bucket, 1, args.max_state_bucket_bytes).await?;

let state_storage = NatsKvStore::new(store);

let manifest_storage = nats::ensure_kv_bucket(&context, args.manifest_bucket, 1).await?;
let manifest_storage = nats::ensure_kv_bucket(
&context,
args.manifest_bucket,
1,
args.max_manifest_bucket_bytes,
)
.await?;

let internal_stream_name = |stream_name: &str| -> String {
match args.stream_prefix.clone() {
Expand All @@ -218,6 +294,7 @@ async fn main() -> anyhow::Result<()> {
"A stream that stores all events coming in on the wadm.evt subject in a cluster"
.to_string(),
),
args.max_event_stream_bytes,
)
.await?;

Expand All @@ -228,13 +305,15 @@ async fn main() -> anyhow::Result<()> {
internal_stream_name(COMMAND_STREAM_NAME),
vec![DEFAULT_COMMANDS_TOPIC.to_owned()],
Some("A stream that stores all commands for wadm".to_string()),
args.max_command_stream_bytes,
)
.await?;

let status_stream = nats::ensure_status_stream(
&context,
internal_stream_name(STATUS_STREAM_NAME),
vec![DEFAULT_STATUS_TOPIC.to_owned()],
args.max_status_stream_bytes,
)
.await?;

Expand Down Expand Up @@ -263,6 +342,7 @@ async fn main() -> anyhow::Result<()> {
"A stream that stores all events coming in on the wasmbus.evt subject in a cluster"
.to_string(),
),
args.max_wasmbus_event_stream_bytes,
)
.await?;

Expand All @@ -272,6 +352,7 @@ async fn main() -> anyhow::Result<()> {
&context,
NOTIFY_STREAM_NAME.to_owned(),
vec![format!("{WADM_NOTIFY_PREFIX}.*")],
args.max_notify_stream_bytes,
)
.await?;

Expand All @@ -286,6 +367,7 @@ async fn main() -> anyhow::Result<()> {
"A stream that sources from wadm_events and wasmbus_events for wadm event consumer's use"
.to_string(),
),
args.max_event_consumer_stream_bytes,
)
.await?;

Expand Down
20 changes: 19 additions & 1 deletion src/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use async_nats::{
Client, ConnectOptions,
};

use tracing::warn;
use tracing::{debug, warn};
use wadm::DEFAULT_EXPIRY_TIME;

/// Creates a NATS client from the given options
Expand Down Expand Up @@ -120,7 +120,9 @@ pub async fn ensure_stream(
name: String,
subjects: Vec<String>,
description: Option<String>,
max_bytes: i64,
) -> Result<Stream> {
debug!("Ensuring stream {name} exists");
let stream_config = StreamConfig {
name: name.clone(),
description,
Expand All @@ -130,6 +132,7 @@ pub async fn ensure_stream(
max_age: DEFAULT_EXPIRY_TIME,
storage: async_nats::jetstream::stream::StorageType::File,
allow_rollup: false,
max_bytes,
..Default::default()
};

Expand Down Expand Up @@ -157,7 +160,9 @@ pub async fn ensure_limits_stream(
name: String,
subjects: Vec<String>,
description: Option<String>,
max_bytes: i64,
) -> Result<Stream> {
debug!("Ensuring stream {name} exists");
let stream_config = StreamConfig {
name: name.clone(),
description,
Expand All @@ -167,6 +172,7 @@ pub async fn ensure_limits_stream(
max_age: DEFAULT_EXPIRY_TIME,
storage: async_nats::jetstream::stream::StorageType::File,
allow_rollup: false,
max_bytes,
..Default::default()
};

Expand Down Expand Up @@ -195,7 +201,9 @@ pub async fn ensure_event_consumer_stream(
subject: String,
streams: Vec<&Stream>,
description: Option<String>,
max_bytes: i64,
) -> Result<Stream> {
debug!("Ensuring stream {name} exists");
// This maps the upstream (wasmbus.evt.*.> & wadm.evt.*.>) Streams into
// a set of configuration for the downstream wadm event consumer Stream
// that consolidates them into a single set of subjects (wadm_event_consumer.evt.*.>)
Expand Down Expand Up @@ -236,6 +244,7 @@ pub async fn ensure_event_consumer_stream(
sources: Some(sources),
storage: async_nats::jetstream::stream::StorageType::File,
allow_rollup: false,
max_bytes,
..Default::default()
};

Expand All @@ -258,7 +267,9 @@ pub async fn ensure_status_stream(
context: &Context,
name: String,
subjects: Vec<String>,
max_bytes: i64,
) -> Result<Stream> {
debug!("Ensuring stream {name} exists");
context
.get_or_create_stream(StreamConfig {
name,
Expand All @@ -272,6 +283,7 @@ pub async fn ensure_status_stream(
subjects,
max_age: std::time::Duration::from_nanos(0),
storage: async_nats::jetstream::stream::StorageType::File,
max_bytes,
..Default::default()
})
.await
Expand All @@ -283,7 +295,9 @@ pub async fn ensure_notify_stream(
context: &Context,
name: String,
subjects: Vec<String>,
max_bytes: i64,
) -> Result<Stream> {
debug!("Ensuring stream {name} exists");
context
.get_or_create_stream(StreamConfig {
name,
Expand All @@ -293,6 +307,7 @@ pub async fn ensure_notify_stream(
subjects,
max_age: DEFAULT_EXPIRY_TIME,
storage: async_nats::jetstream::stream::StorageType::File,
max_bytes,
..Default::default()
})
.await
Expand All @@ -305,7 +320,9 @@ pub async fn ensure_kv_bucket(
context: &Context,
name: String,
history_to_keep: i64,
max_bytes: i64,
) -> Result<Store> {
debug!("Ensuring kv bucket {name} exists");
if let Ok(kv) = context.get_key_value(&name).await {
Ok(kv)
} else {
Expand All @@ -315,6 +332,7 @@ pub async fn ensure_kv_bucket(
history: history_to_keep,
num_replicas: 1,
storage: jetstream::stream::StorageType::File,
max_bytes,
..Default::default()
})
.await
Expand Down

0 comments on commit 5719f0e

Please sign in to comment.