Skip to content

Commit

Permalink
enhancement(nats source): add subscriber_capacity option (#18899)
Browse files Browse the repository at this point in the history
* enhancement(nats source): add subscriber_capacity option

* feedback and fix tests
  • Loading branch information
dsmith3197 authored Oct 26, 2023
1 parent 42beb3f commit e7b563d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/sources/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,28 @@ pub struct NatsSourceConfig {
/// The `NATS` subject key.
#[serde(default = "default_subject_key_field")]
subject_key_field: OptionalValuePath,

/// The buffer capacity of the underlying NATS subscriber.
///
/// This value determines how many messages the NATS subscriber buffers
/// before incoming messages are dropped.
///
/// See the [async_nats documentation][async_nats_subscription_capacity] for more information.
///
/// [async_nats_subscription_capacity]: https://docs.rs/async-nats/latest/async_nats/struct.ConnectOptions.html#method.subscription_capacity
#[serde(default = "default_subscription_capacity")]
#[derivative(Default(value = "default_subscription_capacity()"))]
subscriber_capacity: usize,
}

fn default_subject_key_field() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("subject"))
}

const fn default_subscription_capacity() -> usize {
4096
}

impl GenerateConfig for NatsSourceConfig {
fn generate_config() -> toml::Value {
toml::from_str(
Expand Down Expand Up @@ -178,6 +194,7 @@ impl TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions {

fn try_from(config: &NatsSourceConfig) -> Result<Self, Self::Error> {
from_tls_auth_config(&config.connection_name, &config.auth, &config.tls)
.map(|options| options.subscription_capacity(config.subscriber_capacity))
}
}

Expand Down Expand Up @@ -415,6 +432,7 @@ mod integration_tests {
auth: None,
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -447,6 +465,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -479,6 +498,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -510,6 +530,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -541,6 +562,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -573,6 +595,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -605,6 +628,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -638,6 +662,7 @@ mod integration_tests {
auth: None,
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -665,6 +690,7 @@ mod integration_tests {
auth: None,
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -700,6 +726,7 @@ mod integration_tests {
auth: None,
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -733,6 +760,7 @@ mod integration_tests {
auth: None,
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -770,6 +798,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down Expand Up @@ -807,6 +836,7 @@ mod integration_tests {
}),
log_namespace: None,
subject_key_field: default_subject_key_field(),
..Default::default()
};

let r = publish_and_check(conf).await;
Expand Down
14 changes: 14 additions & 0 deletions website/cue/reference/components/sources/base/nats.cue
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,20 @@ base: components: sources: nats: configuration: {
required: false
type: string: default: "subject"
}
subscriber_capacity: {
description: """
The buffer capacity of the underlying NATS subscriber.
This value determines how many messages the NATS subscriber buffers
before incoming messages are dropped.
See the [async_nats documentation][async_nats_subscription_capacity] for more information.
[async_nats_subscription_capacity]: https://docs.rs/async-nats/latest/async_nats/struct.ConnectOptions.html#method.subscription_capacity
"""
required: false
type: uint: default: 4096
}
tls: {
description: "Configures the TLS options for incoming/outgoing connections."
required: false
Expand Down

0 comments on commit e7b563d

Please sign in to comment.