diff --git a/async-nats/src/service/mod.rs b/async-nats/src/service/mod.rs index 961a05b9a..c782fddb8 100644 --- a/async-nats/src/service/mod.rs +++ b/async-nats/src/service/mod.rs @@ -119,6 +119,8 @@ pub struct Info { pub version: String, /// All service endpoints. pub subjects: Vec, + /// Additional metadata + pub metadata: HashMap, } /// Schema of requests and responses. @@ -145,6 +147,8 @@ pub struct Config { pub schema: Option, /// Custom handler for providing the `EndpointStats.data` value. pub stats_handler: Option, + /// Additional service metadata + pub metadata: Option>, } pub struct ServiceBuilder { @@ -152,6 +156,7 @@ pub struct ServiceBuilder { description: Option, schema: Option, stats_handler: Option, + metadata: Option>, } impl ServiceBuilder { @@ -161,6 +166,7 @@ impl ServiceBuilder { description: None, schema: None, stats_handler: None, + metadata: None, } } @@ -185,6 +191,12 @@ impl ServiceBuilder { self } + /// Adds additional metadata. + pub fn metadata(mut self, metadata: HashMap) -> Self { + self.metadata = Some(metadata); + self + } + /// Stats the service with configured options. pub async fn start(self, name: S, version: S) -> Result { Service::add( @@ -195,6 +207,7 @@ impl ServiceBuilder { description: self.description, schema: self.schema, stats_handler: self.stats_handler, + metadata: self.metadata, }, ) .await @@ -240,6 +253,7 @@ pub trait ServiceExt { /// schema: None, /// description: None, /// stats_handler: None, + /// metadata: None, /// }) /// .await?; /// @@ -312,6 +326,7 @@ impl ServiceExt for crate::Client { /// schema: None, /// description: None, /// stats_handler: None, +/// metadata: None, /// }) /// .await?; /// @@ -416,6 +431,7 @@ impl Service { description: config.description.clone(), version: config.version.clone(), subjects: Vec::default(), + metadata: config.metadata.clone().unwrap_or_default(), }; let (shutdown_tx, _) = tokio::sync::broadcast::channel(1); @@ -450,6 +466,7 @@ impl Service { "name": config.name.clone(), "id": id.clone(), "version": config.version.clone(), + "metadata": config.metadata.clone(), })) .map(Bytes::from)?; async move { @@ -698,15 +715,16 @@ impl Request { /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { - /// use futures::StreamExt; /// use async_nats::service::ServiceExt; + /// use futures::StreamExt; /// # let client = async_nats::connect("demo.nats.io").await?; /// # let mut service = client.add_service(async_nats::service::Config { /// # name: "generator".to_string(), /// # version: "1.0.0".to_string(), /// # schema: None, /// # description: None, - /// stats_handler: None, + /// # stats_handler: None, + /// # metadata: None, /// # }).await?; /// /// let mut endpoint = service.endpoint("endpoint").await?; diff --git a/async-nats/tests/service_tests.rs b/async-nats/tests/service_tests.rs index 585bb9597..069398f08 100644 --- a/async-nats/tests/service_tests.rs +++ b/async-nats/tests/service_tests.rs @@ -13,9 +13,9 @@ #[cfg(feature = "service")] mod service { - use std::str::from_utf8; + use std::{collections::HashMap, str::from_utf8}; - use async_nats::service::{Info, ServiceExt, StatsResponse}; + use async_nats::service::{self, Info, ServiceExt, StatsResponse}; use futures::StreamExt; use tracing::debug; @@ -31,6 +31,7 @@ mod service { version: "1.0.0.1".to_string(), schema: None, stats_handler: None, + metadata: None, }) .await .unwrap_err() @@ -47,6 +48,7 @@ mod service { version: "beta-1.0.0".to_string(), schema: None, stats_handler: None, + metadata: None, }) .await .unwrap_err() @@ -63,6 +65,7 @@ mod service { version: "1.0.0".to_string(), schema: None, stats_handler: None, + metadata: None, }) .await .unwrap_err() @@ -79,6 +82,7 @@ mod service { version: "1.0.0".to_string(), schema: None, stats_handler: None, + metadata: None, }) .await .unwrap_err() @@ -88,6 +92,44 @@ mod service { assert_eq!(std::io::ErrorKind::InvalidInput, err_kind); } + #[tokio::test] + async fn metadata() { + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let metadata = HashMap::from([ + ("key".to_string(), "value".to_string()), + ("other".to_string(), "value".to_string()), + ]); + client + .add_service(async_nats::service::Config { + name: "serviceA".to_string(), + description: None, + version: "1.0.0".to_string(), + schema: None, + stats_handler: None, + metadata: Some(metadata.clone()), + }) + .await + .unwrap(); + + let reply = client.new_inbox(); + let mut responses = client.subscribe(reply.clone()).await.unwrap(); + client + .publish_with_reply("$SRV.INFO".to_string(), reply, "".into()) + .await + .unwrap(); + let response = responses + .next() + .await + .map(|message| { + serde_json::from_slice::(&message.payload) + .unwrap() + .metadata + }) + .unwrap(); + assert_eq!(metadata, response); + } + #[tokio::test] async fn ping() { let server = nats_server::run_basic_server(); @@ -99,6 +141,7 @@ mod service { version: "1.0.0".to_string(), schema: None, stats_handler: None, + metadata: None, }) .await .unwrap(); @@ -110,6 +153,7 @@ mod service { version: "2.0.0".to_string(), schema: None, stats_handler: None, + metadata: None, }) .await .unwrap(); @@ -136,6 +180,7 @@ mod service { schema: None, description: None, stats_handler: None, + metadata: None, }) .await .unwrap(); @@ -175,6 +220,7 @@ mod service { schema: None, description: None, stats_handler: None, + metadata: None, }) .await .unwrap();