Skip to content

Commit

Permalink
feat: track stream-wise compression stats in serializer (#320)
Browse files Browse the repository at this point in the history
* feat: track compression stats in serializer

* feat: Track serialization and compression times in nanos (#321)

* feat: track serialization and compression times

* feat: calculate average times

* fix: don't panic

* fix: serialize as seconds for more precision

* log: calculate averages for times

* style: use box to satisfy clippy
  • Loading branch information
Devdutt Shenoi authored Jan 14, 2024
1 parent 8f1e61d commit fe7a9c2
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 52 deletions.
3 changes: 2 additions & 1 deletion uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ pub struct InstallerConfig {
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct StreamMetricsConfig {
pub enabled: bool,
pub topic: String,
pub bridge_topic: String,
pub serializer_topic: String,
pub blacklist: Vec<String>,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: Duration,
Expand Down
37 changes: 26 additions & 11 deletions uplink/src/base/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use flume::{Receiver, RecvError};
use rumqttc::{AsyncClient, ClientError, QoS, Request};
use tokio::select;

use crate::base::bridge::StreamMetrics;
use crate::Config;

use super::bridge::StreamMetrics;
use super::mqtt::MqttMetrics;
use super::serializer::SerializerMetrics;

Expand Down Expand Up @@ -38,8 +38,10 @@ impl Monitor {

pub async fn start(&self) -> Result<(), Error> {
let stream_metrics_config = self.config.stream_metrics.clone();
let stream_metrics_topic = stream_metrics_config.topic;
let mut stream_metrics = Vec::with_capacity(10);
let bridge_stream_metrics_topic = stream_metrics_config.bridge_topic;
let mut bridge_stream_metrics = Vec::with_capacity(10);
let serializer_stream_metrics_topic = stream_metrics_config.serializer_topic;
let mut serializer_stream_metrics = Vec::with_capacity(10);

let serializer_metrics_config = self.config.serializer_metrics.clone();
let serializer_metrics_topic = serializer_metrics_config.topic;
Expand All @@ -58,18 +60,31 @@ impl Monitor {
continue;
}

stream_metrics.push(o);
let v = serde_json::to_string(&stream_metrics).unwrap();
bridge_stream_metrics.push(o);
let v = serde_json::to_string(&bridge_stream_metrics).unwrap();

stream_metrics.clear();
self.client.publish(&stream_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap();
bridge_stream_metrics.clear();
self.client.publish(&bridge_stream_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap();
}
o = self.serializer_metrics_rx.recv_async() => {
let o = o?;
serializer_metrics.push(o);
let v = serde_json::to_string(&serializer_metrics).unwrap();
serializer_metrics.clear();
self.client.publish(&serializer_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap();
match o {
SerializerMetrics::Main(o) => {
serializer_metrics.push(o);
let v = serde_json::to_string(&serializer_metrics).unwrap();
serializer_metrics.clear();
self.client.publish(&serializer_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap();
}
SerializerMetrics::Stream(o) => {
if stream_metrics_config.blacklist.contains(&o.stream) {
continue;
}
serializer_stream_metrics.push(o);
let v = serde_json::to_string(&serializer_stream_metrics).unwrap();
serializer_stream_metrics.clear();
self.client.publish(&serializer_stream_metrics_topic, QoS::AtLeastOnce, false, v).await.unwrap();
}
}
}
o = self.mqtt_metrics_rx.recv_async() => {
let o = o?;
Expand Down
87 changes: 84 additions & 3 deletions uplink/src/base/serializer/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::time::Duration;

use serde::Serialize;
use serde_with::{serde_as, DurationSeconds};

use crate::base::clock;

/// Metrics information relating to the operation of the `Serializer`, all values are reset on metrics flush
#[derive(Debug, Serialize, Clone)]
pub struct SerializerMetrics {
pub struct Metrics {
timestamp: u128,
sequence: u32,
/// One of **Catchup**, **Normal**, **Slow** or **Crash**
Expand All @@ -27,9 +30,9 @@ pub struct SerializerMetrics {
pub sent_size: usize,
}

impl SerializerMetrics {
impl Metrics {
pub fn new(mode: &str) -> Self {
SerializerMetrics {
Metrics {
timestamp: clock(),
sequence: 1,
mode: mode.to_owned(),
Expand Down Expand Up @@ -99,3 +102,81 @@ impl SerializerMetrics {
self.errors = 0;
}
}

/// Per-stream statistics about serialization and compression performed by the
/// `Serializer`, serialized to JSON for publication (durations as fractional seconds).
/// Size accumulators are reset per metrics window (see `prepare_next`); timing
/// counters/totals are not reset there.
#[serde_as]
#[derive(Debug, Serialize, Clone)]
pub struct StreamMetrics {
    // Window start time from `clock()` — presumably millis since unix epoch; TODO confirm
    pub timestamp: u128,
    // Monotonically increasing metrics message counter, starts at 1
    pub sequence: u32,
    // Name of the stream these statistics describe
    pub stream: String,
    // Total bytes produced by serialization in the current window
    pub serialized_data_size: usize,
    // Total bytes after compression; uncompressed payloads count their
    // serialized size here (see `add_serialized_sizes`)
    pub compressed_data_size: usize,
    // Count of serializations, used only to compute the average; not serialized
    #[serde(skip)]
    pub serializations: u32,
    // Cumulative time spent serializing, emitted as seconds (f64)
    #[serde_as(as = "DurationSeconds<f64>")]
    pub total_serialization_time: Duration,
    // Average time per serialization; refreshed by `prepare_snapshot`
    #[serde_as(as = "DurationSeconds<f64>")]
    pub avg_serialization_time: Duration,
    // Count of compressions, used only to compute the average; not serialized
    #[serde(skip)]
    pub compressions: u32,
    // Cumulative time spent compressing, emitted as seconds (f64)
    #[serde_as(as = "DurationSeconds<f64>")]
    pub total_compression_time: Duration,
    // Average time per compression; refreshed by `prepare_snapshot`
    #[serde_as(as = "DurationSeconds<f64>")]
    pub avg_compression_time: Duration,
}

impl StreamMetrics {
    /// Creates a zeroed tracker for the stream named `name`, stamped with the
    /// current time and starting at sequence 1.
    pub fn new(name: &str) -> Self {
        StreamMetrics {
            timestamp: clock(),
            sequence: 1,
            stream: name.to_owned(),
            serializations: 0,
            serialized_data_size: 0,
            total_serialization_time: Duration::ZERO,
            avg_serialization_time: Duration::ZERO,
            compressions: 0,
            compressed_data_size: 0,
            total_compression_time: Duration::ZERO,
            avg_compression_time: Duration::ZERO,
        }
    }

    /// Accumulates payload sizes for one batch: the serialized byte count, and
    /// the compressed byte count when compression was applied. Uncompressed
    /// payloads contribute their serialized size to both accumulators.
    pub fn add_serialized_sizes(&mut self, data_size: usize, compressed_data_size: Option<usize>) {
        let compressed = match compressed_data_size {
            Some(size) => size,
            None => data_size,
        };
        self.serialized_data_size += data_size;
        self.compressed_data_size += compressed;
    }

    /// Records the duration of one serialization.
    pub fn add_serialization_time(&mut self, serialization_time: Duration) {
        self.serializations += 1;
        self.total_serialization_time += serialization_time;
    }

    /// Records the duration of one compression.
    pub fn add_compression_time(&mut self, compression_time: Duration) {
        self.compressions += 1;
        self.total_compression_time += compression_time;
    }

    // Must be called before serializing the metrics so the averages reflect
    // the totals. Averages are not recomputed on every `add_*` call to keep
    // the hot path cheap. A zero count yields a zero average (no panic).
    pub fn prepare_snapshot(&mut self) {
        let average =
            |total: Duration, count: u32| total.checked_div(count).unwrap_or(Duration::ZERO);
        self.avg_serialization_time = average(self.total_serialization_time, self.serializations);
        self.avg_compression_time = average(self.total_compression_time, self.compressions);
    }

    // Rolls over to the next metrics window: refreshes the timestamp, bumps
    // the sequence number and zeroes the size accumulators.
    // NOTE(review): timing counts/totals are left untouched here, so averages
    // are lifetime-wide rather than per-window — presumably intentional;
    // confirm against the serializer's flush logic.
    pub fn prepare_next(&mut self) {
        self.timestamp = clock();
        self.sequence += 1;
        self.serialized_data_size = 0;
        self.compressed_data_size = 0;
    }
}

/// The two metrics payloads the `Serializer` can emit; consumers match on the
/// variant to route each to its own topic. Payloads are boxed to keep the enum
/// small (avoids clippy's large-variant warning, per the commit message).
pub enum SerializerMetrics {
    // Serializer-wide operational metrics
    Main(Box<Metrics>),
    // Per-stream serialization/compression statistics
    Stream(Box<StreamMetrics>),
}
Loading

0 comments on commit fe7a9c2

Please sign in to comment.