From 56bd0dd57e76c4bb0f4ad43370e44ed74f962a47 Mon Sep 17 00:00:00 2001 From: Dmitriy <49064188+whatcouldbepizza@users.noreply.github.com> Date: Wed, 24 Jul 2024 14:55:09 +0300 Subject: [PATCH] feat(new sink): Add possibility to use nats jetstream in nats sink (#20834) * use nats jetstream as an option * no need to manually flush messages * fix jetstream option annotation + prettify code * generate component docs * add more precise field description + generate docs * check nats stream existence in healthcheck * do not check stream existance * add new field to struct in tests * add changelog * add author to changelog * remove example config from changelog * flush core messages * flush after each message --- .../20834_nats_jetstream_sink.feature.md | 3 + src/sinks/nats/config.rs | 63 +++++++++++++++++++ src/sinks/nats/integration_tests.rs | 13 ++++ src/sinks/nats/mod.rs | 2 + src/sinks/nats/service.rs | 9 ++- src/sinks/nats/sink.rs | 10 +-- .../reference/components/sinks/base/nats.cue | 11 ++++ 7 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 changelog.d/20834_nats_jetstream_sink.feature.md diff --git a/changelog.d/20834_nats_jetstream_sink.feature.md b/changelog.d/20834_nats_jetstream_sink.feature.md new file mode 100644 index 0000000000000..eb8593e17219d --- /dev/null +++ b/changelog.d/20834_nats_jetstream_sink.feature.md @@ -0,0 +1,3 @@ +Add possibility to use NATS JetStream in NATS sink. Can be turned on/off via `jetstream` option (default is false). + +authors: whatcouldbepizza diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index a9f52f4ec88c0..8388648a08c48 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use futures_util::TryFutureExt; use snafu::ResultExt; use vector_lib::codecs::JsonSerializerConfig; @@ -76,6 +77,14 @@ pub struct NatsSinkConfig { #[configurable(derived)] #[serde(default)] pub(super) request: TowerRequestConfig, + + /// Send messages using [Jetstream][jetstream]. + /// + /// If set, the `subject` must belong to an existing JetStream stream. + /// + /// [jetstream]: https://docs.nats.io/nats-concepts/jetstream + #[serde(default)] + pub(super) jetstream: bool, } fn default_name() -> String { @@ -93,6 +102,7 @@ impl GenerateConfig for NatsSinkConfig { tls: None, url: "nats://127.0.0.1:4222".into(), request: Default::default(), + jetstream: Default::default(), }) .unwrap() } @@ -130,8 +140,61 @@ impl NatsSinkConfig { options.connect(&self.url).await.context(ConnectSnafu) } + + pub(super) async fn publisher(&self) -> Result { + let connection = self.connect().await?; + + if self.jetstream { + Ok(NatsPublisher::JetStream(async_nats::jetstream::new( + connection, + ))) + } else { + Ok(NatsPublisher::Core(connection)) + } + } } async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { config.connect().map_ok(|_| ()).map_err(|e| e.into()).await } + +pub enum NatsPublisher { + Core(async_nats::Client), + JetStream(async_nats::jetstream::Context), +} + +impl NatsPublisher { + pub(super) async fn publish( + &self, + subject: S, + payload: Bytes, + ) -> Result<(), NatsError> { + match self { + NatsPublisher::Core(client) => { + client + .publish(subject, payload) + .await + .map_err(|e| NatsError::PublishError { + source: Box::new(e), + })?; + client + .flush() + .map_ok(|_| ()) + .map_err(|e| NatsError::PublishError { + source: Box::new(e), + }) + .await + } + NatsPublisher::JetStream(jetstream) => { + let ack = jetstream.publish(subject, payload).await.map_err(|e| { + NatsError::PublishError { + source: Box::new(e), + } + })?; + ack.await.map(|_| ()).map_err(|e| NatsError::PublishError { + source: Box::new(e), + }) + } + } + } +} diff --git a/src/sinks/nats/integration_tests.rs b/src/sinks/nats/integration_tests.rs index cf2c620db9401..bc0c8dce0611d 100644 --- a/src/sinks/nats/integration_tests.rs +++ b/src/sinks/nats/integration_tests.rs @@ -78,6 +78,7 @@ async fn nats_no_auth() { tls: None, auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -110,6 +111,7 @@ async fn nats_userpass_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; publish_and_check(conf) @@ -139,6 +141,7 @@ async fn nats_userpass_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -170,6 +173,7 @@ async fn nats_token_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -201,6 +205,7 @@ async fn nats_token_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -233,6 +238,7 @@ async fn nats_nkey_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -265,6 +271,7 @@ async fn nats_nkey_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -298,6 +305,7 @@ async fn nats_tls_valid() { }), auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -325,6 +333,7 @@ async fn nats_tls_invalid() { tls: None, auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -360,6 +369,7 @@ async fn nats_tls_client_cert_valid() { }), auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -393,6 +403,7 @@ async fn nats_tls_client_cert_invalid() { }), auth: None, request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -430,6 +441,7 @@ async fn nats_tls_jwt_auth_valid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; @@ -467,6 +479,7 @@ async fn nats_tls_jwt_auth_invalid() { }, }), request: Default::default(), + jetstream: false, }; let r = publish_and_check(conf).await; diff --git a/src/sinks/nats/mod.rs b/src/sinks/nats/mod.rs index a1729c84d574a..affaf422531d4 100644 --- a/src/sinks/nats/mod.rs +++ b/src/sinks/nats/mod.rs @@ -27,4 +27,6 @@ enum NatsError { Connect { source: async_nats::ConnectError }, #[snafu(display("NATS Server Error: {}", source))] ServerError { source: async_nats::Error }, + #[snafu(display("NATS Publish Error: {}", source))] + PublishError { source: async_nats::Error }, } diff --git a/src/sinks/nats/service.rs b/src/sinks/nats/service.rs index 0eb2407ab5738..1aeccced639f9 100644 --- a/src/sinks/nats/service.rs +++ b/src/sinks/nats/service.rs @@ -7,11 +7,11 @@ use futures_util::TryFutureExt; use crate::sinks::prelude::*; -use super::{request_builder::NatsRequest, NatsError}; +use super::{config::NatsPublisher, request_builder::NatsRequest, NatsError}; #[derive(Clone)] pub(super) struct NatsService { - pub(super) connection: Arc, + pub(super) publisher: Arc, } pub(super) struct NatsResponse { @@ -44,13 +44,12 @@ impl Service for NatsService { } fn call(&mut self, req: NatsRequest) -> Self::Future { - let connection = Arc::clone(&self.connection); + let publisher = Arc::clone(&self.publisher); Box::pin(async move { - match connection + match publisher .publish(req.subject, req.bytes) .map_err(async_nats::Error::from) - .and_then(|_| connection.flush().map_err(Into::into)) .await { Err(error) => Err(NatsError::ServerError { source: error }), diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index f2f4524b6ecfb..a5b94d4a9c759 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -5,7 +5,7 @@ use snafu::ResultExt; use crate::sinks::prelude::*; use super::{ - config::{NatsSinkConfig, NatsTowerRequestConfigDefaults}, + config::{NatsPublisher, NatsSinkConfig, NatsTowerRequestConfigDefaults}, request_builder::{NatsEncoder, NatsRequestBuilder}, service::{NatsResponse, NatsService}, EncodingSnafu, NatsError, @@ -20,7 +20,7 @@ pub(super) struct NatsSink { request: TowerRequestConfig, transformer: Transformer, encoder: Encoder<()>, - connection: Arc, + publisher: Arc, subject: Template, } @@ -42,7 +42,7 @@ impl NatsSink { } pub(super) async fn new(config: NatsSinkConfig) -> Result { - let connection = Arc::new(config.connect().await?); + let publisher = Arc::new(config.publisher().await?); let transformer = config.encoding.transformer(); let serializer = config.encoding.build().context(EncodingSnafu)?; let encoder = Encoder::<()>::new(serializer); @@ -51,9 +51,9 @@ impl NatsSink { Ok(NatsSink { request, - connection, transformer, encoder, + publisher, subject, }) } @@ -71,7 +71,7 @@ impl NatsSink { let service = ServiceBuilder::new() .settings(request, NatsRetryLogic) .service(NatsService { - connection: Arc::clone(&self.connection), + publisher: Arc::clone(&self.publisher), }); input diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index 91ea526c8217f..870fe8ae31250 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -383,6 +383,17 @@ base: components: sinks: nats: configuration: { } } } + jetstream: { + description: """ + Send messages using [Jetstream][jetstream]. + + If set, the `subject` must belong to an existing JetStream stream. + + [jetstream]: https://docs.nats.io/nats-concepts/jetstream + """ + required: false + type: bool: default: false + } request: { description: """ Middleware settings for outbound requests.