From d06ee2ae2102a3e68a775a19a4ed34a6fab861db Mon Sep 17 00:00:00 2001 From: seeyarh Date: Tue, 8 Mar 2022 20:10:23 -0500 Subject: [PATCH] enhancement(auth): NATS sink+source authentication and TLS support (#10688) * add tls support for nats source/sink, add support for all nats authenticatation options for nats source/sink, integration tests for nats tls and nats auth Signed-off-by: Collins Huff * fix broken podman setup, remove old print statements Signed-off-by: Collins Huff * add registry to image name so podman doesn't prompt Signed-off-by: Collins Huff * format cue files Signed-off-by: Collins Huff * use snafu context to make error handling more concise, add print statements to asserts in integration tests to report what failed, cleanup NATS docs Signed-off-by: Collins Huff * Update auth options to follow external tagging format Signed-off-by: Jesse Szwedko * clippy Signed-off-by: Jesse Szwedko Co-authored-by: Bruce Guenter Co-authored-by: Jesse Szwedko --- Cargo.lock | 83 ++- Cargo.toml | 5 +- .../setup_integration/nats_integration_env.sh | 125 ++++- src/lib.rs | 2 + src/nats.rs | 163 ++++++ src/sinks/nats.rs | 499 +++++++++++++++--- src/sources/nats.rs | 447 ++++++++++++++-- tests/data/localhost-mkcert-key.pem | 28 + tests/data/localhost-mkcert.pem | 24 + tests/data/mkcert_rootCA.pem | 26 + tests/data/nats-bad.creds | 13 + tests/data/nats-jwt.conf | 15 + tests/data/nats-nkey.conf | 5 + tests/data/nats-tls-client-cert.conf | 12 + tests/data/nats-tls.conf | 4 + tests/data/nats.creds | 13 + tests/data/nats_client_cert.pem | 25 + tests/data/nats_client_key.pem | 28 + website/cue/reference/components/nats.cue | 103 ++++ .../cue/reference/components/sinks/nats.cue | 8 +- 20 files changed, 1510 insertions(+), 118 deletions(-) create mode 100644 src/nats.rs create mode 100644 tests/data/localhost-mkcert-key.pem create mode 100644 tests/data/localhost-mkcert.pem create mode 100644 tests/data/mkcert_rootCA.pem create mode 100644 tests/data/nats-bad.creds create mode 100644 tests/data/nats-jwt.conf create mode 100644 tests/data/nats-nkey.conf create mode 100644 tests/data/nats-tls-client-cert.conf create mode 100644 tests/data/nats-tls.conf create mode 100644 tests/data/nats.creds create mode 100644 tests/data/nats_client_cert.pem create mode 100644 tests/data/nats_client_key.pem diff --git a/Cargo.lock b/Cargo.lock index 51de9ef5be531..1d6b8fab07052 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -927,6 +927,12 @@ dependencies = [ "base64 0.13.0", ] +[[package]] +name = "base64ct" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c" + [[package]] name = "bindgen" version = "0.56.0" @@ -1498,6 +1504,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "const-oid" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" + [[package]] name = "const_fn" version = "0.4.8" @@ -1920,6 +1932,15 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76fbd10dce159c002b9c688ae8ab7cd531151e185e0ad360f4bfea3b0eede3a8" +[[package]] +name = "der" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79b71cca7d95d7681a4b3b9cdf63c8dbc3730d0584c2c74e31416d64a90493f4" +dependencies = [ + "const-oid", +] + [[package]] name = "derivative" version = "2.2.0" @@ -4255,7 +4276,7 @@ dependencies = [ "libc", "log", "memchr", - "nkeys", + "nkeys 0.1.0", "nuid", "once_cell", "parking_lot 0.11.2", @@ -4407,7 +4428,22 @@ dependencies = [ "ed25519-dalek", "log", "rand 0.7.3", - "signatory", + "signatory 0.21.0", +] + +[[package]] +name = "nkeys" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594" +dependencies = [ + "byteorder", + "data-encoding", + "ed25519-dalek", + "getrandom 0.2.3", + "log", + "rand 0.8.5", + "signatory 0.23.2", ] [[package]] @@ -4904,6 +4940,15 @@ dependencies = [ "regex", ] +[[package]] +name = "pem-rfc7468" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84e93a3b1cc0510b03020f33f21e62acdde3dcaef432edc95bea377fbd4c2cd4" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -5079,6 +5124,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee3ef9b64d26bad0536099c816c6734379e45bbd5f14798def6809e5cc350447" +dependencies = [ + "der", + "pem-rfc7468", + "spki", + "zeroize", +] + [[package]] name = "pkg-config" version = "0.3.22" @@ -6814,6 +6871,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "signatory" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfecc059e81632eef1dd9b79e22fc28b8fe69b30d3357512a77a0ad8ee3c782" +dependencies = [ + "pkcs8", + "rand_core 0.6.3", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "1.4.0" @@ -6977,6 +7046,15 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" +[[package]] +name = "spki" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c01a0c15da1b0b0e1494112e7af814a678fec9bd157881b49beac661e9b6f32" +dependencies = [ + "der", +] + [[package]] name = "standback" version = "0.2.17" @@ -8412,6 +8490,7 @@ dependencies = [ "mongodb", "nats", "nix", + "nkeys 0.2.0", "nom 7.1.0", "notify", "num-format", diff --git a/Cargo.toml b/Cargo.toml index 8fc938bf3b40a..e2216c5ab46e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -251,6 +251,7 @@ md-5 = { version = "0.10", optional = true } memchr = { version = "2.4", default-features = false, optional = true } mongodb = { version = "2.1.0", default-features = false, features = ["tokio-runtime"], optional = true } nats = { version = "0.18.1", default-features = false, optional = true } +nkeys = { version = "0.2.0", optional = true } nom = { version = "7.1.0", default-features = false, optional = true } notify = { version = "4.0.17", default-features = false } num_cpus = { version = "1.13.1", default-features = false } @@ -469,7 +470,7 @@ sources-internal_logs = [] sources-internal_metrics = [] sources-journald = ["codecs"] sources-kafka = ["rdkafka", "codecs"] -sources-nats = ["nats", "codecs"] +sources-nats = ["nats", "nkeys", "codecs"] sources-logstash = ["listenfd", "tokio-util/net", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "codecs"] sources-kubernetes_logs = ["file-source", "kubernetes", "transforms-merge", "transforms-regex_parser"] sources-mongodb_metrics = ["mongodb"] @@ -655,7 +656,7 @@ sinks-influxdb = [] sinks-kafka = ["rdkafka"] sinks-logdna = [] sinks-loki = [] -sinks-nats = ["nats"] +sinks-nats = ["nats", "nkeys"] sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-papertrail = ["syslog"] diff --git a/scripts/setup_integration/nats_integration_env.sh b/scripts/setup_integration/nats_integration_env.sh index 9f9e68b4b47c1..cdaaeeafbdc68 100755 --- a/scripts/setup_integration/nats_integration_env.sh +++ b/scripts/setup_integration/nats_integration_env.sh @@ -14,30 +14,133 @@ then fi ACTION=$1 -# -# Functions -# start_podman () { - podman pod create --replace --name vector-test-integration-nats -p 4222:4222 - podman run -d --pod=vector-test-integration-nats --name vector_nats \ - nats + podman pod create --replace --name vector_nats -p 4222:4222 + podman pod create --replace --name vector_nats_userpass -p 4223:4222 + podman pod create --replace --name vector_nats_token -p 4224:4222 + podman pod create --replace --name vector_nats_nkey -p 4225:4222 + podman pod create --replace --name vector_nats_tls -p 4227:4222 + podman pod create --replace --name vector_nats_tls_client_cert -p 4228:4222 + podman pod create --replace --name vector_nats_jwt -p 4229:4222 + + podman run -d --pod=vector_nats --name vector_nats_test docker.io/library/nats:latest + podman run -d --pod=vector_nats_userpass --name vector_nats_userpass_test docker.io/library/nats:latest \ + --user natsuser --pass natspass + podman run -d --pod=vector_nats_token --name vector_nats_token_test docker.io/library/nats:latest \ + --auth secret + podman run -d --pod=vector_nats_nkey --name vector_nats_nkey_test \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + docker.io/library/nats:latest -c /usr/share/nats/config/nats-nkey.conf + + podman run -d --pod=vector_nats_tls --name vector_nats_tls_test \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + docker.io/library/nats:latest -c /usr/share/nats/config/nats-tls.conf + + podman run -d --pod=vector_nats_tls_client_cert --name vector_nats_tls_client_cert_test \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + docker.io/library/nats:latest -c /usr/share/nats/config/nats-tls-client-cert.conf + + podman run -d --pod=vector_nats_jwt --name vector_nats_jwt_test \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + docker.io/library/nats:latest -c /usr/share/nats/config/nats-jwt.conf } start_docker () { docker network create vector-test-integration-nats - docker run -d --network=vector-test-integration-nats -p 4222:4222 --name vector_nats \ - nats + docker run -d --network=vector-test-integration-nats -p 4222:4222 --name vector_nats nats + docker run -d --network=vector-test-integration-nats -p 4223:4222 --name vector_nats_userpass nats \ + --user natsuser --pass natspass + docker run -d --network=vector-test-integration-nats -p 4224:4222 --name vector_nats_token nats \ + --auth secret + + # The following tls tests use mkcert + # https://github.com/FiloSottile/mkcert + # See https://docs.nats.io/running-a-nats-service/configuration/securing_nats/tls + + + # Generate a new NKey with the following command: + # $ nk -gen user -pubout + # SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY + # UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT + # + # The first line of output is the Seed, which is a private key + # The second line of output is the User string, which is a public key + docker run -d --network=vector-test-integration-nats -p 4225:4222 \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + --name vector_nats_nkey nats \ + -c /usr/share/nats/config/nats-nkey.conf + + # First, generate a certificate for the NATS server using the following command + # $ mkcert -cert-file server-cert.pem -key-file server-key.pem localhost ::1 + # + # Next, move the mkcert root CA to the correct location + # $ mv "$(mkcert -CAROOT)/rootCA.pem" tests/data/mkcert_rootCA.pem + docker run -d --network=vector-test-integration-nats -p 4227:4222 \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + --name vector_nats_tls nats \ + -c /usr/share/nats/config/nats-tls.conf + + # Generate a client cert using the following command + # $ mkcert -client -cert-file nats_client_cert.pem -key-file nats_client_key.pem localhost ::1 email@localhost + docker run -d --network=vector-test-integration-nats -p 4228:4222 \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + --name vector_nats_tls_client_cert nats \ + -c /usr/share/nats/config/nats-tls-client-cert.conf + + # Follow the instructions here + # See https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/jwt/mem_resolver + # Then run the following additional commands + # $ mv /tmp/server.conf tests/data/nats-jwt.conf + # $ cat << EOF >> tests/data/nats-jwt.conf + # + #tls: { + # cert_file: "/usr/share/nats/config/localhost-mkcert.pem" + # key_file: "/usr/share/nats/config/localhost-mkcert-key.pem" + #} + #EOF + # + # $ mv ~/.nkeys/creds/memory/A/TA.creds tests/data/nats.creds + # $ cp tests/data/nats.creds tests/data/nats-bad.creds + # # edit test/data/nats-bad.creds and change one of the characters in the Seed + + docker run -d --network=vector-test-integration-nats -p 4229:4222 \ + -v "$(pwd)"/tests/data:/usr/share/nats/config:ro \ + --name vector_nats_jwt nats \ + -c /usr/share/nats/config/nats-jwt.conf } stop_podman () { - podman rm --force vector_nats 2>/dev/null; true - podman pod stop vector-test-integration-nats 2>/dev/null; true - podman pod rm --force vector-test-integration-nats 2>/dev/null; true + podman pod stop vector_nats_test 2>/dev/null; true + podman pod rm --force vector_nats 2>/dev/null; true + + podman pod stop vector_nats_userpass_test 2>/dev/null; true + podman pod rm --force vector_nats_userpass 2>/dev/null; true + + podman pod stop vector_nats_token_test 2>/dev/null; true + podman pod rm --force vector_nats_token 2>/dev/null; true + + podman pod stop vector_nats_nkey_test 2>/dev/null; true + podman pod rm --force vector_nats_nkey 2>/dev/null; true + + podman pod stop vector_nats_tls_test 2>/dev/null; true + podman pod rm --force vector_nats_tls 2>/dev/null; true + + podman pod stop vector_nats_tls_client_cert_test 2>/dev/null; true + podman pod rm --force vector_nats_tls_client_cert 2>/dev/null; true + + podman pod stop vector_nats_jwt_test 2>/dev/null; true + podman pod rm --force vector_nats_jwt 2>/dev/null; true } stop_docker () { docker rm --force vector_nats 2>/dev/null; true + docker rm --force vector_nats_userpass 2>/dev/null; true + docker rm --force vector_nats_token 2>/dev/null; true + docker rm --force vector_nats_nkey 2>/dev/null; true + docker rm --force vector_nats_tls 2>/dev/null; true + docker rm --force vector_nats_tls_client_cert 2>/dev/null; true + docker rm --force vector_nats_jwt 2>/dev/null; true docker network rm vector-test-integration-nats 2>/dev/null; true } diff --git a/src/lib.rs b/src/lib.rs index 46b17e4741e7d..8456a62a47e8a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,6 +63,8 @@ pub(crate) mod kafka; pub mod kubernetes; pub mod line_agg; pub mod list; +#[cfg(any(feature = "sources-nats", feature = "sinks-nats"))] +pub(crate) mod nats; #[allow(unreachable_pub)] pub(crate) mod proto; pub mod providers; diff --git a/src/nats.rs b/src/nats.rs new file mode 100644 index 0000000000000..89913d87657ae --- /dev/null +++ b/src/nats.rs @@ -0,0 +1,163 @@ +use crate::tls::TlsConfig; +use nkeys::error::Error as NKeysError; +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +pub enum NatsConfigError { + #[snafu(display("NATS Auth Config Error: {}", source))] + AuthConfigError { source: NKeysError }, + #[snafu(display("NATS TLS Config Error: missing key"))] + TlsMissingKey, + #[snafu(display("NATS TLS Config Error: missing cert"))] + TlsMissingCert, + #[snafu(display("Missing configuration for auth strategy: {}", strategy))] + AuthStrategyMissingConfiguration { strategy: NatsAuthStrategy }, +} + +#[derive(Derivative, Copy, Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +#[derivative(Default)] +pub enum NatsAuthStrategy { + #[derivative(Default)] + UserPassword, + Token, + CredentialsFile, + NKey, +} + +impl std::fmt::Display for NatsAuthStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + use NatsAuthStrategy::*; + match self { + UserPassword => write!(f, "user_password"), + Token => write!(f, "token"), + CredentialsFile => write!(f, "credentials_file"), + NKey => write!(f, "nkey"), + } + } +} + +#[derive(Default, Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")] +pub(crate) struct NatsAuthConfig { + pub(crate) strategy: NatsAuthStrategy, + pub(crate) user_password: Option, + pub(crate) token: Option, + pub(crate) credentials_file: Option, + pub(crate) nkey: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub(crate) struct NatsAuthUserPassword { + pub(crate) user: String, + pub(crate) password: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub(crate) struct NatsAuthToken { + pub(crate) value: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub(crate) struct NatsAuthCredentialsFile { + pub(crate) path: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub(crate) struct NatsAuthNKey { + pub(crate) nkey: String, + pub(crate) seed: String, +} + +impl NatsAuthConfig { + pub(crate) fn to_nats_options(&self) -> Result { + match self.strategy { + NatsAuthStrategy::UserPassword => self + .user_password + .as_ref() + .map(|config| nats::asynk::Options::with_user_pass(&config.user, &config.password)) + .ok_or(NatsConfigError::AuthStrategyMissingConfiguration { + strategy: self.strategy, + }), + NatsAuthStrategy::CredentialsFile => self + .credentials_file + .as_ref() + .map(|config| nats::asynk::Options::with_credentials(&config.path)) + .ok_or(NatsConfigError::AuthStrategyMissingConfiguration { + strategy: self.strategy, + }), + NatsAuthStrategy::NKey => self + .nkey + .as_ref() + .map(|config| { + nkeys::KeyPair::from_seed(&config.seed) + .context(AuthConfigSnafu) + .map(|kp| { + // The following unwrap is safe because the only way the sign method can fail is if + // keypair does not contain a seed. We are constructing the keypair from a seed in + // the preceding line. + nats::asynk::Options::with_nkey(&config.nkey, move |nonce| { + kp.sign(nonce).unwrap() + }) + }) + }) + .ok_or(NatsConfigError::AuthStrategyMissingConfiguration { + strategy: self.strategy, + }) + .and_then(std::convert::identity), + NatsAuthStrategy::Token => self + .token + .as_ref() + .map(|config| nats::asynk::Options::with_token(&config.value)) + .ok_or(NatsConfigError::AuthStrategyMissingConfiguration { + strategy: self.strategy, + }), + } + } +} + +pub(crate) fn from_tls_auth_config( + connection_name: &str, + auth_config: &Option, + tls_config: &Option, +) -> Result { + let nats_options = match &auth_config { + None => nats::asynk::Options::new(), + Some(auth) => auth.to_nats_options()?, + }; + + let nats_options = nats_options + .with_name(connection_name) + // Set reconnect_buffer_size on the nats client to 0 bytes so that the + // client doesn't buffer internally (to avoid message loss). + .reconnect_buffer_size(0); + + match tls_config { + None => Ok(nats_options), + Some(tls_config) => { + let tls_enabled = tls_config.enabled.unwrap_or(false); + let nats_options = nats_options.tls_required(tls_enabled); + if !tls_enabled { + return Ok(nats_options); + } + + let nats_options = match &tls_config.options.ca_file { + None => nats_options, + Some(ca_file) => nats_options.add_root_certificate(ca_file), + }; + + let nats_options = match (&tls_config.options.crt_file, &tls_config.options.key_file) { + (None, None) => nats_options, + (Some(crt_file), Some(key_file)) => nats_options.client_cert(crt_file, key_file), + (Some(_crt_file), None) => return Err(NatsConfigError::TlsMissingKey), + (None, Some(_key_file)) => return Err(NatsConfigError::TlsMissingCert), + }; + Ok(nats_options) + } + } +} diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index ed20d9b9c56ed..44e0c0f917e5a 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -12,17 +12,23 @@ use crate::{ }, event::Event, internal_events::{NatsEventSendError, NatsEventSendSuccess, TemplateRenderingError}, + nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, sinks::util::{ encoding::{EncodingConfig, EncodingConfiguration}, StreamSink, }, template::{Template, TemplateParseError}, + tls::TlsConfig, }; #[derive(Debug, Snafu)] enum BuildError { #[snafu(display("invalid subject template: {}", source))] SubjectTemplate { source: TemplateParseError }, + #[snafu(display("NATS Config Error: {}", source))] + Config { source: NatsConfigError }, + #[snafu(display("NATS Connect Error: {}", source))] + Connect { source: std::io::Error }, } /** @@ -36,6 +42,9 @@ pub struct NatsSinkConfig { connection_name: String, subject: String, url: String, + tls: Option, + #[serde(flatten)] + auth: Option, } fn default_name() -> String { @@ -73,7 +82,7 @@ impl SinkConfig for NatsSinkConfig { &self, cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let sink = NatsSink::new(self.clone(), cx.acker())?; + let sink = NatsSink::new(self.clone(), cx.acker()).await?; let healthcheck = healthcheck(self.clone()).boxed(); Ok((super::VectorSink::from_event_streamsink(sink), healthcheck)) } @@ -91,79 +100,49 @@ impl SinkConfig for NatsSinkConfig { } } -impl NatsSinkConfig { - fn to_nats_options(&self) -> nats::asynk::Options { - // Set reconnect_buffer_size on the nats client to 0 bytes so that the - // client doesn't buffer internally (to avoid message loss). - nats::asynk::Options::new() - .with_name(&self.connection_name) - .reconnect_buffer_size(0) - } +impl std::convert::TryFrom<&NatsSinkConfig> for nats::asynk::Options { + type Error = NatsConfigError; - async fn connect(&self) -> crate::Result { - self.to_nats_options() - .connect(&self.url) - .map_err(|e| e.into()) - .await + fn try_from(config: &NatsSinkConfig) -> Result { + from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) } } -async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { - config.connect().map_ok(|_| ()).await -} +impl NatsSinkConfig { + async fn connect(&self) -> Result { + let options: nats::asynk::Options = self.try_into().context(ConfigSnafu)?; -/** - * Code dealing with the Sink struct. - */ + options.connect(&self.url).await.context(ConnectSnafu) + } +} -#[derive(Clone)] -struct NatsOptions { - connection_name: String, +async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { + config.connect().map_ok(|_| ()).map_err(|e| e.into()).await } pub struct NatsSink { encoding: EncodingConfig, - options: NatsOptions, + connection: nats::asynk::Connection, subject: Template, - url: String, acker: Acker, } impl NatsSink { - fn new(config: NatsSinkConfig, acker: Acker) -> crate::Result { + async fn new(config: NatsSinkConfig, acker: Acker) -> Result { + let connection = config.connect().await?; + Ok(NatsSink { - options: (&config).into(), + connection, encoding: config.encoding, subject: Template::try_from(config.subject).context(SubjectTemplateSnafu)?, - url: config.url, acker, }) } } -impl From for nats::asynk::Options { - fn from(options: NatsOptions) -> Self { - nats::asynk::Options::new() - .with_name(&options.connection_name) - .reconnect_buffer_size(0) - } -} - -impl From<&NatsSinkConfig> for NatsOptions { - fn from(options: &NatsSinkConfig) -> Self { - Self { - connection_name: options.connection_name.clone(), - } - } -} - #[async_trait] impl StreamSink for NatsSink { async fn run(self: Box, mut input: BoxStream<'_, Event>) -> Result<(), ()> { - let nats_options: nats::asynk::Options = self.options.into(); - - let nc = nats_options.connect(&self.url).await.map_err(|_| ())?; - while let Some(event) = input.next().await { let subject = match self.subject.render_string(&event) { Ok(subject) => subject, @@ -181,7 +160,7 @@ impl StreamSink for NatsSink { let log = encode_event(event, &self.encoding); let message_len = log.len(); - match nc.publish(&subject, log).await { + match self.connection.publish(&subject, log).await { Ok(_) => { emit!(&NatsEventSendSuccess { byte_size: message_len, @@ -251,35 +230,38 @@ mod integration_tests { use std::{thread, time::Duration}; use super::*; + use crate::nats::{ + NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthStrategy, NatsAuthToken, + NatsAuthUserPassword, + }; use crate::sinks::VectorSink; use crate::test_util::{random_lines_with_stream, random_string, trace_init}; + use crate::tls::TlsOptions; - #[tokio::test] - async fn nats_happy() { + async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), BuildError> { // Publish `N` messages to NATS. // // Verify with a separate subscriber that the messages were // successfully published. - trace_init(); - - let subject = format!("test-{}", random_string(10)); - - let cnf = NatsSinkConfig { - encoding: EncodingConfig::from(Encoding::Text), - connection_name: "".to_owned(), - subject: subject.clone(), - url: "nats://127.0.0.1:4222".to_owned(), - }; + // Create Sink + let (acker, ack_counter) = Acker::basic(); + let sink = NatsSink::new(conf.clone(), acker).await?; + let sink = VectorSink::from_event_streamsink(sink); // Establish the consumer subscription. - let consumer = cnf.clone().connect().await.unwrap(); - let sub = consumer.subscribe(&subject).await.unwrap(); + let subject = conf.subject.clone(); + let consumer = conf + .clone() + .connect() + .await + .expect("failed to connect with test consumer"); + let sub = consumer + .subscribe(&subject) + .await + .expect("failed to subscribe with test consumer"); // Publish events. - let (acker, ack_counter) = Acker::basic(); - let sink = NatsSink::new(cnf.clone(), acker).unwrap(); - let sink = VectorSink::from_event_streamsink(sink); let num_events = 1_000; let (input, events) = random_lines_with_stream(100, num_events, None); @@ -301,5 +283,390 @@ mod integration_tests { ack_counter.load(std::sync::atomic::Ordering::Relaxed), num_events ); + + Ok(()) + } + + #[tokio::test] + async fn nats_no_auth() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4222".to_owned(), + tls: None, + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_userpass_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4223".to_owned(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::UserPassword, + user_password: Some(NatsAuthUserPassword { + user: "natsuser".into(), + password: "natspass".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_userpass_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4224".to_owned(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::UserPassword, + user_password: Some(NatsAuthUserPassword { + user: "natsuser".into(), + password: "wrongpass".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_token_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4224".to_owned(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::Token, + token: Some(NatsAuthToken { + value: "secret".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_token_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4224".to_owned(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::Token, + token: Some(NatsAuthToken { + value: "wrongsecret".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_nkey_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4225".to_owned(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::NKey, + nkey: Some(NatsAuthNKey { + nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(), + seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_nkey_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4225".to_owned(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::NKey, + nkey: Some(NatsAuthNKey { + nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(), + seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Config { .. })), + "publish_and_check failed, expected BuildError::Config, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4227".to_owned(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4227".to_owned(), + tls: None, + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_client_cert_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4228".to_owned(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + crt_file: Some("tests/data/nats_client_cert.pem".into()), + key_file: Some("tests/data/nats_client_key.pem".into()), + ..Default::default() + }, + }), + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_client_cert_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4228".to_owned(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_jwt_auth_valid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4229".to_owned(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::CredentialsFile, + credentials_file: Some(NatsAuthCredentialsFile { + path: "tests/data/nats.creds".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_jwt_auth_invalid() { + trace_init(); + + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSinkConfig { + encoding: EncodingConfig::from(Encoding::Text), + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4229".to_owned(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::CredentialsFile, + credentials_file: Some(NatsAuthCredentialsFile { + path: "tests/data/nats-bad.creds".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); } } diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 5f455e177688f..598e7561f73db 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use chrono::Utc; use futures::{pin_mut, stream, Stream, StreamExt}; use serde::{Deserialize, Serialize}; -use snafu::Snafu; +use snafu::{ResultExt, Snafu}; use tokio_util::codec::FramedRead; use vector_core::ByteSizeOf; @@ -17,18 +17,22 @@ use crate::{ }, event::Event, internal_events::{BytesReceived, NatsEventsReceived, StreamClosedError}, + nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, serde::{default_decoding, default_framing_message_based}, shutdown::ShutdownSignal, sources::util::StreamDecodingError, + tls::TlsConfig, SourceSender, }; #[derive(Debug, Snafu)] enum BuildError { - #[snafu(display("Could not create Nats subscriber: {}", source))] - NatsCreateError { source: std::io::Error }, - #[snafu(display("Could not subscribe to Nats topics: {}", source))] - NatsSubscribeError { source: std::io::Error }, + #[snafu(display("NATS Config Error: {}", source))] + Config { source: NatsConfigError }, + #[snafu(display("NATS Connect Error: {}", source))] + Connect { source: std::io::Error }, + #[snafu(display("NATS Subscribe Error: {}", source))] + Subscribe { source: std::io::Error }, } #[derive(Clone, Debug, Derivative, Deserialize, Serialize)] @@ -40,6 +44,9 @@ struct NatsSourceConfig { connection_name: String, subject: String, queue: Option, + tls: Option, + #[serde(flatten)] + auth: Option, #[serde(default = "default_framing_message_based")] #[derivative(Default(value = "default_framing_message_based()"))] framing: FramingConfig, @@ -94,27 +101,17 @@ impl SourceConfig for NatsSourceConfig { } impl NatsSourceConfig { - fn to_nats_options(&self) -> nats::asynk::Options { - // Set reconnect_buffer_size on the nats client to 0 bytes so that the - // client doesn't buffer internally (to avoid message loss). - nats::asynk::Options::new() - .with_name(&self.connection_name) - .reconnect_buffer_size(0) - } - - async fn connect(&self) -> crate::Result { - self.to_nats_options() - .connect(&self.url) - .await - .map_err(|e| e.into()) + async fn connect(&self) -> Result { + let options: nats::asynk::Options = self.try_into().context(ConfigSnafu)?; + options.connect(&self.url).await.context(ConnectSnafu) } } -impl From for nats::asynk::Options { - fn from(config: NatsSourceConfig) -> Self { - nats::asynk::Options::new() - .with_name(&config.connection_name) - .reconnect_buffer_size(0) +impl std::convert::TryFrom<&NatsSourceConfig> for nats::asynk::Options { + type Error = NatsConfigError; + + fn try_from(config: &NatsSourceConfig) -> Result { + from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) } } @@ -180,7 +177,7 @@ async fn nats_source( async fn create_subscription( config: &NatsSourceConfig, -) -> crate::Result<(nats::asynk::Connection, nats::asynk::Subscription)> { +) -> Result<(nats::asynk::Connection, nats::asynk::Subscription), BuildError> { let nc = config.connect().await?; let subscription = match &config.queue { @@ -188,7 +185,7 @@ async fn create_subscription( Some(queue) => nc.queue_subscribe(&config.subject, queue).await, }; - let subscription = subscription?; + let subscription = subscription.context(SubscribeSnafu)?; Ok((nc, subscription)) } @@ -211,10 +208,32 @@ mod integration_tests { #![allow(clippy::print_stdout)] //tests use super::*; + use crate::nats::{ + NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthStrategy, NatsAuthToken, + NatsAuthUserPassword, + }; use crate::test_util::{collect_n, random_string}; + use crate::tls::TlsOptions; + + async fn publish_and_check(conf: NatsSourceConfig) -> Result<(), BuildError> { + let subject = conf.subject.clone(); + let (nc, sub) = create_subscription(&conf).await?; + let nc_pub = nc.clone(); + + let (tx, rx) = SourceSender::new_test(); + let decoder = DecodingConfig::new(conf.framing.clone(), conf.decoding.clone()).build(); + tokio::spawn(nats_source(nc, sub, decoder, ShutdownSignal::noop(), tx)); + let msg = "my message"; + nc_pub.publish(&subject, msg).await.unwrap(); + + let events = collect_n(rx, 1).await; + println!("Received event {:?}", events[0].as_log()); + assert_eq!(events[0].as_log()[log_schema().message_key()], msg.into()); + Ok(()) + } #[tokio::test] - async fn nats_happy() { + async fn nats_no_auth() { let subject = format!("test-{}", random_string(10)); let conf = NatsSourceConfig { @@ -224,19 +243,375 @@ mod integration_tests { queue: None, framing: default_framing_message_based(), decoding: default_decoding(), + tls: None, + auth: None, }; - let (nc, sub) = create_subscription(&conf).await.unwrap(); - let nc_pub = nc.clone(); + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } - let (tx, rx) = SourceSender::new_test(); - let decoder = DecodingConfig::new(conf.framing.clone(), conf.decoding.clone()).build(); - tokio::spawn(nats_source(nc, sub, decoder, ShutdownSignal::noop(), tx)); - let msg = "my message"; - nc_pub.publish(&subject, msg).await.unwrap(); + #[tokio::test] + async fn nats_userpass_auth_valid() { + let subject = format!("test-{}", random_string(10)); - let events = collect_n(rx, 1).await; - println!("Received event {:?}", events[0].as_log()); - assert_eq!(events[0].as_log()[log_schema().message_key()], msg.into()); + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4223".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::UserPassword, + user_password: Some(NatsAuthUserPassword { + user: "natsuser".into(), + password: "natspass".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_userpass_auth_invalid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4223".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::UserPassword, + user_password: Some(NatsAuthUserPassword { + user: "natsuser".into(), + password: "wrongpass".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_token_auth_valid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4224".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::Token, + token: Some(NatsAuthToken { + value: "secret".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_token_auth_invalid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4224".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::Token, + token: Some(NatsAuthToken { + value: "wrongsecret".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_nkey_auth_valid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4225".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::NKey, + nkey: Some(NatsAuthNKey { + nkey: "UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT".into(), + seed: "SUANIRXEZUROTXNFN3TJYMT27K7ZZVMD46FRIHF6KXKS4KGNVBS57YAFGY".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_nkey_auth_invalid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://127.0.0.1:4225".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::NKey, + nkey: Some(NatsAuthNKey { + nkey: "UAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(), + seed: "SBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Config { .. })), + "publish_and_check failed, expected BuildError::Config, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_valid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4227".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_invalid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4227".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: None, + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_client_cert_valid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4228".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + crt_file: Some("tests/data/nats_client_cert.pem".into()), + key_file: Some("tests/data/nats_client_key.pem".into()), + ..Default::default() + }, + }), + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_client_cert_invalid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4228".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: None, + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_jwt_auth_valid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4229".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::NKey, + credentials_file: Some(NatsAuthCredentialsFile { + path: "tests/data/nats.creds".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + r.is_ok(), + "publish_and_check failed, expected Ok(()), got: {:?}", + r + ); + } + + #[tokio::test] + async fn nats_tls_jwt_auth_invalid() { + let subject = format!("test-{}", random_string(10)); + + let conf = NatsSourceConfig { + connection_name: "".to_owned(), + subject: subject.clone(), + url: "nats://localhost:4229".to_owned(), + queue: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + tls: Some(TlsConfig { + enabled: Some(true), + options: TlsOptions { + ca_file: Some("tests/data/mkcert_rootCA.pem".into()), + ..Default::default() + }, + }), + auth: Some(NatsAuthConfig { + strategy: NatsAuthStrategy::NKey, + credentials_file: Some(NatsAuthCredentialsFile { + path: "tests/data/nats-bad.creds".into(), + }), + ..Default::default() + }), + }; + + let r = publish_and_check(conf).await; + assert!( + matches!(r, Err(BuildError::Connect { .. })), + "publish_and_check failed, expected BuildError::Connect, got: {:?}", + r + ); } } diff --git a/tests/data/localhost-mkcert-key.pem b/tests/data/localhost-mkcert-key.pem new file mode 100644 index 0000000000000..1d868e788560a --- /dev/null +++ b/tests/data/localhost-mkcert-key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDs8rp/CfwqnvVw ++ZwLFMRPYJq6MDHk+tJznRLeWsBQgz9JX9ZYcsdbIGSxGpTddXreVtXpX7f9ZbRZ +J9zSadaS0CgVtxpeyOYHYHHbWsZcOa0LSpqxxM+F+EZEmmOkkwTl/IW5/hnrxK3F +tfLA729CksDmul204KjCKirB0qnqX9ZuES5ucTeZCvGUPJURwcPqlsBr2csmG/mq +XxzhWuqctw0EdwkHzlB2tYxXSfafi2VTRCRb83z9TQLHj5MqKe+7ZnZJqW6uNbM3 +2mJnGNGZEuQYxAW4w1mpwRdsAkp7IPDbdZbSCEWuWdXs3YzsdXGbrYDHX2upFf+e +NQiD52RRAgMBAAECggEBALyO+LyJ/KMKNnW+HWBwGTy1usu5+Kqos3sPn2lVPL+V +gnJoktF2HZE1+Qs7BQYrLCjtX3NFGczL97YPU9oaIYhZn9yQBqsxImTyagjOdxYD +S+K1bt9SqHaLOZf0aKEZ4gcHan+1Z3lI3aR9g33nzFllIZ865cFAGuvQ1I/DjjYU +oDZUdfjLpI2sAMziGCbLYwiuOuqADEXvhAMhKbIDToixSzapEeQ0UPt6wJKh+d9J +WQT0FOANfIPPk+5nRXtUAUGR9r027+WshjxogqzBptsqj1kMKrQGgCOVMifBH0M9 +/n4L0DXhAtj38e8p+2tmJMPJlBcfXQPm8Q8sozO7TAECgYEA+BJOjjyJ8DNZESYH +ryfQNgiwqDsaaJnKC2MUC6oVVCYizo1PFZ6Zx60xARKjELPytQefbd4v0hkU1+EC +1vQ/VSrYnQvMWA6L/HNK614i7UzZyId1X425sssWTDlXc8Qk8QdaZPDs03vgbHbT +GaFukxJSaCVmM8LxMBGc26zJ8MECgYEA9IVpYCBNjvLomck2liToibRwS04rjX+u +bKcsN6PL+8SBJl6DaSNnRiJDIt6UQK3988hmmE1xJAuAtCfpgQi5oyRi6AaMwMxR +00XIDSKDZRdk8bVtZeN7yV4FIO1uAEO7uNl4SSwcLITqcYzMM3vesPmVzdiiQt+X +SkO1JiNgx5ECgYBee5KB6BrdaqE0v9hTZQWPXvXxzTj8t/5VIfJjku82sqO1KesC +H/sQKy2r6+5BkVgSjQmy7dom2jivFFthhXuNs5j3D6RPsZKtnzfct2j0Gr4j+zvA +HhmpOGKQu5JMXFt5lcRfB8gbsGXEyekTLQIRb/wFAKGRe2EjXsHwhToQwQKBgQDF +ISVB9OjyHMz+5mPWiezaz5hUbbHZAp25RND2DlxuHg7MwxeIVJ12wjqDyxOUTXFV +7zMfCBf1qjxZgW/TshgO+U+vpVAmQtKY1EIirLdncYPVBaIrqUjrn5vc3u120yRt +Mw2xWBlinslvP8aEnxF6dcyaxlKSPTX4DjvlglIMsQKBgQDW9PbIy+4mSGPLccFd +PveRYbjuNt94j0NVjBewnuL8qpiGkAHMoHEqlUu4LCLT516UxXjTCO8ZreTkwUue +Coi7534stMg+e1m1pVvXo5VbpNhUQTq6RN+WP3S1Y+hc4ZWtFHn6DnKN8PG3sDZH +ktV0ZDqnQz0q4lhNAMo45hJHwQ== +-----END PRIVATE KEY----- diff --git a/tests/data/localhost-mkcert.pem b/tests/data/localhost-mkcert.pem new file mode 100644 index 0000000000000..4a1ed64137d7d --- /dev/null +++ b/tests/data/localhost-mkcert.pem @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIID/TCCAmWgAwIBAgIQa1kguzCJFVSBMjFD9NGa4DANBgkqhkiG9w0BAQsFADBV +MR4wHAYDVQQKExVta2NlcnQgZGV2ZWxvcG1lbnQgQ0ExFTATBgNVBAsMDHNlZXlh +cmhAYm9uazEcMBoGA1UEAwwTbWtjZXJ0IHNlZXlhcmhAYm9uazAeFw0yMTEyMzAy +MjExNDFaFw0yNDAzMzAyMTExNDFaMEAxJzAlBgNVBAoTHm1rY2VydCBkZXZlbG9w +bWVudCBjZXJ0aWZpY2F0ZTEVMBMGA1UECwwMc2VleWFyaEBib25rMIIBIjANBgkq +hkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA7PK6fwn8Kp71cPmcCxTET2CaujAx5PrS +c50S3lrAUIM/SV/WWHLHWyBksRqU3XV63lbV6V+3/WW0WSfc0mnWktAoFbcaXsjm +B2Bx21rGXDmtC0qascTPhfhGRJpjpJME5fyFuf4Z68StxbXywO9vQpLA5rpdtOCo +wioqwdKp6l/WbhEubnE3mQrxlDyVEcHD6pbAa9nLJhv5ql8c4VrqnLcNBHcJB85Q +drWMV0n2n4tlU0QkW/N8/U0Cx4+TKinvu2Z2SalurjWzN9piZxjRmRLkGMQFuMNZ +qcEXbAJKeyDw23WW0ghFrlnV7N2M7HVxm62Ax19rqRX/njUIg+dkUQIDAQABo14w +XDAOBgNVHQ8BAf8EBAMCBaAwEwYDVR0lBAwwCgYIKwYBBQUHAwEwHwYDVR0jBBgw +FoAUDXvFWPLSkHpFztJmDgvOBMzzCqkwFAYDVR0RBA0wC4IJbG9jYWxob3N0MA0G +CSqGSIb3DQEBCwUAA4IBgQAlcPyqM4cgqbXb7EOI4QfS8ZzxchymouZNGTorRnN/ +KlFx8Pe/yYpTkrLAf4NaO4ZsuPPYKIoj3xo/1wGXRhFyLhpjRLLgdrtZfC+yIeEW +x1+sYn5czjUIqoiqTmaRk+GdlJ1UzFnzyRFJKP9siv1ec/ayCHncMfzL9hUebc83 +4CinCNNl/FwxMYdSPYAI59oX0cKcc4OEMTZP6KP6k8JytYGUPTWYJpMY3/mV9jDD +hKEkX90jqWNiiLVY8upyBrZuT7ooiZRP1a1ZwZQvkKjt/dgOzJv9eeQtnz3o7kRK +V5MNTQL9xhIUtbGN84RA3eMehveY48omvBoMgDMMdD7mxlZNlyBC1/bl4ML5uDGc +j7sxdIQCJ8CLevPElxPzHG/E66F1+YcM9358FCDwzhySVUfN48zzZ27E822wAT2s +qKmIATbRdm/98gXR9wrzALeIXHr3a/95xdJ0MMoSuusLdCHHMf6eOT1DkMxuBUwr +1s40HhPGPs1mycsKDdO5LPY= +-----END CERTIFICATE----- diff --git a/tests/data/mkcert_rootCA.pem b/tests/data/mkcert_rootCA.pem new file mode 100644 index 0000000000000..9a843c68afabc --- /dev/null +++ b/tests/data/mkcert_rootCA.pem @@ -0,0 +1,26 @@ +-----BEGIN CERTIFICATE----- +MIIEejCCAuKgAwIBAgIRAP4t4j/o9cM9Qxwhptm1xL8wDQYJKoZIhvcNAQELBQAw +VTEeMBwGA1UEChMVbWtjZXJ0IGRldmVsb3BtZW50IENBMRUwEwYDVQQLDAxzZWV5 +YXJoQGJvbmsxHDAaBgNVBAMME21rY2VydCBzZWV5YXJoQGJvbmswHhcNMjExMjE1 +MTc0MTE4WhcNMzExMjE1MTc0MTE4WjBVMR4wHAYDVQQKExVta2NlcnQgZGV2ZWxv +cG1lbnQgQ0ExFTATBgNVBAsMDHNlZXlhcmhAYm9uazEcMBoGA1UEAwwTbWtjZXJ0 +IHNlZXlhcmhAYm9uazCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBANGI +laqatUh78PyCzGGMS32/MRDfLhLkvBatBTMCTqn3zD02OINkVAIyIwnxedmFIqh4 +iWFbqQMjPx9xBmbGwNq17O+zR62QcKaiVSwXzrGs5wpPfoYPSjQDNnM4cAcGdlEq +fa+3jDUka+s82QITAzrfpH2dvEPMEBBw+6rbSbj2kopN70YqP7Ad8FTts6XXXpVy +CaNXm4XZKSjAAJ1iQRpxmmAaw9pbdudxtyfYtDzwkRyugtNX5liLDXCS5U+MzaTC +SpwnKpTOHuw025z490lBDRulVeyl8kjKDcMgZXMsVeytUBMBl9iSuln09VZWRYMw +H9iX+iGhr2FCmOc18GKpB9YO9a7rZ+C/06qQfNetYNDew9PjXwCxI9es79HQMxa0 +MwfAGyce3ByRIb+dX/Kq41qhF9DOv0QFhw3CAV6VJM1ajKI9VniIxOAfqZw54FQB +FQYycL2+FLyO+3Nt+Smd2VFITsgsCBAPVLxEpJvvBBGqiLphjKfJl7vb4I/xFQID +AQABo0UwQzAOBgNVHQ8BAf8EBAMCAgQwEgYDVR0TAQH/BAgwBgEB/wIBADAdBgNV +HQ4EFgQUDXvFWPLSkHpFztJmDgvOBMzzCqkwDQYJKoZIhvcNAQELBQADggGBAGm9 +0hcYnQzDd6hzfqe111QpxfkG93omrDcrRVHayZRZb7VBYAGv9ybQ2NtcR60SNEs2 +9m6FUjgjnDAbqhwt142+BNiwPSpmOMMHLUWYiP6B8rhxW5Z66Z5wigVlrDs+ZpDH +1/Pbz6AWZA4+2rU/Rt2lrgNLs4Rcbb9g0FtnRVj5R1BX5oTbFbmKYyZsleQjSdct +8tFzyUxyDsl4q7473mjON5Br2fJq8Ep3ihgJlA6tT6WecvHglVcJgwE5mRY1Xsim +oHX9HSdSpM/x+KTUg4G6y7KjabHhYAELLTZ/hSBEb25sWZ4PvBBKuWWyH4CZY6OH +EuRCEbIxjHl/EM57S5yyIvMpi8mOJx9XMktaYH/0tE62RAJ6se7GQCvv8mI7aFLk +iYCJxzrrub47XkH1bzQFwmPpit3DsnKMRtDK4CY0RE0KaIFF2HTgI3vXEZIBA6Uh +yX4SSCv6QRlthWP26cY/r6q1i0e/evvmJW290FeLfvp0kOlCEU/iaJWaP/ZuOw== +-----END CERTIFICATE----- diff --git a/tests/data/nats-bad.creds b/tests/data/nats-bad.creds new file mode 100644 index 0000000000000..866e6390ecf3d --- /dev/null +++ b/tests/data/nats-bad.creds @@ -0,0 +1,13 @@ +-----BEGIN NATS USER JWT----- +eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJGSzdKSVE1Q0M3NDJTRllGQVlZT1JPNUNRQ1JSTDVYRFpJRUM0RkwyRDJDUkVPV1lJUk5RIiwiaWF0IjoxNjQxMjYxMTIwLCJpc3MiOiJBRFUySFdJQlg0SEozV0ZBSVFGQ0g1QUkzS1c3TjU1VEJSTE9FTk0yU0RWUFZCRUxDM1hOU0g0UiIsIm5hbWUiOiJUQSIsInN1YiI6IlVDVk1CQ0dGTlNJQzJFTjVUQjdSWVdGQ0ZUUFVHU0YyMlNEUDJRRVFLR1RQVkJVVjNVUVRMNUhKIiwibmF0cyI6eyJwdWIiOnt9LCJzdWIiOnt9LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjJ9fQ.nJicJeHD7uIrl8ljgtD7eO5_b9xkTl4eUm66SJKv_ommNKg2YLzJ1OHM0qL6p3ircaUMezoEtKLfeaeBpGcMBQ +------END NATS USER JWT------ + +************************* IMPORTANT ************************* +NKEY Seed printed below can be used to sign and prove identity. +NKEYs are sensitive and should be treated as secrets. + +-----BEGIN USER NKEY SEED----- +SUABAK66UDZGLRIKZJN7CS2YZZ5JYJYJDX6SCV64X7T2IYEMU7HWWC3TBM +------END USER NKEY SEED------ + +************************************************************* diff --git a/tests/data/nats-jwt.conf b/tests/data/nats-jwt.conf new file mode 100644 index 0000000000000..b9991b288a133 --- /dev/null +++ b/tests/data/nats-jwt.conf @@ -0,0 +1,15 @@ +// Operator "memory" +operator: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiI1RFVZWUUzVFlQQU1WVU9MRU9TSUJCRVlKQzdDNDRJT1EzQ0Y3NzZNTkNVTU9BVENCV0pRIiwiaWF0IjoxNjQxMjYxMDYxLCJpc3MiOiJPQlpZWVBDU04yVUw3R01MRU9GWFVLU1JQRjc0SlNKSzJOUUVJRkpIR1BNQVo0T1BQMzZXWlVZNiIsIm5hbWUiOiJtZW1vcnkiLCJzdWIiOiJPQlpZWVBDU04yVUw3R01MRU9GWFVLU1JQRjc0SlNKSzJOUUVJRkpIR1BNQVo0T1BQMzZXWlVZNiIsIm5hdHMiOnsidHlwZSI6Im9wZXJhdG9yIiwidmVyc2lvbiI6Mn19.Su5lbisERxV0Wth8tAn57Nqr20gaATqRemo-9_gufGbuPG4QG9sLX60xTwOqDAayZWMuMDcXgJMzDxovrFciCQ + +resolver: MEMORY + +resolver_preload: { + // Account "A" + ADU2HWIBX4HJ3WFAIQFCH5AI3KW7N55TBRLOENM2SDVPVBELC3XNSH4R: eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJFR0lSUU9YS0lSVDRLRkdMSVNPWUxRR0tONkdPT0FUN0ZTVUdVQUM0VTNNN1VEUVdSNktBIiwiaWF0IjoxNjQxMjYxMTAwLCJpc3MiOiJPQlpZWVBDU04yVUw3R01MRU9GWFVLU1JQRjc0SlNKSzJOUUVJRkpIR1BNQVo0T1BQMzZXWlVZNiIsIm5hbWUiOiJBIiwic3ViIjoiQURVMkhXSUJYNEhKM1dGQUlRRkNINUFJM0tXN041NVRCUkxPRU5NMlNEVlBWQkVMQzNYTlNINFIiLCJuYXRzIjp7ImxpbWl0cyI6eyJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJpbXBvcnRzIjotMSwiZXhwb3J0cyI6LTEsIndpbGRjYXJkcyI6dHJ1ZSwiY29ubiI6LTEsImxlYWYiOi0xfSwiZGVmYXVsdF9wZXJtaXNzaW9ucyI6eyJwdWIiOnt9LCJzdWIiOnt9fSwidHlwZSI6ImFjY291bnQiLCJ2ZXJzaW9uIjoyfX0.PrX_C9-Txq3N8Ea23fLjocK0V_WTac1BTIF58-qNl98YK9Ga36ussyeT7dljC1zUHI-mSEcIVoWgCTDlT-vAAg + +} + +tls: { + cert_file: "/usr/share/nats/config/localhost-mkcert.pem" + key_file: "/usr/share/nats/config/localhost-mkcert-key.pem" +} diff --git a/tests/data/nats-nkey.conf b/tests/data/nats-nkey.conf new file mode 100644 index 0000000000000..e3985fc3692de --- /dev/null +++ b/tests/data/nats-nkey.conf @@ -0,0 +1,5 @@ +authorization: { + users: [ + { nkey: UD345ZYSUJQD7PNCTWQPINYSO3VH4JBSADBSYUZOBT666DRASFRAWAWT } + ] +} diff --git a/tests/data/nats-tls-client-cert.conf b/tests/data/nats-tls-client-cert.conf new file mode 100644 index 0000000000000..a0eb08175de93 --- /dev/null +++ b/tests/data/nats-tls-client-cert.conf @@ -0,0 +1,12 @@ +tls: { + cert_file: "/usr/share/nats/config/localhost-mkcert.pem" + key_file: "/usr/share/nats/config/localhost-mkcert-key.pem" + ca_file: "/usr/share/nats/config/mkcert_rootCA.pem" + verify_and_map: true +} + +authorization { + users = [ + {user: "email@localhost"} + ] +} diff --git a/tests/data/nats-tls.conf b/tests/data/nats-tls.conf new file mode 100644 index 0000000000000..025cad1159751 --- /dev/null +++ b/tests/data/nats-tls.conf @@ -0,0 +1,4 @@ +tls: { + cert_file: "/usr/share/nats/config/localhost-mkcert.pem" + key_file: "/usr/share/nats/config/localhost-mkcert-key.pem" +} diff --git a/tests/data/nats.creds b/tests/data/nats.creds new file mode 100644 index 0000000000000..52833c5bb5328 --- /dev/null +++ b/tests/data/nats.creds @@ -0,0 +1,13 @@ +-----BEGIN NATS USER JWT----- +eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJGSzdKSVE1Q0M3NDJTRllGQVlZT1JPNUNRQ1JSTDVYRFpJRUM0RkwyRDJDUkVPV1lJUk5RIiwiaWF0IjoxNjQxMjYxMTIwLCJpc3MiOiJBRFUySFdJQlg0SEozV0ZBSVFGQ0g1QUkzS1c3TjU1VEJSTE9FTk0yU0RWUFZCRUxDM1hOU0g0UiIsIm5hbWUiOiJUQSIsInN1YiI6IlVDVk1CQ0dGTlNJQzJFTjVUQjdSWVdGQ0ZUUFVHU0YyMlNEUDJRRVFLR1RQVkJVVjNVUVRMNUhKIiwibmF0cyI6eyJwdWIiOnt9LCJzdWIiOnt9LCJzdWJzIjotMSwiZGF0YSI6LTEsInBheWxvYWQiOi0xLCJ0eXBlIjoidXNlciIsInZlcnNpb24iOjJ9fQ.nJicJeHD7uIrl8ljgtD7eO5_b9xkTl4eUm66SJKv_ommNKg2YLzJ1OHM0qL6p3ircaUMezoEtKLfeaeBpGcMBQ +------END NATS USER JWT------ + +************************* IMPORTANT ************************* +NKEY Seed printed below can be used to sign and prove identity. +NKEYs are sensitive and should be treated as secrets. + +-----BEGIN USER NKEY SEED----- +SUABJK66UDZGLRIKZJN7CS2YZZ5JYJYJDX6SCV64X7T2IYEMU7HWWC3TBM +------END USER NKEY SEED------ + +************************************************************* diff --git a/tests/data/nats_client_cert.pem b/tests/data/nats_client_cert.pem new file mode 100644 index 0000000000000..f0d128b79d0d8 --- /dev/null +++ b/tests/data/nats_client_cert.pem @@ -0,0 +1,25 @@ +-----BEGIN CERTIFICATE----- +MIIENzCCAp+gAwIBAgIRAMTyzZsmTGFreOt16/vg0oEwDQYJKoZIhvcNAQELBQAw +VTEeMBwGA1UEChMVbWtjZXJ0IGRldmVsb3BtZW50IENBMRUwEwYDVQQLDAxzZWV5 +YXJoQGJvbmsxHDAaBgNVBAMME21rY2VydCBzZWV5YXJoQGJvbmswHhcNMjIwMTA0 +MDA1NjQwWhcNMjQwNDAzMjM1NjQwWjBAMScwJQYDVQQKEx5ta2NlcnQgZGV2ZWxv +cG1lbnQgY2VydGlmaWNhdGUxFTATBgNVBAsMDHNlZXlhcmhAYm9uazCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBAMje+Jzdzf5j8yaMXFrxq9ouP6Wp/wDC +0UsMdfIe8HpvoPrQwM9FKuWIjPqBxRUvz+O0hSpFgmkhWq6aR2lg0xN8FBMDRbbs +fdvlMwaLivAI6tunn7ndhN7SETdnB5Rx7VDHzanKzb7NgXa50uI6v3PSg/bBrCgo +WASm65uiCQPt9agUuyS4TSdys8kVdJ0g+LBPDo3i3diNU+p0qtwij+P1jnhUWHcU +vqvTFAPAgmLeGmACK7eDbMb0Lg1hqYwK5jXwUC8BYe6EPHPRn7eqcFKS/8lB9/kw +rNdeuBvnFF3Mii3lZ8A+IlDnPgXlXs8zGLNxY6WiYY/rxE5pgfIBJBECAwEAAaOB +ljCBkzAOBgNVHQ8BAf8EBAMCBaAwJwYDVR0lBCAwHgYIKwYBBQUHAwIGCCsGAQUF +BwMBBggrBgEFBQcDBDAfBgNVHSMEGDAWgBQNe8VY8tKQekXO0mYOC84EzPMKqTA3 +BgNVHREEMDAugglsb2NhbGhvc3SBD2VtYWlsQGxvY2FsaG9zdIcQAAAAAAAAAAAA +AAAAAAAAATANBgkqhkiG9w0BAQsFAAOCAYEAzZGLX+5jKJCU9bWKfT8X2hvd3J4U +0HkdqKKGvSXpi4aJ1hjzCMOMi104HFZygcI8ACdw9GacWzbLHFy5Cl/ITEIBI/od +Gn4f7INJjSdhHfy/9x/kuy24SW9avwerfDLqF9PYoh1fBWqLJE8ssxGPYyzSBju4 +/BcuVYaQlA8iTTYpc/CVfEg+xrI/ijmYhD4Xs1w7DACR7P9Bdytw7q3r6D5xRoGb +7AFvZeAQYXyegs4Dt8HnP7G99nZNT7/f4rKJ0cW0L8wg8+QTgnEWsBlfwlqBpgcP +887ejylWX0NJkyQ5CX1rixZq+cg386YUeWY/lUyVmywk5kqjbHX79w4kqFZAfSu/ +jGgNzQQ565V+ApH7qFTqQRuxoOIOHD8XH6rcAYytC6QK9Do2m3ftVPmIPuff+Fe4 +O7/FlzNPVhjjHNPEZGMOIJs/AYYBbZJX+FVueOBq3NUAieZhT8IRSKW7rQpgnDSm +Cr81t/NkmplFZkTRQmbkFOx4vh9kfOBzLSRN +-----END CERTIFICATE----- diff --git a/tests/data/nats_client_key.pem b/tests/data/nats_client_key.pem new file mode 100644 index 0000000000000..219aa2123d648 --- /dev/null +++ b/tests/data/nats_client_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDI3vic3c3+Y/Mm +jFxa8avaLj+lqf8AwtFLDHXyHvB6b6D60MDPRSrliIz6gcUVL8/jtIUqRYJpIVqu +mkdpYNMTfBQTA0W27H3b5TMGi4rwCOrbp5+53YTe0hE3ZweUce1Qx82pys2+zYF2 +udLiOr9z0oP2wawoKFgEpuubogkD7fWoFLskuE0ncrPJFXSdIPiwTw6N4t3YjVPq +dKrcIo/j9Y54VFh3FL6r0xQDwIJi3hpgAiu3g2zG9C4NYamMCuY18FAvAWHuhDxz +0Z+3qnBSkv/JQff5MKzXXrgb5xRdzIot5WfAPiJQ5z4F5V7PMxizcWOlomGP68RO +aYHyASQRAgMBAAECggEBAL/Wot09wEQaNNg16OY6NIRU8xhGeHHDnZrtNZ+dCNIF +1lp0EBBj4IoClZGRaiXH2eWAPEWYyOmanz5tvZHYlP3g3FMkJuqChdkC1m6Dypdg +DNznA9bAxur++M5OqaCOBbXGfJaSR1jYb7Jx2gORU9NL1BhP8xkPHrCishWa9L04 +TnlNaHlvcFT7lHzRE2Ou3bQbaONU9HB46xCd9hBLS0U+tmYqc/o8CbI/8sCQgDCJ +mzwEFUzvfC8C6V/NLGPGyKE25yDhsdRv/KVUDyxQ2v30WPhQqykMBaizLyymlTQH +Vl4Xkux6CK4A0hmNd6qOzu40tkm3/i6KMvVMcUesHpkCgYEA7QNT2GNOig27FH7r +6+ZaOeuMx6CuVa6B6xaFrG/iihqPmxQY01r09QzyfliBY5mWtpIQZQRlgSj0Gf2u +HYmOzqlLUkICwUw0JxWab7bUtkHKdVihlNuKsEoocC33Y6O57m1suyWYy8ESTyy6 +XPZZRtQrBF9DC74216q/cXxsFOMCgYEA2PZxDk/OfHI4LTLTa4YaXaxvPmZWqzw1 +EKe5NMNUxxEW+viTyslCK1k5YpCjrTdLpP6DYPMuX9kQsNlElvFZvdhYnrV43JHp +iCOqtveZarjn7KxqzxJDzfyhoWneHJSfFfEvUWtBAMR9aZMbLoLPMh/qnNRMTvZD +OKsYKu4WaXsCgYAgzwUcPyLwGVZ8T9V4ojp7KhAi0itP0QhK8qMua46Ab4zResa0 ++zJNea1XsAi5MNmcDDwYv7MNXg3sSyF/FTuff+/dQyX+e/38KxFlxXlMgW71L3+u +i3V3UcynOrW0JUXnq14RyfKMwf022IoztCZHl7ccs9MkHQCaehhmLUMI4QKBgQCv +8sukq5vs2EQug3VpUYu4Q8rDdAL9QwpVw28zFuV9uqJETaqtXaHHvcgH4vyZ0KoL +2Et7KDuHh2U9xjUVpAz/JKBZtRma5snYmfQsBYfpQKL1EK5d15YdSq9ZU53I185b +4QhaovxIPxDluWLlyTd2kuOU46Yusw+4uSgUSJjsWQKBgQCmA564cLKdfoQVrPay +O/D0asC1LrvNo8g0BW1LwUBN8VB0UsFt3kfO19nwKnXGC+hFSf6tNQ3jwCbd6Hnn +QwyF9+vro8UHrD2KS0TilY7u1optN8ctJsiV1nqgv0Gw4XAgHLFvhHa23tz8GK6O +2izfkwnfzA0N/gHl66kSfkVj0Q== +-----END PRIVATE KEY----- diff --git a/website/cue/reference/components/nats.cue b/website/cue/reference/components/nats.cue index 127cda1a8387f..af494f7bc53b3 100644 --- a/website/cue/reference/components/nats.cue +++ b/website/cue/reference/components/nats.cue @@ -65,6 +65,109 @@ components: _nats: { examples: ["foo", "API Name Option Example"] } } + auth: { + common: false + description: "Configuration for how Vector should authenticate to NATS." + required: false + type: object: options: { + strategy: { + common: false + description: "The strategy used to authenticate with the NATS server. See https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro. For TLS Client Certiificate Auth, use the TLS configuration." + required: false + type: string: { + default: "user_password" + enum: { + user_password: "Username and password authentication: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/username_password" + token: "Token authentication: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/tokens" + credentials_file: "Credentials file authentication: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/jwt" + nkey: "Nkey authentication: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/nkey_auth" + } + } + } + user_password: { + common: false + description: "User password auth options. Required if auth.strategy=`user_password`." + required: false + type: object: options: { + username: { + common: false + description: "username" + required: false + type: string: { + default: "" + examples: ["nats-user"] + } + } + password: { + common: false + description: "password" + required: false + type: string: { + default: "" + examples: ["nats-password"] + } + } + } + } + token: { + common: false + description: "Token auth options. Required if auth.strategy=`token`." + required: false + type: object: options: { + value: { + common: false + description: "token" + required: false + type: string: { + default: "" + examples: ["secret-token"] + } + } + } + } + credentials_file: { + common: false + description: "Credentials file auth options. Required if auth.strategy=`credentials_file`." + required: false + type: object: options: { + path: { + common: false + description: "Path to credentials file" + required: false + type: string: { + default: "" + examples: ["/etc/nats/nats.creds"] + } + } + } + } + nkey: { + common: false + description: "NKey auth options. Required if auth.strategy=`nkey`." + required: false + type: object: options: { + nkey: { + common: false + description: "User string representing nkey public key" + required: false + type: string: { + default: "" + examples: ["UDXU4RCSJNZOIQHZNWXHXORDPRTGNJAHAHFRGZNEEJCPQTT2M7NLCNF4"] + } + } + seed: { + common: false + description: "Seed string representing nkey private key" + required: false + type: string: { + default: "" + examples: ["SUACSSL3UAHUDXKFSNVUZRF5UHPMWZ6BFDTJ7M6USDXIEDNPPQYYYCU3VY"] + } + } + } + } + } + } } how_it_works: { diff --git a/website/cue/reference/components/sinks/nats.cue b/website/cue/reference/components/sinks/nats.cue index 1296f9b4766bd..d9ea94a9ce54d 100644 --- a/website/cue/reference/components/sinks/nats.cue +++ b/website/cue/reference/components/sinks/nats.cue @@ -25,7 +25,13 @@ components: sinks: nats: { } } request: enabled: false - tls: enabled: false + tls: { + enabled: true + can_enable: true + can_verify_certificate: true + can_verify_hostname: true + enabled_default: false + } to: { service: services.nats