diff --git a/Cargo.lock b/Cargo.lock index 9b270bb1bfa62..711b575ca5cad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5418,9 +5418,9 @@ dependencies = [ [[package]] name = "no-proxy" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc5956cc74e3574924e108ad12e14340a64183e1cd1d69a7e41e9680c109e67" +checksum = "1b41e7479dc3678ea792431e04bafd62a31879035f4a5fa707602df062f58c77" dependencies = [ "cidr-utils", "serde", diff --git a/lib/vector-config/Cargo.toml b/lib/vector-config/Cargo.toml index 83432b11e813e..c592fe580fef3 100644 --- a/lib/vector-config/Cargo.toml +++ b/lib/vector-config/Cargo.toml @@ -16,7 +16,7 @@ chrono-tz = { version = "0.8.3", default-features = false } encoding_rs = { version = "0.8", default-features = false, features = ["alloc", "serde"] } indexmap = { version = "2.0", default-features = false, features = ["std"] } inventory = { version = "0.3" } -no-proxy = { version = "0.3.3", default-features = false, features = ["serialize"] } +no-proxy = { version = "0.3.4", default-features = false, features = ["serialize"] } num-traits = { version = "0.2.16", default-features = false } once_cell = { version = "1", default-features = false } serde = { version = "1.0", default-features = false } diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 4ff76a7ce88f3..c56538fb510a3 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -29,7 +29,7 @@ metrics = "0.21.1" metrics-tracing-context = { version = "0.14.0", default-features = false } metrics-util = { version = "0.15.1", default-features = false, features = ["registry"] } mlua = { version = "0.8.9", default-features = false, features = ["lua54", "send", "vendored"], optional = true } -no-proxy = { version = "0.3.3", default-features = false, features = ["serialize"] } +no-proxy = { version = "0.3.4", default-features = false, features = ["serialize"] } once_cell = { version = "1.18", default-features = false } ordered-float = { version = "3.7.0", default-features = false } openssl = { version = "0.10.56", default-features = false, features = ["vendored"] } diff --git a/scripts/integration/.gitignore b/scripts/integration/.gitignore new file mode 100644 index 0000000000000..a86d6b738ce29 --- /dev/null +++ b/scripts/integration/.gitignore @@ -0,0 +1 @@ +*/compose-temp*.yaml diff --git a/src/app.rs b/src/app.rs index ed0bf2545c7d6..345e1437cd533 100644 --- a/src/app.rs +++ b/src/app.rs @@ -62,7 +62,7 @@ pub struct Application { pub require_healthy: Option, pub config: ApplicationConfig, pub signals: SignalPair, - pub openssl_legacy_provider: Option, + pub openssl_providers: Option>, } impl ApplicationConfig { @@ -196,11 +196,11 @@ impl Application { debug!(message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."); } - let openssl_legacy_provider = opts + let openssl_providers = opts .root .openssl_legacy_provider - .then(load_openssl_legacy_provider) - .flatten(); + .then(load_openssl_legacy_providers) + .transpose()?; let runtime = build_runtime(opts.root.threads, "vector-worker")?; @@ -222,7 +222,7 @@ impl Application { require_healthy: opts.root.require_healthy, config, signals, - openssl_legacy_provider, + openssl_providers, }, )) } @@ -239,7 +239,7 @@ impl Application { require_healthy, config, signals, - openssl_legacy_provider, + openssl_providers, } = self; let topology_controller = SharedTopologyController::new(TopologyController { @@ -257,7 +257,7 @@ impl Application { graceful_crash_receiver: config.graceful_crash_receiver, signals, topology_controller, - openssl_legacy_provider, + openssl_providers, }) } } @@ -267,7 +267,7 @@ pub struct StartedApplication { pub graceful_crash_receiver: mpsc::UnboundedReceiver, pub signals: SignalPair, pub topology_controller: SharedTopologyController, - pub openssl_legacy_provider: Option, + pub openssl_providers: Option>, } impl StartedApplication { @@ -281,7 +281,7 @@ impl StartedApplication { graceful_crash_receiver, signals, topology_controller, - openssl_legacy_provider, + openssl_providers, } = self; let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver); @@ -313,7 +313,7 @@ impl StartedApplication { signal, signal_rx, topology_controller, - openssl_legacy_provider, + openssl_providers, } } } @@ -368,7 +368,7 @@ pub struct FinishedApplication { pub signal: SignalTo, pub signal_rx: SignalRx, pub topology_controller: SharedTopologyController, - pub openssl_legacy_provider: Option, + pub openssl_providers: Option>, } impl FinishedApplication { @@ -377,7 +377,7 @@ impl FinishedApplication { signal, signal_rx, topology_controller, - openssl_legacy_provider, + openssl_providers, } = self; // At this point, we'll have the only reference to the shared topology controller and can @@ -392,7 +392,7 @@ impl FinishedApplication { SignalTo::Quit => Self::quit(), _ => unreachable!(), }; - drop(openssl_legacy_provider); + drop(openssl_providers); status } @@ -571,13 +571,17 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) /// /// The returned [Provider] must stay in scope for the entire lifetime of the application, as it /// will be unloaded when it is dropped. -pub fn load_openssl_legacy_provider() -> Option { +pub fn load_openssl_legacy_providers() -> Result, ExitCode> { warn!(message = "DEPRECATED The openssl legacy provider provides algorithms and key sizes no longer recommended for use. Set `--openssl-legacy-provider=false` or `VECTOR_OPENSSL_LEGACY_PROVIDER=false` to disable. See https://vector.dev/highlights/2023-08-15-0-32-0-upgrade-guide/#legacy-openssl for details."); - Provider::try_load(None, "legacy", true) - .map(|provider| { - info!(message = "Loaded openssl legacy provider."); - provider - }) - .map_err(|error| error!(message = "Failed to load openssl legacy provider.", %error)) - .ok() + ["legacy", "default"].into_iter().map(|provider_name| { + Provider::try_load(None, provider_name, true) + .map(|provider| { + info!(message = "Loaded openssl provider.", provider = provider_name); + provider + }) + .map_err(|error| { + error!(message = "Failed to load openssl provider.", provider = provider_name, %error); + exitcode::UNAVAILABLE + }) + }).collect() } diff --git a/src/cli.rs b/src/cli.rs index 32a9ac4f277fd..6d21421d2a1c1 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -196,7 +196,15 @@ pub struct RootOpts { pub allocation_tracing_reporting_interval_ms: u64, /// Load the OpenSSL legacy provider. - #[arg(long, env = "VECTOR_OPENSSL_LEGACY_PROVIDER", default_value = "true")] + #[arg( + long, + env = "VECTOR_OPENSSL_LEGACY_PROVIDER", + default_value = "true", + default_missing_value = "true", + num_args = 0..=1, + require_equals = true, + action = ArgAction::Set + )] pub openssl_legacy_provider: bool, /// Disable probing and configuration of root certificate locations on the system for OpenSSL. diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 7d30daba29d97..f1073573cb51c 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -86,8 +86,6 @@ mod lua; mod metric_to_log; #[cfg(feature = "sources-mongodb_metrics")] mod mongodb_metrics; -#[cfg(feature = "sinks-nats")] -mod nats; #[cfg(feature = "sources-nginx_metrics")] mod nginx_metrics; mod open; @@ -224,8 +222,6 @@ pub(crate) use self::loki::*; pub(crate) use self::lua::*; #[cfg(feature = "transforms-metric_to_log")] pub(crate) use self::metric_to_log::*; -#[cfg(feature = "sinks-nats")] -pub(crate) use self::nats::*; #[cfg(feature = "sources-nginx_metrics")] pub(crate) use self::nginx_metrics::*; pub(crate) use self::parser::*; diff --git a/src/internal_events/nats.rs b/src/internal_events/nats.rs deleted file mode 100644 index c68c756a3a377..0000000000000 --- a/src/internal_events/nats.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::emit; -use metrics::counter; -use vector_common::internal_event::{ - error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL, -}; -use vector_core::internal_event::InternalEvent; - -#[derive(Debug)] -pub struct NatsEventSendError { - pub error: async_nats::Error, -} - -impl InternalEvent for NatsEventSendError { - fn emit(self) { - let reason = "Failed to send message."; - error!( - message = reason, - error = %self.error, - error_type = error_type::WRITER_FAILED, - stage = error_stage::SENDING, - internal_log_rate_limit = true, - ); - counter!( - "component_errors_total", 1, - "error_type" => error_type::WRITER_FAILED, - "stage" => error_stage::SENDING, - ); - emit!(ComponentEventsDropped:: { count: 1, reason }); - - // deprecated - counter!("send_errors_total", 1); - } -} diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs deleted file mode 100644 index 02265df21972b..0000000000000 --- a/src/sinks/nats.rs +++ /dev/null @@ -1,716 +0,0 @@ -use std::convert::TryFrom; - -use async_trait::async_trait; -use bytes::BytesMut; -use codecs::JsonSerializerConfig; -use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt}; -use snafu::{ResultExt, Snafu}; -use tokio_util::codec::Encoder as _; -use vector_common::internal_event::{ - ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol, -}; -use vector_config::configurable_component; - -use crate::{ - codecs::{Encoder, EncodingConfig, Transformer}, - config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{EstimatedJsonEncodedSizeOf, Event, EventStatus, Finalizable}, - internal_events::{NatsEventSendError, TemplateRenderingError}, - nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, - sinks::util::StreamSink, - template::{Template, TemplateParseError}, - tls::TlsEnableableConfig, -}; - -#[derive(Debug, Snafu)] -enum BuildError { - #[snafu(display("invalid encoding: {}", source))] - Encoding { - source: codecs::encoding::BuildError, - }, - #[snafu(display("invalid subject template: {}", source))] - SubjectTemplate { source: TemplateParseError }, - #[snafu(display("NATS Config Error: {}", source))] - Config { source: NatsConfigError }, - #[snafu(display("NATS Connect Error: {}", source))] - Connect { source: async_nats::ConnectError }, -} - -/** - * Code dealing with the SinkConfig struct. - */ - -/// Configuration for the `nats` sink. -#[configurable_component(sink( - "nats", - "Publish observability data to subjects on the NATS messaging system." -))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -pub struct NatsSinkConfig { - #[configurable(derived)] - encoding: EncodingConfig, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub acknowledgements: AcknowledgementsConfig, - - /// A NATS [name][nats_connection_name] assigned to the NATS connection. - /// - /// [nats_connection_name]: https://docs.nats.io/using-nats/developer/connecting/name - #[serde(default = "default_name", alias = "name")] - #[configurable(metadata(docs::examples = "foo"))] - connection_name: String, - - /// The NATS [subject][nats_subject] to publish messages to. - /// - /// [nats_subject]: https://docs.nats.io/nats-concepts/subjects - #[configurable(metadata(docs::templateable))] - #[configurable(metadata( - docs::examples = "{{ host }}", - docs::examples = "foo", - docs::examples = "time.us.east", - docs::examples = "time.*.east", - docs::examples = "time.>", - docs::examples = ">" - ))] - subject: String, - - /// The NATS [URL][nats_url] to connect to. - /// - /// The URL must take the form of `nats://server:port`. - /// If the port is not specified it defaults to 4222. - /// - /// [nats_url]: https://docs.nats.io/using-nats/developer/connecting#nats-url - #[configurable(metadata(docs::examples = "nats://demo.nats.io"))] - #[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))] - url: String, - - #[configurable(derived)] - tls: Option, - - #[configurable(derived)] - auth: Option, -} - -fn default_name() -> String { - String::from("vector") -} - -impl GenerateConfig for NatsSinkConfig { - fn generate_config() -> toml::Value { - toml::Value::try_from(Self { - acknowledgements: Default::default(), - auth: None, - connection_name: "vector".into(), - encoding: JsonSerializerConfig::default().into(), - subject: "from.vector".into(), - tls: None, - url: "nats://127.0.0.1:4222".into(), - }) - .unwrap() - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "nats")] -impl SinkConfig for NatsSinkConfig { - async fn build( - &self, - _cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let sink = NatsSink::new(self.clone()).await?; - let healthcheck = healthcheck(self.clone()).boxed(); - Ok((super::VectorSink::from_event_streamsink(sink), healthcheck)) - } - - fn input(&self) -> Input { - Input::new(self.encoding.config().input_type() & DataType::Log) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions { - type Error = NatsConfigError; - - fn try_from(config: &NatsSinkConfig) -> Result { - from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) - } -} - -impl NatsSinkConfig { - async fn connect(&self) -> Result { - let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; - - options.connect(&self.url).await.context(ConnectSnafu) - } -} - -async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { - config.connect().map_ok(|_| ()).map_err(|e| e.into()).await -} - -pub struct NatsSink { - transformer: Transformer, - encoder: Encoder<()>, - connection: async_nats::Client, - subject: Template, -} - -impl NatsSink { - async fn new(config: NatsSinkConfig) -> Result { - let connection = config.connect().await?; - let transformer = config.encoding.transformer(); - let serializer = config.encoding.build().context(EncodingSnafu)?; - let encoder = Encoder::<()>::new(serializer); - - Ok(NatsSink { - connection, - transformer, - encoder, - subject: Template::try_from(config.subject).context(SubjectTemplateSnafu)?, - }) - } -} - -#[async_trait] -impl StreamSink for NatsSink { - async fn run(mut self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { - let bytes_sent = register!(BytesSent::from(Protocol::TCP)); - let events_sent = register!(EventsSent::from(Output(None))); - - while let Some(mut event) = input.next().await { - let finalizers = event.take_finalizers(); - - let subject = match self.subject.render_string(&event) { - Ok(subject) => subject, - Err(error) => { - emit!(TemplateRenderingError { - error, - field: Some("subject"), - drop_event: true, - }); - finalizers.update_status(EventStatus::Rejected); - continue; - } - }; - - self.transformer.transform(&mut event); - - let event_byte_size = event.estimated_json_encoded_size_of(); - - let mut bytes = BytesMut::new(); - if self.encoder.encode(event, &mut bytes).is_err() { - // Error is handled by `Encoder`. - finalizers.update_status(EventStatus::Rejected); - continue; - } - - let message_size = bytes.len(); - match self - .connection - .publish(subject.clone(), bytes.freeze()) - .map_err(Into::into) - .and_then(|_| self.connection.flush().map_err(Into::into)) - .await - { - Err(error) => { - finalizers.update_status(EventStatus::Errored); - - emit!(NatsEventSendError { error }); - } - Ok(_) => { - finalizers.update_status(EventStatus::Delivered); - - events_sent.emit(CountByteSize(1, event_byte_size)); - bytes_sent.emit(ByteSize(message_size)); - } - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } -} - -#[cfg(feature = "nats-integration-tests")] -#[cfg(test)] -mod integration_tests { - use codecs::TextSerializerConfig; - use std::time::Duration; - - use super::*; - use crate::nats::{NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword}; - use crate::sinks::VectorSink; - use crate::test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, - random_lines_with_stream, random_string, trace_init, - }; - use crate::tls::TlsConfig; - - async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), BuildError> { - // Publish `N` messages to NATS. - // - // Verify with a separate subscriber that the messages were - // successfully published. - - // Create Sink - let sink = NatsSink::new(conf.clone()).await?; - let sink = VectorSink::from_event_streamsink(sink); - - // Establish the consumer subscription. - let subject = conf.subject.clone(); - let consumer = conf - .clone() - .connect() - .await - .expect("failed to connect with test consumer"); - let mut sub = consumer - .subscribe(subject) - .await - .expect("failed to subscribe with test consumer"); - consumer - .flush() - .await - .expect("failed to flush with the test consumer"); - - // Publish events. - let num_events = 1_000; - let (input, events) = random_lines_with_stream(100, num_events, None); - - run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; - - // Unsubscribe from the channel. - tokio::time::sleep(Duration::from_secs(3)).await; - sub.unsubscribe().await.unwrap(); - - let mut output: Vec = Vec::new(); - while let Some(msg) = sub.next().await { - output.push(String::from_utf8_lossy(&msg.payload).to_string()) - } - - assert_eq!(output.len(), input.len()); - assert_eq!(output, input); - - Ok(()) - } - - #[tokio::test] - async fn nats_no_auth() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = - std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_userpass_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_USERPASS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::UserPassword { - user_password: NatsAuthUserPassword { - user: "natsuser".to_string(), - password: "natspass".to_string().into(), - }, - }), - }; - - publish_and_check(conf) - .await - .expect("publish_and_check failed"); - } - - #[tokio::test] - async fn nats_userpass_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_USERPASS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::UserPassword { - user_password: NatsAuthUserPassword { - user: "natsuser".to_string(), - password: "wrongpass".to_string().into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_token_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TOKEN_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Token { - token: NatsAuthToken { - value: "secret".to_string().into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_token_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TOKEN_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Token { - token: NatsAuthToken { - value: "wrongsecret".to_string().into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_nkey_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_NKEY_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Nkey { - nkey: NatsAuthNKey { - nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(), - seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_nkey_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_NKEY_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: Some(NatsAuthConfig::Nkey { - nkey: NatsAuthNKey { - nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(), - seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Config, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: None, - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_client_cert_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - crt_file: Some("tests/data/nats/nats-client.pem".into()), - key_file: Some("tests/data/nats/nats-client.key".into()), - ..Default::default() - }, - }), - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_client_cert_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: None, - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_jwt_auth_valid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_JWT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: Some(NatsAuthConfig::CredentialsFile { - credentials_file: NatsAuthCredentialsFile { - path: "tests/data/nats/nats.creds".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - r.is_ok(), - "publish_and_check failed, expected Ok(()), got: {:?}", - r - ); - } - - #[tokio::test] - async fn nats_tls_jwt_auth_invalid() { - trace_init(); - - let subject = format!("test-{}", random_string(10)); - let url = std::env::var("NATS_JWT_ADDRESS") - .unwrap_or_else(|_| String::from("nats://localhost:4222")); - - let conf = NatsSinkConfig { - acknowledgements: Default::default(), - encoding: TextSerializerConfig::default().into(), - connection_name: "".to_owned(), - subject: subject.clone(), - url, - tls: Some(TlsEnableableConfig { - enabled: Some(true), - options: TlsConfig { - ca_file: Some("tests/data/nats/rootCA.pem".into()), - ..Default::default() - }, - }), - auth: Some(NatsAuthConfig::CredentialsFile { - credentials_file: NatsAuthCredentialsFile { - path: "tests/data/nats/nats-bad.creds".into(), - }, - }), - }; - - let r = publish_and_check(conf).await; - assert!( - matches!(r, Err(BuildError::Connect { .. })), - "publish_and_check failed, expected BuildError::Connect, got: {:?}", - r - ); - } -} diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs new file mode 100644 index 0000000000000..4ce334cda070a --- /dev/null +++ b/src/sinks/nats/config.rs @@ -0,0 +1,130 @@ +use codecs::JsonSerializerConfig; +use futures_util::TryFutureExt; +use snafu::ResultExt; +use vector_core::tls::TlsEnableableConfig; + +use crate::{ + nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, + sinks::prelude::*, +}; + +use super::{sink::NatsSink, ConfigSnafu, ConnectSnafu, NatsError}; + +/// Configuration for the `nats` sink. +#[configurable_component(sink( + "nats", + "Publish observability data to subjects on the NATS messaging system." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct NatsSinkConfig { + #[configurable(derived)] + pub(super) encoding: EncodingConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, + + /// A NATS [name][nats_connection_name] assigned to the NATS connection. + /// + /// [nats_connection_name]: https://docs.nats.io/using-nats/developer/connecting/name + #[serde(default = "default_name", alias = "name")] + #[configurable(metadata(docs::examples = "foo"))] + pub(super) connection_name: String, + + /// The NATS [subject][nats_subject] to publish messages to. + /// + /// [nats_subject]: https://docs.nats.io/nats-concepts/subjects + #[configurable(metadata(docs::templateable))] + #[configurable(metadata( + docs::examples = "{{ host }}", + docs::examples = "foo", + docs::examples = "time.us.east", + docs::examples = "time.*.east", + docs::examples = "time.>", + docs::examples = ">" + ))] + pub(super) subject: Template, + + /// The NATS [URL][nats_url] to connect to. + /// + /// The URL must take the form of `nats://server:port`. + /// If the port is not specified it defaults to 4222. + /// + /// [nats_url]: https://docs.nats.io/using-nats/developer/connecting#nats-url + #[configurable(metadata(docs::examples = "nats://demo.nats.io"))] + #[configurable(metadata(docs::examples = "nats://127.0.0.1:4242"))] + pub(super) url: String, + + #[configurable(derived)] + pub(super) tls: Option, + + #[configurable(derived)] + pub(super) auth: Option, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, +} + +fn default_name() -> String { + String::from("vector") +} + +impl GenerateConfig for NatsSinkConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + acknowledgements: Default::default(), + auth: None, + connection_name: "vector".into(), + encoding: JsonSerializerConfig::default().into(), + subject: Template::try_from("from.vector").unwrap(), + tls: None, + url: "nats://127.0.0.1:4222".into(), + request: Default::default(), + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "nats")] +impl SinkConfig for NatsSinkConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let sink = NatsSink::new(self.clone()).await?; + let healthcheck = healthcheck(self.clone()).boxed(); + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::new(self.encoding.config().input_type() & DataType::Log) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions { + type Error = NatsConfigError; + + fn try_from(config: &NatsSinkConfig) -> Result { + from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) + } +} + +impl NatsSinkConfig { + pub(super) async fn connect(&self) -> Result { + let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; + + options.connect(&self.url).await.context(ConnectSnafu) + } +} + +async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { + config.connect().map_ok(|_| ()).map_err(|e| e.into()).await +} diff --git a/src/sinks/nats/integration_tests.rs b/src/sinks/nats/integration_tests.rs new file mode 100644 index 0000000000000..c8972d8fb11e9 --- /dev/null +++ b/src/sinks/nats/integration_tests.rs @@ -0,0 +1,478 @@ +use codecs::TextSerializerConfig; +use std::time::Duration; + +use super::{config::NatsSinkConfig, sink::NatsSink, NatsError}; +use crate::{ + nats::{ + NatsAuthConfig, NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword, + }, + sinks::prelude::*, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + random_lines_with_stream, random_string, trace_init, + }, + tls::TlsEnableableConfig, +}; + +async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), NatsError> { + // Publish `N` messages to NATS. + // + // Verify with a separate subscriber that the messages were + // successfully published. + + // Create Sink + let sink = NatsSink::new(conf.clone()).await?; + let sink = VectorSink::from_event_streamsink(sink); + + // Establish the consumer subscription. + let subject = conf.subject.clone(); + let consumer = conf + .clone() + .connect() + .await + .expect("failed to connect with test consumer"); + let mut sub = consumer + .subscribe(subject.to_string()) + .await + .expect("failed to subscribe with test consumer"); + consumer + .flush() + .await + .expect("failed to flush with the test consumer"); + + // Publish events. + let num_events = 10; + let (input, events) = random_lines_with_stream(100, num_events, None); + + run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + + // Unsubscribe from the channel. + tokio::time::sleep(Duration::from_secs(3)).await; + sub.unsubscribe().await.unwrap(); + + let mut output: Vec = Vec::new(); + while let Some(msg) = sub.next().await { + output.push(String::from_utf8_lossy(&msg.payload).to_string()) + } + + assert_eq!(output.len(), input.len()); + assert_eq!(output, input); + + Ok(()) +} + +#[tokio::test] +async fn nats_no_auth() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_userpass_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_USERPASS_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::UserPassword { + user_password: NatsAuthUserPassword { + user: "natsuser".to_string(), + password: "natspass".to_string().into(), + }, + }), + request: Default::default(), + }; + + publish_and_check(conf) + .await + .expect("publish_and_check failed"); +} + +#[tokio::test] +async fn nats_userpass_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_USERPASS_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::UserPassword { + user_password: NatsAuthUserPassword { + user: "natsuser".to_string(), + password: "wrongpass".to_string().into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_token_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TOKEN_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Token { + token: NatsAuthToken { + value: "secret".to_string().into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_token_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TOKEN_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Token { + token: NatsAuthToken { + value: "wrongsecret".to_string().into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_nkey_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_NKEY_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Nkey { + nkey: NatsAuthNKey { + nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(), + seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_nkey_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_NKEY_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: Some(NatsAuthConfig::Nkey { + nkey: NatsAuthNKey { + nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(), + seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Config, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_TLS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_TLS_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: None, + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_client_cert_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + crt_file: Some("tests/data/nats/nats-client.pem".into()), + key_file: Some("tests/data/nats/nats-client.key".into()), + ..Default::default() + }, + }), + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_client_cert_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = std::env::var("NATS_TLS_CLIENT_CERT_ADDRESS") + .unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_jwt_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_JWT_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig::CredentialsFile { + credentials_file: NatsAuthCredentialsFile { + path: "tests/data/nats/nats.creds".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); +} + +#[tokio::test] +async fn nats_tls_jwt_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + let url = + std::env::var("NATS_JWT_ADDRESS").unwrap_or_else(|_| String::from("nats://localhost:4222")); + + let conf = NatsSinkConfig { + acknowledgements: Default::default(), + encoding: TextSerializerConfig::default().into(), + connection_name: "".to_owned(), + subject: Template::try_from(subject.as_str()).unwrap(), + url, + tls: Some(TlsEnableableConfig { + enabled: Some(true), + options: TlsConfig { + ca_file: Some("tests/data/nats/rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig::CredentialsFile { + credentials_file: NatsAuthCredentialsFile { + path: "tests/data/nats/nats-bad.creds".into(), + }, + }), + request: Default::default(), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(NatsError::Connect { .. })), + "publish_and_check failed, expected NatsError::Connect, got: {:?}", + r + ); +} diff --git a/src/sinks/nats/mod.rs b/src/sinks/nats/mod.rs new file mode 100644 index 0000000000000..a9670aeea514a --- /dev/null +++ b/src/sinks/nats/mod.rs @@ -0,0 +1,30 @@ +//! `NATS` sink +//! Publishes data using [NATS](nats.io)(Neural Autonomic Transport System). + +use snafu::Snafu; + +use crate::nats::NatsConfigError; + +mod config; +#[cfg(feature = "nats-integration-tests")] +#[cfg(test)] +mod integration_tests; +mod request_builder; +mod service; +mod sink; +#[cfg(test)] +mod tests; + +#[derive(Debug, Snafu)] +enum NatsError { + #[snafu(display("invalid encoding: {}", source))] + Encoding { + source: codecs::encoding::BuildError, + }, + #[snafu(display("NATS Config Error: {}", source))] + Config { source: NatsConfigError }, + #[snafu(display("NATS Connect Error: {}", source))] + Connect { source: async_nats::ConnectError }, + #[snafu(display("NATS Server Error: {}", source))] + ServerError { source: async_nats::Error }, +} diff --git a/src/sinks/nats/request_builder.rs b/src/sinks/nats/request_builder.rs new file mode 100644 index 0000000000000..6ff785b15a28e --- /dev/null +++ b/src/sinks/nats/request_builder.rs @@ -0,0 +1,117 @@ +use std::io; + +use bytes::{Bytes, BytesMut}; +use tokio_util::codec::Encoder as _; +use vector_core::config::telemetry; + +use crate::sinks::prelude::*; + +use super::sink::NatsEvent; + +pub(super) struct NatsEncoder { + pub(super) transformer: Transformer, + pub(super) encoder: Encoder<()>, +} + +impl encoding::Encoder for NatsEncoder { + fn encode_input( + &self, + mut input: Event, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut body = BytesMut::new(); + self.transformer.transform(&mut input); + + let mut byte_size = telemetry().create_request_count_byte_size(); + byte_size.add_event(&input, input.estimated_json_encoded_size_of()); + + let mut encoder = self.encoder.clone(); + encoder + .encode(input, &mut body) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode"))?; + + let body = body.freeze(); + write_all(writer, 1, body.as_ref())?; + + Ok((body.len(), byte_size)) + } +} + +pub(super) struct NatsMetadata { + subject: String, + finalizers: EventFinalizers, +} + +pub(super) struct NatsRequestBuilder { + pub(super) encoder: NatsEncoder, +} + +#[derive(Clone)] +pub(super) struct NatsRequest { + pub(super) bytes: Bytes, + pub(super) subject: String, + finalizers: EventFinalizers, + pub(super) metadata: RequestMetadata, +} + +impl Finalizable for NatsRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for NatsRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl RequestBuilder for NatsRequestBuilder { + type Metadata = NatsMetadata; + type Events = Event; + type Encoder = NatsEncoder; + type Payload = Bytes; + type Request = NatsRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut input: NatsEvent, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let builder = RequestMetadataBuilder::from_event(&input.event); + + let metadata = NatsMetadata { + subject: input.subject, + finalizers: input.event.take_finalizers(), + }; + + (metadata, builder, input.event) + } + + fn build_request( + &self, + nats_metadata: Self::Metadata, + metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + let body = payload.into_payload(); + NatsRequest { + bytes: body, + subject: nats_metadata.subject, + finalizers: nats_metadata.finalizers, + metadata, + } + } +} diff --git a/src/sinks/nats/service.rs b/src/sinks/nats/service.rs new file mode 100644 index 0000000000000..0eb2407ab5738 --- /dev/null +++ b/src/sinks/nats/service.rs @@ -0,0 +1,63 @@ +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +use futures_util::TryFutureExt; + +use crate::sinks::prelude::*; + +use super::{request_builder::NatsRequest, NatsError}; + +#[derive(Clone)] +pub(super) struct NatsService { + pub(super) connection: Arc, +} + +pub(super) struct NatsResponse { + metadata: RequestMetadata, +} + +impl DriverResponse for NatsResponse { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() + } + + fn bytes_sent(&self) -> Option { + Some(self.metadata.request_encoded_size()) + } +} + +impl Service for NatsService { + type Response = NatsResponse; + + type Error = NatsError; + + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: NatsRequest) -> Self::Future { + let connection = Arc::clone(&self.connection); + + Box::pin(async move { + match connection + .publish(req.subject, req.bytes) + .map_err(async_nats::Error::from) + .and_then(|_| connection.flush().map_err(Into::into)) + .await + { + Err(error) => Err(NatsError::ServerError { source: error }), + Ok(_) => Ok(NatsResponse { + metadata: req.metadata, + }), + } + }) + } +} diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs new file mode 100644 index 0000000000000..97fc747dc2f89 --- /dev/null +++ b/src/sinks/nats/sink.rs @@ -0,0 +1,116 @@ +use std::sync::Arc; + +use snafu::ResultExt; + +use crate::sinks::prelude::*; + +use super::{ + config::NatsSinkConfig, + request_builder::{NatsEncoder, NatsRequestBuilder}, + service::{NatsResponse, NatsService}, + EncodingSnafu, NatsError, +}; + +pub(super) struct NatsEvent { + pub(super) event: Event, + pub(super) subject: String, +} + +pub(super) struct NatsSink { + request: TowerRequestConfig, + transformer: Transformer, + encoder: Encoder<()>, + connection: Arc, + subject: Template, +} + +impl NatsSink { + fn make_nats_event(&self, event: Event) -> Option { + let subject = self + .subject + .render_string(&event) + .map_err(|missing_keys| { + emit!(TemplateRenderingError { + error: missing_keys, + field: Some("subject"), + drop_event: true, + }); + }) + .ok()?; + + Some(NatsEvent { event, subject }) + } + + pub(super) async fn new(config: NatsSinkConfig) -> Result { + let connection = Arc::new(config.connect().await?); + let transformer = config.encoding.transformer(); + let serializer = config.encoding.build().context(EncodingSnafu)?; + let encoder = Encoder::<()>::new(serializer); + let request = config.request; + let subject = config.subject; + + Ok(NatsSink { + request, + connection, + transformer, + encoder, + subject, + }) + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let request = self.request.unwrap_with(&TowerRequestConfig { + concurrency: Concurrency::Fixed(1), + ..Default::default() + }); + + let request_builder = NatsRequestBuilder { + encoder: NatsEncoder { + encoder: self.encoder.clone(), + transformer: self.transformer.clone(), + }, + }; + + let service = ServiceBuilder::new() + .settings(request, NatsRetryLogic) + .service(NatsService { + connection: Arc::clone(&self.connection), + }); + + input + .filter_map(|event| std::future::ready(self.make_nats_event(event))) + .request_builder(None, request_builder) + .filter_map(|request| async move { + match request { + Err(e) => { + error!("Failed to build NATS request: {:?}.", e); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(service) + .protocol("nats") + .run() + .await + } +} + +#[async_trait] +impl StreamSink for NatsSink { + async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} + +#[derive(Debug, Clone)] +pub(super) struct NatsRetryLogic; + +impl RetryLogic for NatsRetryLogic { + type Error = NatsError; + type Response = NatsResponse; + + fn is_retriable_error(&self, _error: &Self::Error) -> bool { + true + } +} diff --git a/src/sinks/nats/tests.rs b/src/sinks/nats/tests.rs new file mode 100644 index 0000000000000..92e90e0d77fba --- /dev/null +++ b/src/sinks/nats/tests.rs @@ -0,0 +1,6 @@ +use super::config::NatsSinkConfig; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs index b66de220ed5ed..9c7e81cf39418 100644 --- a/src/sinks/prelude.rs +++ b/src/sinks/prelude.rs @@ -6,16 +6,16 @@ pub use crate::{ config::{DataType, GenerateConfig, SinkConfig, SinkContext}, event::{Event, LogEvent}, internal_events::{SinkRequestBuildError, TemplateRenderingError}, - sinks::util::retries::RetryLogic, sinks::{ util::{ builder::SinkBuilderExt, encoding::{self, write_all}, metadata::RequestMetadataBuilder, request_builder::EncodeResult, + retries::{RetryAction, RetryLogic}, service::{ServiceBuilderExt, Svc}, - BatchConfig, Compression, NoDefaultsBatchSettings, RequestBuilder, SinkBatchSettings, - TowerRequestConfig, + BatchConfig, Compression, Concurrency, NoDefaultsBatchSettings, RequestBuilder, + SinkBatchSettings, TowerRequestConfig, }, Healthcheck, HealthcheckError, }, @@ -33,6 +33,7 @@ pub use vector_common::{ request_metadata::{GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata}, }; pub use vector_config::configurable_component; + pub use vector_core::{ config::{AcknowledgementsConfig, Input}, event::Value, diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 0478d9d192fb1..5338ba144bef4 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -3,7 +3,7 @@ use std::{convert::TryInto, io::ErrorKind}; use async_compression::tokio::bufread; use aws_sdk_s3::types::ByteStream; use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions}; -use codecs::{BytesDeserializerConfig, NewlineDelimitedDecoderConfig}; +use codecs::NewlineDelimitedDecoderConfig; use futures::{stream, stream::StreamExt, TryStreamExt}; use lookup::owned_value_path; use snafu::Snafu; @@ -14,7 +14,6 @@ use vrl::value::{kind::Collection, Kind}; use super::util::MultilineConfig; use crate::codecs::DecodingConfig; -use crate::config::DataType; use crate::{ aws::{auth::AwsAuthentication, create_client, create_client_and_region, RegionOrEndpoint}, common::{s3::S3ClientBuilder, sqs::SqsClientBuilder}, @@ -163,7 +162,8 @@ impl SourceConfig for AwsS3Config { fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { let log_namespace = global_log_namespace.merge(self.log_namespace); - let mut schema_definition = BytesDeserializerConfig + let mut schema_definition = self + .decoding .schema_definition(log_namespace) .with_source_metadata( Self::NAME, @@ -199,7 +199,7 @@ impl SourceConfig for AwsS3Config { Self::NAME, None, &owned_value_path!("metadata"), - Kind::object(Collection::empty().with_unknown(Kind::bytes())), + Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), None, ); @@ -208,7 +208,10 @@ impl SourceConfig for AwsS3Config { schema_definition = schema_definition.unknown_fields(Kind::bytes()); } - vec![SourceOutput::new_logs(DataType::Log, schema_definition)] + vec![SourceOutput::new_logs( + self.decoding.output_type(), + schema_definition, + )] } fn can_acknowledge(&self) -> bool { @@ -440,6 +443,7 @@ mod integration_tests { use aws_sdk_s3::{types::ByteStream, Client as S3Client}; use aws_sdk_sqs::{model::QueueAttributeName, Client as SqsClient}; + use codecs::{decoding::DeserializerConfig, JsonDeserializerConfig}; use lookup::path; use similar_asserts::assert_eq; use vrl::value::Value; @@ -483,6 +487,35 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, + ) + .await; + } + + #[tokio::test] + async fn s3_process_json_message() { + trace_init(); + + let logs: Vec = random_lines(100).take(10).collect(); + + let json_logs: Vec = logs + .iter() + .map(|msg| { + // convert to JSON object + format!(r#"{{"message": "{}"}}"#, msg) + }) + .collect(); + + test_event( + None, + None, + None, + None, + json_logs.join("\n").into_bytes(), + logs, + Delivered, + false, + DeserializerConfig::Json(JsonDeserializerConfig::default()), ) .await; } @@ -502,6 +535,7 @@ mod integration_tests { logs, Delivered, true, + DeserializerConfig::Bytes, ) .await; } @@ -522,6 +556,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -542,6 +577,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -570,6 +606,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -599,6 +636,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -628,6 +666,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -655,6 +694,7 @@ mod integration_tests { vec!["abc\ndef\ngeh".to_owned()], Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -677,6 +717,7 @@ mod integration_tests { logs, Errored, false, + DeserializerConfig::Bytes, ) .await; } @@ -696,6 +737,7 @@ mod integration_tests { logs, Rejected, false, + DeserializerConfig::Bytes, ) .await; } @@ -708,6 +750,7 @@ mod integration_tests { queue_url: &str, multiline: Option, log_namespace: bool, + decoding: DeserializerConfig, ) -> AwsS3Config { AwsS3Config { region: RegionOrEndpoint::with_both("us-east-1", s3_address()), @@ -723,6 +766,7 @@ mod integration_tests { }), acknowledgements: true.into(), log_namespace: Some(log_namespace), + decoding, ..Default::default() } } @@ -738,6 +782,7 @@ mod integration_tests { expected_lines: Vec, status: EventStatus, log_namespace: bool, + decoding: DeserializerConfig, ) { assert_source_compliance(&SOURCE_TAGS, async move { let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); @@ -750,7 +795,7 @@ mod integration_tests { tokio::time::sleep(Duration::from_secs(1)).await; - let config = config(&queue, multiline, log_namespace); + let config = config(&queue, multiline, log_namespace, decoding); s3.put_object() .bucket(bucket.clone()) @@ -831,6 +876,11 @@ mod integration_tests { assert_eq!(expected_lines.len(), events.len()); for (i, event) in events.iter().enumerate() { + + if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition { + schema_definition.is_valid_for_event(event).unwrap(); + } + let message = expected_lines[i].as_str(); let log = event.as_log(); diff --git a/src/transforms/sample.rs b/src/transforms/sample.rs index a27b57f2cca9c..1d06ef96dcb98 100644 --- a/src/transforms/sample.rs +++ b/src/transforms/sample.rs @@ -1,6 +1,7 @@ use vector_config::configurable_component; -use vector_core::config::LogNamespace; -use vrl::event_path; +use vector_core::config::{LegacyKey, LogNamespace}; +use vrl::value::Kind; +use vrl::{event_path, owned_value_path}; use crate::{ conditions::{AnyCondition, Condition}, @@ -85,7 +86,18 @@ impl TransformConfig for SampleConfig { DataType::Log | DataType::Trace, input_definitions .iter() - .map(|(output, definition)| (output.clone(), definition.clone())) + .map(|(output, definition)| { + ( + output.clone(), + definition.clone().with_source_metadata( + SampleConfig::NAME, + Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))), + &owned_value_path!("sample_rate"), + Kind::bytes(), + None, + ), + ) + }) .collect(), )] } @@ -153,10 +165,16 @@ impl FunctionTransform for Sample { if num % self.rate == 0 { match event { Event::Log(ref mut event) => { - event.insert(event_path!("sample_rate"), self.rate.to_string()) + event.namespace().insert_source_metadata( + SampleConfig::NAME, + event, + Some(LegacyKey::Overwrite(vrl::path!("sample_rate"))), + vrl::path!("sample_rate"), + self.rate.to_string(), + ); } Event::Trace(ref mut event) => { - event.insert(event_path!("sample_rate"), self.rate.to_string()) + event.insert(event_path!("sample_rate"), self.rate.to_string()); } Event::Metric(_) => panic!("component can never receive metric events"), }; diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index 2708c111c08cc..3565f375d2f67 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -266,6 +266,160 @@ base: components: sinks: nats: configuration: { } } } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, etc. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + Note that the new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency). + + It is recommended to set this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: "Configuration for outbound request concurrency." + required: false + type: { + string: { + default: "none" + enum: { + adaptive: """ + Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: """ + The maximum number of retries to make for failed requests. + + The default, for all intents and purposes, represents an infinite number of retries. + """ + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the fibonacci sequence is used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } subject: { description: """ The NATS [subject][nats_subject] to publish messages to.