Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream subject mappings #1103

Merged
merged 1 commit into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use base64::engine::general_purpose::STANDARD;
use base64::engine::Engine;
use bytes::Bytes;
use futures::{future::BoxFuture, TryFutureExt};
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::json;
use time::{serde::rfc3339, OffsetDateTime};

Expand Down Expand Up @@ -1153,16 +1153,21 @@ pub enum StorageType {
/// Shows config and current state for this stream.
#[derive(Debug, Deserialize, Clone)]
pub struct Info {
/// The configuration associated with this stream
/// The configuration associated with this stream.
pub config: Config,
/// The time that this stream was created
/// The time that this stream was created.
#[serde(with = "rfc3339")]
pub created: time::OffsetDateTime,
/// Various metrics associated with this stream
/// Various metrics associated with this stream.
pub state: State,

///information about leader and replicas
/// Information about leader and replicas.
pub cluster: Option<ClusterInfo>,
/// Information about mirror config if present.
#[serde(default)]
pub mirror: Option<SourceInfo>,
/// Information about sources configs if present.
#[serde(default)]
pub sources: Vec<SourceInfo>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -1374,6 +1379,40 @@ pub struct PeerInfo {
pub lag: Option<u64>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct SourceInfo {
/// Source name.
pub name: String,
/// Number of messages this source is lagging behind.
pub lag: u64,
/// Last time the source was seen active.
#[serde(deserialize_with = "negative_duration_as_none")]
pub active: Option<std::time::Duration>,
/// Filtering for the source.
#[serde(default)]
pub filter_subject: Option<String>,
/// Source destination subject.
#[serde(default)]
pub subject_transform_dest: Option<String>,
/// List of transforms.
#[serde(default)]
pub subject_transforms: Vec<SubjectTransform>,
}

fn negative_duration_as_none<'de, D>(
deserializer: D,
) -> Result<Option<std::time::Duration>, D::Error>
where
D: Deserializer<'de>,
{
let n = i64::deserialize(deserializer)?;
if n.is_negative() {
Ok(None)
} else {
Ok(Some(std::time::Duration::from_nanos(n as u64)))
}
}

/// The response generated by trying to purge a stream.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct PurgeResponse {
Expand Down Expand Up @@ -1430,6 +1469,10 @@ pub struct Source {
skip_serializing_if = "is_default"
)]
pub subject_transform_destination: Option<String>,
/// Subject transforms for Stream.
#[cfg(feature = "server_2_10")]
#[serde(default, skip_serializing_if = "is_default")]
pub subject_transforms: Vec<SubjectTransform>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
Expand Down
59 changes: 59 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2733,6 +2733,65 @@ mod jetstream {
);
}

#[tokio::test]
#[cfg(feature = "server_2_10")]
async fn stream_subject_transforms() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let subject_transform = stream::SubjectTransform {
source: "foo".to_string(),
destination: "bar".to_string(),
};

let source = stream::Source {
name: "source".to_string(),
filter_subject: Some("stream1.foo".to_string()),
subject_transform_destination: Some("foo".to_string()),
..Default::default()
};

let sources = vec![
source.clone(),
stream::Source {
name: "multi_source".to_string(),
subject_transforms: vec![stream::SubjectTransform {
source: "stream2.foo.>".to_string(),
destination: "foo.>".to_string(),
}],
..Default::default()
},
];

let mut stream = context
.create_stream(stream::Config {
name: "filtered".to_string(),
subject_transform: Some(subject_transform.clone()),
sources: Some(sources.clone()),
..Default::default()
})
.await
.unwrap();

let info = stream.info().await.unwrap();
assert_eq!(info.config.sources, Some(sources.clone()));
assert_eq!(info.config.subject_transform, Some(subject_transform));

let mut stream = context
.create_stream(stream::Config {
name: "mirror".to_string(),
mirror: Some(source.clone()),
..Default::default()
})
.await
.unwrap();

let info = stream.info().await.unwrap();

assert_eq!(info.config.mirror, Some(source));
}

#[tokio::test]
async fn pull_by_bytes() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down
Loading