Skip to content

Commit

Permalink
test: integration to tests/, serializer with tokio::test (#354)
Browse files Browse the repository at this point in the history
* test: move to tests/

* refactor: tokio test, rm `Stream::push`

* test: move integration level test

* test: move mocking code

* test: reorg
  • Loading branch information
Devdutt Shenoi authored Jul 30, 2024
1 parent 5a51f76 commit 0bb9346
Show file tree
Hide file tree
Showing 14 changed files with 1,356 additions and 1,224 deletions.
781 changes: 1 addition & 780 deletions uplink/src/base/bridge/actions_lane.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl DataBridge {
/// Handle for apps to send action status to bridge
#[derive(Debug, Clone)]
pub struct DataTx {
pub(crate) inner: Sender<Payload>,
pub inner: Sender<Payload>,
}

impl DataTx {
Expand Down
4 changes: 2 additions & 2 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ mod actions_lane;
mod data_lane;
mod delaymap;
mod metrics;
pub(crate) mod stream;
pub mod stream;
mod streams;

use actions_lane::{ActionsBridge, Error};
pub use actions_lane::{ActionsBridge, Error};
pub use actions_lane::{CtrlTx as ActionsLaneCtrlTx, StatusTx};
use data_lane::DataBridge;
pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx};
Expand Down
21 changes: 0 additions & 21 deletions uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,6 @@ where

Ok(status)
}

#[cfg(test)]
/// Push data into buffer and trigger sync channel send on max_batch_size.
/// Returns [`StreamStatus`].
pub fn push(&mut self, data: T) -> Result<StreamStatus, Error> {
if let Some(buf) = self.add(data)? {
self.tx.send(Box::new(buf))?;
return Ok(StreamStatus::Flushed);
}

let status = match self.len() {
1 => StreamStatus::Init(self.config.flush_period),
len => StreamStatus::Partial(len),
};

Ok(status)
}

// pub fn metrics(&self) -> StreamMetrics {
// self.metrics.clone()
// }
}

/// Buffer is an abstraction of a collection that serializer receives.
Expand Down
Loading

0 comments on commit 0bb9346

Please sign in to comment.