From e7b563d1955a0acc350364aba90e5dcd8c7b3c6e Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Thu, 26 Oct 2023 17:39:39 -0400 Subject: [PATCH] enhancement(nats source): add subscriber_capacity option (#18899) * enhancement(nats source): add subscriber_capacity option * feedback and fix tests --- src/sources/nats.rs | 30 +++++++++++++++++++ .../components/sources/base/nats.cue | 14 +++++++++ 2 files changed, 44 insertions(+) diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 7337d35cc676c..0f5a4613c19e8 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -97,12 +97,28 @@ pub struct NatsSourceConfig { /// The `NATS` subject key. #[serde(default = "default_subject_key_field")] subject_key_field: OptionalValuePath, + + /// The buffer capacity of the underlying NATS subscriber. + /// + /// This value determines how many messages the NATS subscriber buffers + /// before incoming messages are dropped. + /// + /// See the [async_nats documentation][async_nats_subscription_capacity] for more information. + /// + /// [async_nats_subscription_capacity]: https://docs.rs/async-nats/latest/async_nats/struct.ConnectOptions.html#method.subscription_capacity + #[serde(default = "default_subscription_capacity")] + #[derivative(Default(value = "default_subscription_capacity()"))] + subscriber_capacity: usize, } fn default_subject_key_field() -> OptionalValuePath { OptionalValuePath::from(owned_value_path!("subject")) } +const fn default_subscription_capacity() -> usize { + 4096 +} + impl GenerateConfig for NatsSourceConfig { fn generate_config() -> toml::Value { toml::from_str( @@ -178,6 +194,7 @@ impl TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions { fn try_from(config: &NatsSourceConfig) -> Result { from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) + .map(|options| options.subscription_capacity(config.subscriber_capacity)) } } @@ -415,6 +432,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -447,6 +465,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -479,6 +498,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -510,6 +530,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -541,6 +562,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -573,6 +595,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -605,6 +628,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -638,6 +662,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -665,6 +690,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -700,6 +726,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -733,6 +760,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -770,6 +798,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -807,6 +836,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; diff --git a/website/cue/reference/components/sources/base/nats.cue b/website/cue/reference/components/sources/base/nats.cue index 81bc4bdca5b86..21d814c94df9f 100644 --- a/website/cue/reference/components/sources/base/nats.cue +++ b/website/cue/reference/components/sources/base/nats.cue @@ -345,6 +345,20 @@ base: components: sources: nats: configuration: { required: false type: string: default: "subject" } + subscriber_capacity: { + description: """ + The buffer capacity of the underlying NATS subscriber. + + This value determines how many messages the NATS subscriber buffers + before incoming messages are dropped. + + See the [async_nats documentation][async_nats_subscription_capacity] for more information. + + [async_nats_subscription_capacity]: https://docs.rs/async-nats/latest/async_nats/struct.ConnectOptions.html#method.subscription_capacity + """ + required: false + type: uint: default: 4096 + } tls: { description: "Configures the TLS options for incoming/outgoing connections." required: false