Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consumer action #1254

Merged
merged 4 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,7 @@ max_ack_pending
footgun
KV
rebalance
update_consumer
update_consumer_on_stream
create_consumer_strict
create_consumer_strict_on_stream
111 changes: 107 additions & 4 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ use super::errors::ErrorCode;
use super::is_valid_name;
use super::kv::{Store, MAX_HISTORY};
use super::object_store::{is_valid_bucket_name, ObjectStore};
#[cfg(feature = "server_2_10")]
use super::stream::Compression;
use super::stream::{
self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
Stream,
};
#[cfg(feature = "server_2_10")]
use super::stream::{Compression, ConsumerCreateStrictError, ConsumerUpdateError};

/// A context which can perform jetstream scoped requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -880,8 +880,9 @@ impl Context {
}
}

/// Create a new `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
/// Create or update a `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
/// returns the info from the server about created [Consumer] without binding to a [Stream] first.
/// If you want a strict update or create, use [Context::create_consumer_strict_on_stream] or [Context::update_consumer_on_stream].
///
/// # Examples
///
Expand All @@ -908,6 +909,96 @@ impl Context {
&self,
config: C,
stream: S,
) -> Result<Consumer<C>, ConsumerError> {
self.create_consumer_on_stream_action(config, stream, ConsumerAction::CreateOrUpdate)
.await
}

/// Update an existing consumer.
/// This call will fail if the consumer does not exist.
/// returns the info from the server about updated [Consumer] without binding to a [Stream] first.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer: consumer::PullConsumer = jetstream
/// .update_consumer_on_stream(
/// consumer::pull::Config {
/// durable_name: Some("pull".to_string()),
/// description: Some("updated pull consumer".to_string()),
/// ..Default::default()
/// },
/// "stream",
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "server_2_10")]
pub async fn update_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
&self,
config: C,
stream: S,
) -> Result<Consumer<C>, ConsumerUpdateError> {
self.create_consumer_on_stream_action(config, stream, ConsumerAction::Update)
.await
.map_err(|err| err.into())
}

/// Create consumer on stream, but only if it does not exist or the existing config is exactly
/// the same.
/// This method will fail if consumer is already present with different config.
/// returns the info from the server about created [Consumer] without binding to a [Stream] first.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer: consumer::PullConsumer = jetstream
/// .create_consumer_strict_on_stream(
/// consumer::pull::Config {
/// durable_name: Some("pull".to_string()),
/// ..Default::default()
/// },
/// "stream",
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "server_2_10")]
pub async fn create_consumer_strict_on_stream<
C: IntoConsumerConfig + FromConsumer,
S: AsRef<str>,
>(
&self,
config: C,
stream: S,
) -> Result<Consumer<C>, ConsumerCreateStrictError> {
self.create_consumer_on_stream_action(config, stream, ConsumerAction::Create)
.await
.map_err(|err| err.into())
}

async fn create_consumer_on_stream_action<
C: IntoConsumerConfig + FromConsumer,
S: AsRef<str>,
>(
&self,
config: C,
stream: S,
action: ConsumerAction,
) -> Result<Consumer<C>, ConsumerError> {
let config = config.into_consumer_config();

Expand Down Expand Up @@ -943,7 +1034,7 @@ impl Context {
match self
.request(
subject,
&json!({"stream_name": stream.as_ref(), "config": config}),
&json!({"stream_name": stream.as_ref(), "config": config, "action": action}),
)
.await?
{
Expand Down Expand Up @@ -1638,3 +1729,15 @@ impl From<RequestError> for AccountError {
}
}
}

#[derive(Clone, Debug, Serialize)]
enum ConsumerAction {
#[serde(rename = "")]
CreateOrUpdate,
#[serde(rename = "create")]
#[cfg(feature = "server_2_10")]
Create,
#[serde(rename = "update")]
#[cfg(feature = "server_2_10")]
Update,
}
38 changes: 37 additions & 1 deletion async-nats/src/jetstream/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl ErrorCode {
/// General RAFT error
pub const RAFT_GENERAL: ErrorCode = ErrorCode(10041);

/// Jetstream unable to subscribe to restore snapshot
/// Jetstream unable to subscribe to restore snapshot
pub const RESTORE_SUBSCRIBE_FAILED: ErrorCode = ErrorCode(10042);

/// Stream deletion failed
Expand Down Expand Up @@ -435,6 +435,42 @@ impl ErrorCode {

/// Duplicate source configuration detected
pub const SOURCE_DUPLICATE_DETECTED: ErrorCode = ErrorCode(10140);

/// Sourced stream name is invalid
pub const SOURCE_INVALID_STREAM_NAME: ErrorCode = ErrorCode(10141);

/// Mirrored stream name is invalid
pub const MIRROR_INVALID_STREAM_NAME: ErrorCode = ErrorCode(10142);

/// Source with multiple subject transforms cannot also have a single subject filter
pub const SOURCE_MULTIPLE_FILTERS_NOT_ALLOWED: ErrorCode = ErrorCode(10144);

/// Source subject filter is invalid
pub const SOURCE_INVALID_SUBJECT_FILTER: ErrorCode = ErrorCode(10145);

/// Source transform destination is invalid
pub const SOURCE_INVALID_TRANSFORM_DESTINATION: ErrorCode = ErrorCode(10146);

/// Source filters cannot overlap
pub const SOURCE_OVERLAPPING_SUBJECT_FILTERS: ErrorCode = ErrorCode(10147);

/// Consumer already exists
pub const CONSUMER_ALREADY_EXISTS: ErrorCode = ErrorCode(10148);

/// Consumer does not exist
pub const CONSUMER_DOES_NOT_EXIST: ErrorCode = ErrorCode(10149);

/// Mirror with multiple subject transforms cannot also have a single subject filter
pub const MIRROR_MULTIPLE_FILTERS_NOT_ALLOWED: ErrorCode = ErrorCode(10150);

/// Mirror subject filter is invalid
pub const MIRROR_INVALID_SUBJECT_FILTER: ErrorCode = ErrorCode(10151);

/// Mirror subject filters cannot overlap
pub const MIRROR_OVERLAPPING_SUBJECT_FILTERS: ErrorCode = ErrorCode(10152);

/// Consumer inactive threshold exceeds system limit
pub const CONSUMER_INACTIVE_THRESHOLD_EXCESS: ErrorCode = ErrorCode(10153);
}

/// `Error` type returned from an API response when an error occurs.
Expand Down
Loading
Loading