Skip to content

Commit

Permalink
test: move integration level test
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Jul 28, 2024
1 parent 15c569c commit d2e6caf
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 8 deletions.
5 changes: 5 additions & 0 deletions uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,8 @@ vergen = { version = "7", features = ["git", "build", "time"] }

[dev-dependencies]
tempdir = { workspace = true }

# Integration-test target for the serializer.
# NOTE(review): `required-features = ["test"]` means this target is only
# compiled when the crate's "test" feature is enabled (e.g.
# `cargo test --features test`) — presumably that feature gates the
# `pub`-for-testing helpers in `base::serializer`; confirm a `test`
# feature is actually declared under `[features]` in this manifest.
[[test]]
name = "serializer"
path = "tests/serializer.rs"
required-features = ["test"]
14 changes: 7 additions & 7 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ fn construct_publish(

// Writes the provided publish packet to [Storage], after setting its pkid to 1.
// If the write buffer is full, it is flushed/written onto disk based on config.
fn write_to_storage(
pub fn write_to_storage(
mut publish: Publish,
storage: &mut Storage,
) -> Result<Option<u64>, storage::Error> {
Expand Down Expand Up @@ -863,7 +863,7 @@ impl CtrlTx {
// - Restart with no internet but files on disk

#[cfg(test)]
mod test {
pub mod test {
use serde_json::Value;
use tokio::{spawn, time::sleep};

Expand Down Expand Up @@ -931,7 +931,7 @@ mod test {
}
}

fn default_config() -> Config {
pub fn default_config() -> Config {
Config {
broker: "localhost".to_owned(),
port: 1883,
Expand All @@ -942,7 +942,7 @@ mod test {
}
}

fn defaults(
pub fn defaults(
config: Arc<Config>,
) -> (Serializer<MockClient>, Sender<Box<dyn Package>>, Receiver<Request>) {
let (data_tx, data_rx) = bounded(1);
Expand All @@ -961,20 +961,20 @@ mod test {
Base(#[from] crate::base::bridge::stream::Error),
}

struct MockCollector {
pub struct MockCollector {
stream: Stream<Payload>,
}

impl MockCollector {
fn new(
pub fn new(
stream_name: &str,
stream_config: StreamConfig,
data_tx: Sender<Box<dyn Package>>,
) -> MockCollector {
MockCollector { stream: Stream::new(stream_name, stream_config, data_tx) }
}

async fn send(&mut self, i: u32) -> Result<(), Error> {
pub async fn send(&mut self, i: u32) -> Result<(), Error> {
let payload = Payload {
stream: Default::default(),
sequence: i,
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl PartialOrd for StreamConfig {
}
}

#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)]
#[derive(Debug, Copy, Clone, Deserialize, PartialEq, Eq, PartialOrd)]
pub struct Persistence {
#[serde(default = "default_file_size")]
pub max_file_size: usize,
Expand Down
169 changes: 169 additions & 0 deletions uplink/tests/serializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use std::{
fs::{create_dir_all, remove_dir_all},
path::PathBuf,
sync::Arc,
time::Duration,
};

use bytes::Bytes;
use rumqttc::{Publish, QoS, Request};
use tokio::spawn;

use uplink::{
base::{
bridge::Payload,
serializer::{
test::{default_config, defaults, MockCollector},
write_to_storage,
},
},
config::{Persistence, StreamConfig},
Storage,
};

#[tokio::test]
// Ensures that the data of streams are removed based on preference
async fn preferential_send_on_network() {
let mut config = default_config();
config.stream_metrics.timeout = Duration::from_secs(1000);
config.persistence_path = PathBuf::from(".tmp.serializer_test");
let persistence = Persistence { max_file_size: 1024 * 1024, max_file_count: 1 };
config.streams.extend([
(
"one".to_owned(),
StreamConfig {
topic: "topic/one".to_string(),
priority: 1,
persistence,
..Default::default()
},
),
(
"two".to_owned(),
StreamConfig {
topic: "topic/two".to_string(),
priority: 2,
persistence,
..Default::default()
},
),
(
"top".to_owned(),
StreamConfig {
topic: "topic/top".to_string(),
priority: u8::MAX,
persistence,
..Default::default()
},
),
]);

let publish = |topic: String, i: u32| Publish {
dup: false,
qos: QoS::AtMostOnce,
retain: false,
topic,
pkid: 0,
payload: {
let serialized = serde_json::to_vec(&vec![Payload {
stream: Default::default(),
sequence: i,
timestamp: 0,
payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}").unwrap(),
}])
.unwrap();

Bytes::from(serialized)
},
};
let persistence_path = |path: &PathBuf, stream_name: &str| {
let mut path = path.to_owned();
path.push(stream_name);
create_dir_all(&path).unwrap();

path
};

// write packets for one, two and top onto disk
let mut one = Storage::new("topic/one", 1024 * 1024);
one.set_persistence(persistence_path(&config.persistence_path, "one"), 1).unwrap();
write_to_storage(publish("topic/one".to_string(), 4), &mut one).unwrap();
write_to_storage(publish("topic/one".to_string(), 5), &mut one).unwrap();
one.flush().unwrap();

let mut two = Storage::new("topic/two", 1024 * 1024);
two.set_persistence(persistence_path(&config.persistence_path, "two"), 1).unwrap();
write_to_storage(publish("topic/two".to_string(), 3), &mut two).unwrap();
two.flush().unwrap();

let mut top = Storage::new("topic/top", 1024 * 1024);
top.set_persistence(persistence_path(&config.persistence_path, "top"), 1).unwrap();
write_to_storage(publish("topic/top".to_string(), 1), &mut top).unwrap();
write_to_storage(publish("topic/top".to_string(), 2), &mut top).unwrap();
top.flush().unwrap();

// start serializer in the background
let config = Arc::new(config);
let (serializer, data_tx, req_rx) = defaults(config);

spawn(async { serializer.start().await.unwrap() });

let mut default = MockCollector::new(
"default",
StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() },
data_tx,
);
default.send(6).await.unwrap();
default.send(7).await.unwrap();

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/top");
assert_eq!(payload, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/top");
assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/two");
assert_eq!(payload, "[{\"sequence\":3,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/one");
assert_eq!(payload, "[{\"sequence\":4,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/one");
assert_eq!(payload, "[{\"sequence\":5,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":6,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":7,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

remove_dir_all(".tmp.serializer_test").unwrap();
}

0 comments on commit d2e6caf

Please sign in to comment.