Skip to content

Commit

Permalink
fix: configurable default backlog buffer size (#353)
Browse files Browse the repository at this point in the history
feat: allow configuring default backlog buffer size
  • Loading branch information
Devdutt Shenoi authored Jul 22, 2024
1 parent 2512a83 commit 5a51f76
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
37 changes: 22 additions & 15 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ script_runner = [{ name = "run_script", timeout = 10 }]
# Location on disk for persisted streams to write backlogs into, also used to write
persistence_path = "/tmp/uplink/"

# Size of in-memory buffer for dynamically created streams. Used for backlog management.
default_buf_size = 1024 # 1KB

# MQTT client configuration
#
#
# Required Parameters
# - max_packet_size: Maximum packet size acceptable for MQTT messages
# - max_inflight: Maximum number of outgoing QoS 1/2 messages that can be
Expand Down Expand Up @@ -43,18 +46,18 @@ actions = [{ name = "install_update" }, { name = "load_file" }]
port = 6060
actions = []

# Metrics configurations are available for serializer and streams. By default
# they are disabled and no metrics will be forwarded to platform.
# Metrics configurations are available for serializer and streams. By default
# they are disabled and no metrics will be forwarded to platform.
# Parameters
# - topic(optional): One can configure to push stats to a specific topic,
# different from the default by configuring it with this field.
#
# Serializer module can publish associated metrics, to keep track of
# serializer performance.
# Serializer module can publish associated metrics, to keep track of
# serializer performance.
[serializer_metrics]
enabled = true

# Serializer module can also publish stream metrics, to keep track of latencies
# Serializer module can also publish stream metrics, to keep track of latencies
# and batch sizes on a per-stream basis.
# NOTE: Leaving this configuration empty like the following tells uplink to enable
# sending metrics, but with default topic string.
Expand Down Expand Up @@ -95,15 +98,15 @@ priority = 50

# Configuration details associated with uplink's persistent storage module which writes publish
# packets to disk in case of slow or crashed network, for recovery purposes.
#
#
# Required Parameters
# - max_file_size(defaults to 100MB): Maximum size up to which a single persistence file can grow.
# This value alone can be changed to configure in-memory storage of packets during network recovery.
# - path(optional): Path to directory for storing backlog in files, shouldn't contain anything else.
# Please ensure the location is unique for each stream to ensure there is no clash in files.
# Please ensure the location is unique for each stream to ensure there is no clash in files.
# - max_file_count(optional, defaults to 3): Maximum number of persistence files allowed on disk.
#
# NOTE: Persistence is an optional feature that is disabled by default, i.e. if not included in the
# NOTE: Persistence is an optional feature that is disabled by default, i.e. if not included in the
# configuration, we use transient(in-memory) storage to handle network downtime only.
[streams.gps]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/gps/jsonarray"
Expand Down Expand Up @@ -134,14 +137,18 @@ priority = 255

# Configurations for uplink's built-in file downloader, including the actions that can trigger
# a download, the location in file system where uplink will download and store files from the
# provided URL in the payload of download file actions, over HTTP(S).
# provided URL in the payload of download file actions, over HTTP(S).
# If left unconfigured, downloader will be disabled.
#
# Required Parameters
# - actions: List of actions names that can trigger the downloader, with configurable timeouts
# - path: Location in fs where the files are downloaded into
[downloader]
actions = [{ name = "update_firmware" }, { name = "send_file", timeout = 10 }, { name = "send_script" }]
actions = [
{ name = "update_firmware" },
{ name = "send_file", timeout = 10 },
{ name = "send_script" },
]
path = "/var/tmp/ota-file"

# Configurations associated with the system stats module of uplink, if enabled
Expand All @@ -161,7 +168,7 @@ update_period = 30
enabled = true
port = 3333

# Configurations associated with running uplink in simulator mode, if enabled
# Configurations associated with running uplink in simulator mode, if enabled
# uplink will push simulated data for device_id 1..=num_devices and respond to any
# actions triggered on the tenant connected to, with progress 0..=100.
# NOTE: uplink bridge will hence be turned off.
Expand All @@ -173,15 +180,15 @@ port = 3333
# num_devices = 10
# gps_paths = "./paths/"

# Configuration of logger, journalctl in the case of linux. As configured, uplink will
# Configuration of logger, journalctl in the case of linux. As configured, uplink will
# push log lines in batches of size `stream_size`
#
#
# Required Parameters
# - tags: syslog identifiers for the processes to be logged, e.g. systemd, sshd, kernel
# - units: names of the systemd units that are to be logged, e.g. collector.service, app.service
# - min_level: priority level by number(7 is Debug, 0 is Emergency)
# - stream_size: maximum number of log line before the log stream is flushed
#
#
# [logging]
# tags = ["systemd"]
# units = ["collector"]
Expand Down
7 changes: 4 additions & 3 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use storage::Storage;
use thiserror::Error;
use tokio::{select, time::interval};

use crate::config::{default_file_size, Compression, StreamConfig};
use crate::config::{Compression, StreamConfig};
use crate::{Config, Package};
pub use metrics::{Metrics, SerializerMetrics, StreamMetrics};

Expand Down Expand Up @@ -132,6 +132,7 @@ impl MqttClient for AsyncClient {
}

struct StorageHandler {
config: Arc<Config>,
map: BTreeMap<Arc<StreamConfig>, Storage>,
// Stream being read from
read_stream: Option<Arc<StreamConfig>>,
Expand Down Expand Up @@ -163,13 +164,13 @@ impl StorageHandler {
map.insert(Arc::new(stream_config.clone()), storage);
}

Ok(Self { map, read_stream: None })
Ok(Self { config, map, read_stream: None })
}

fn select(&mut self, stream: &Arc<StreamConfig>) -> &mut Storage {
self.map
.entry(stream.to_owned())
.or_insert_with(|| Storage::new(&stream.topic, default_file_size()))
.or_insert_with(|| Storage::new(&stream.topic, self.config.default_buf_size))
}

fn next(&mut self, metrics: &mut Metrics) -> Option<(&Arc<StreamConfig>, &mut Storage)> {
Expand Down
2 changes: 2 additions & 0 deletions uplink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ pub struct Config {
pub streams: HashMap<String, StreamConfig>,
#[serde(default = "default_persistence_path")]
pub persistence_path: PathBuf,
#[serde(default = "default_file_size")]
pub default_buf_size: usize,
pub action_status: StreamConfig,
pub stream_metrics: StreamMetricsConfig,
pub serializer_metrics: SerializerMetricsConfig,
Expand Down

0 comments on commit 5a51f76

Please sign in to comment.