Skip to content

Commit

Permalink
Add max_bytes for pull consumer config
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored and Jarema committed Jun 7, 2023
1 parent 2291293 commit bfc23c1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
5 changes: 4 additions & 1 deletion async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -312,6 +312,9 @@ pub struct Config {
/// Maximum size of a request batch
#[serde(default, skip_serializing_if = "is_default")]
pub max_batch: i64,
/// Maximum size of a request max_bytes
#[serde(default, skip_serializing_if = "is_default")]
pub max_bytes: i64,
/// Maximum value for request expiration
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub max_expires: Duration,
Expand Down
14 changes: 13 additions & 1 deletion async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -616,6 +616,10 @@ pub struct OrderedConfig {
// This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
// [stream]
pub max_batch: i64,
// Maximum number of bytes that can be requested in single Pull Request.
// This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
// [stream]
pub max_bytes: i64,
// Maximum expiry that can be set for a single Pull Request.
// This is used explicitly by [batch] and [fetch], but also, under the hood, by [messages] and
// [stream]
Expand All @@ -642,6 +646,7 @@ impl From<OrderedConfig> for Config {
max_ack_pending: 0,
headers_only: config.headers_only,
max_batch: config.max_batch,
max_bytes: config.max_bytes,
max_expires: config.max_expires,
inactive_threshold: Duration::from_secs(30),
num_replicas: 1,
Expand Down Expand Up @@ -673,6 +678,7 @@ impl FromConsumer for OrderedConfig {
#[cfg(feature = "server_2_10")]
metadata: config.metadata,
max_batch: config.max_batch,
max_bytes: config.max_bytes,
max_expires: config.max_expires,
})
}
Expand Down Expand Up @@ -702,6 +708,7 @@ impl IntoConsumerConfig for OrderedConfig {
flow_control: false,
idle_heartbeat: Duration::default(),
max_batch: 0,
max_bytes: 0,
max_expires: Duration::default(),
inactive_threshold: Duration::from_secs(30),
num_replicas: 1,
Expand Down Expand Up @@ -1979,6 +1986,9 @@ pub struct Config {
/// Maximum size of a request batch
#[serde(default, skip_serializing_if = "is_default")]
pub max_batch: i64,
/// Maximum value of request max_bytes
#[serde(default, skip_serializing_if = "is_default")]
pub max_bytes: i64,
/// Maximum value for request expiration
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub max_expires: Duration,
Expand Down Expand Up @@ -2030,6 +2040,7 @@ impl IntoConsumerConfig for Config {
flow_control: false,
idle_heartbeat: Duration::default(),
max_batch: self.max_batch,
max_bytes: self.max_bytes,
max_expires: self.max_expires,
inactive_threshold: self.inactive_threshold,
num_replicas: self.num_replicas,
Expand Down Expand Up @@ -2066,6 +2077,7 @@ impl FromConsumer for Config {
max_ack_pending: config.max_ack_pending,
headers_only: config.headers_only,
max_batch: config.max_batch,
max_bytes: config.max_bytes,
max_expires: config.max_expires,
inactive_threshold: config.inactive_threshold,
num_replicas: config.num_replicas,
Expand Down
4 changes: 3 additions & 1 deletion async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -303,6 +303,7 @@ impl IntoConsumerConfig for Config {
flow_control: self.flow_control,
idle_heartbeat: self.idle_heartbeat,
max_batch: 0,
max_bytes: 0,
max_expires: Duration::default(),
inactive_threshold: Duration::default(),
num_replicas: self.num_replicas,
Expand Down Expand Up @@ -420,6 +421,7 @@ impl IntoConsumerConfig for OrderedConfig {
flow_control: true,
idle_heartbeat: Duration::from_secs(5),
max_batch: 0,
max_bytes: 0,
max_expires: Duration::default(),
inactive_threshold: Duration::from_secs(30),
num_replicas: 1,
Expand Down

0 comments on commit bfc23c1

Please sign in to comment.