diff --git a/async-nats/src/jetstream/consumer/mod.rs b/async-nats/src/jetstream/consumer/mod.rs index 13a57f170..773d28d16 100644 --- a/async-nats/src/jetstream/consumer/mod.rs +++ b/async-nats/src/jetstream/consumer/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -312,6 +312,9 @@ pub struct Config { /// Maximum size of a request batch #[serde(default, skip_serializing_if = "is_default")] pub max_batch: i64, + /// Maximum size of a request max_bytes + #[serde(default, skip_serializing_if = "is_default")] + pub max_bytes: i64, /// Maximum value for request expiration #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")] pub max_expires: Duration, diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 11f2a090f..4ce1220e5 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -616,6 +616,10 @@ pub struct OrderedConfig { // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and // [stream] pub max_batch: i64, + // Maximum number of bytes that can be requested in single Pull Request. + // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and + // [stream] + pub max_bytes: i64, // Maximum expiry that can be set for a single Pull Request. // This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and // [stream] @@ -642,6 +646,7 @@ impl From for Config { max_ack_pending: 0, headers_only: config.headers_only, max_batch: config.max_batch, + max_bytes: config.max_bytes, max_expires: config.max_expires, inactive_threshold: Duration::from_secs(30), num_replicas: 1, @@ -673,6 +678,7 @@ impl FromConsumer for OrderedConfig { #[cfg(feature = "server_2_10")] metadata: config.metadata, max_batch: config.max_batch, + max_bytes: config.max_bytes, max_expires: config.max_expires, }) } @@ -702,6 +708,7 @@ impl IntoConsumerConfig for OrderedConfig { flow_control: false, idle_heartbeat: Duration::default(), max_batch: 0, + max_bytes: 0, max_expires: Duration::default(), inactive_threshold: Duration::from_secs(30), num_replicas: 1, @@ -1979,6 +1986,9 @@ pub struct Config { /// Maximum size of a request batch #[serde(default, skip_serializing_if = "is_default")] pub max_batch: i64, + /// Maximum value of request max_bytes + #[serde(default, skip_serializing_if = "is_default")] + pub max_bytes: i64, /// Maximum value for request expiration #[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")] pub max_expires: Duration, @@ -2030,6 +2040,7 @@ impl IntoConsumerConfig for Config { flow_control: false, idle_heartbeat: Duration::default(), max_batch: self.max_batch, + max_bytes: self.max_bytes, max_expires: self.max_expires, inactive_threshold: self.inactive_threshold, num_replicas: self.num_replicas, @@ -2066,6 +2077,7 @@ impl FromConsumer for Config { max_ack_pending: config.max_ack_pending, headers_only: config.headers_only, max_batch: config.max_batch, + max_bytes: config.max_bytes, max_expires: config.max_expires, inactive_threshold: config.inactive_threshold, num_replicas: config.num_replicas, diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index c3aeb90b7..f7a7723d3 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -303,6 +303,7 @@ impl IntoConsumerConfig for Config { flow_control: self.flow_control, idle_heartbeat: self.idle_heartbeat, max_batch: 0, + max_bytes: 0, max_expires: Duration::default(), inactive_threshold: Duration::default(), num_replicas: self.num_replicas, @@ -420,6 +421,7 @@ impl IntoConsumerConfig for OrderedConfig { flow_control: true, idle_heartbeat: Duration::from_secs(5), max_batch: 0, + max_bytes: 0, max_expires: Duration::default(), inactive_threshold: Duration::from_secs(30), num_replicas: 1,