Skip to content

Commit

Permalink
use Arc<Data> for LocalPublisher to make local publishers not clone data
Browse files Browse the repository at this point in the history
  • Loading branch information
N8BWert committed Sep 12, 2024
1 parent 867e26e commit 48bac71
Showing 1 changed file with 93 additions and 49 deletions.
142 changes: 93 additions & 49 deletions ncomm-publishers-and-subscribers/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,46 @@ use ncomm_core::{Publisher, Subscriber};

/// Local Subscriber that utilizes a crossbeam multi subscriber channel
/// to receive data from a local publisher
pub struct LocalSubscriber<Data: Clone> {
pub struct LocalSubscriber<Data> {
/// The receiver end of a crossbeam channel
rx: Receiver<Data>,
rx: Receiver<Arc<Option<Data>>>,
/// The current data stored in the local subscriber
data: Option<Data>,
data: Arc<Option<Data>>,
}

impl<Data: Clone> Subscriber for LocalSubscriber<Data> {
impl<Data> Subscriber for LocalSubscriber<Data> {
type Target = Option<Data>;

fn get(&mut self) -> &Self::Target {
if let Some(data) = self.rx.try_iter().last() {
self.data = Some(data);
self.data = data;
}

&self.data
self.data.as_ref()
}
}

/// Local Subscriber that stores incoming data into a buffer for processing all at once
pub struct LocalBufferedSubscriber<Data: Clone> {
pub struct LocalBufferedSubscriber<Data> {
/// The receiver end of a crossbeam channel
rx: Receiver<Data>,
rx: Receiver<Arc<Option<Data>>>,
/// The buffer of data stored in the subscriber
buffer: Vec<Data>,
buffer: Vec<Arc<Option<Data>>>,
}

impl<Data: Clone> LocalBufferedSubscriber<Data> {
impl<Data> LocalBufferedSubscriber<Data> {
/// Clear the data buffer
pub fn clear(&mut self) {
self.buffer.clear();
}
}

impl<Data: Clone> Subscriber for LocalBufferedSubscriber<Data> {
type Target = Vec<Data>;
impl<Data> Subscriber for LocalBufferedSubscriber<Data> {
type Target = Vec<Arc<Option<Data>>>;

fn get(&mut self) -> &Self::Target {
for data in self.rx.try_iter() {
self.buffer.push(data);
self.buffer.push(data)
}

&self.buffer
Expand All @@ -67,24 +67,24 @@ impl<Data: Clone> Subscriber for LocalBufferedSubscriber<Data> {

/// Local subscriber where data has a specific time-to-live and will decay
/// after the lifetime has passed
pub struct LocalTTLSubscriber<Data: Clone> {
pub struct LocalTTLSubscriber<Data> {
/// The receiver end of a crossbeam channel
rx: Receiver<Data>,
rx: Receiver<Arc<Option<Data>>>,
/// The current data stored in the local subscriber
data: Option<(Data, Instant)>,
data: Option<(Arc<Option<Data>>, Instant)>,
/// The time-to-live of a piece of data
ttl: Duration,
}

impl<Data: Clone> Subscriber for LocalTTLSubscriber<Data> {
type Target = Option<(Data, Instant)>;
impl<Data> Subscriber for LocalTTLSubscriber<Data> {
type Target = Option<(Arc<Option<Data>>, Instant)>;

fn get(&mut self) -> &Self::Target {
if let Some(data) = self.rx.try_iter().last() {
self.data = Some((data, Instant::now()));
}

if self.data.is_some()
if self.data.as_ref().is_some()
&& Instant::now().duration_since(self.data.as_ref().unwrap().1) > self.ttl
{
self.data = None;
Expand All @@ -96,19 +96,19 @@ impl<Data: Clone> Subscriber for LocalTTLSubscriber<Data> {

/// Local subscriber that maps incoming data to into a location in a hashmap
/// allowing the subscriber to maintain a number of pieces of data at once.
pub struct LocalMappedSubscriber<Data: Clone, K: Eq + Hash, F: Fn(&Data) -> K> {
pub struct LocalMappedSubscriber<Data, K: Eq + Hash, F: Fn(&Option<Data>) -> K> {
/// The receiver end of a crossbeam channel
rx: Receiver<Data>,
rx: Receiver<Arc<Option<Data>>>,
/// The current data stored in the local hashmap
data: HashMap<K, Data>,
data: HashMap<K, Arc<Option<Data>>>,
/// The hash function used to map incoming data into the hashmap
hash: F,
}

impl<Data: Clone, K: Eq + Hash, F: Fn(&Data) -> K> Subscriber
impl<Data, K: Eq + Hash, F: Fn(&Option<Data>) -> K> Subscriber
for LocalMappedSubscriber<Data, K, F>
{
type Target = HashMap<K, Data>;
type Target = HashMap<K, Arc<Option<Data>>>;

fn get(&mut self) -> &Self::Target {
for data in self.rx.try_iter() {
Expand All @@ -123,21 +123,21 @@ impl<Data: Clone, K: Eq + Hash, F: Fn(&Data) -> K> Subscriber
/// Local subscriber that maps incoming data to into a location in a hashmap
/// while specifying a time-to-live for pieces of data contained in the
/// hashmap
pub struct LocalMappedTTLSubscriber<Data: Clone, K: Eq + Hash, F: Fn(&Data) -> K> {
pub struct LocalMappedTTLSubscriber<Data, K: Eq + Hash, F: Fn(&Option<Data>) -> K> {
/// The receiver end of a crossbeam channel
rx: Receiver<Data>,
rx: Receiver<Arc<Option<Data>>>,
/// The current data stored in a hashmap
data: HashMap<K, (Data, Instant)>,
data: HashMap<K, (Arc<Option<Data>>, Instant)>,
/// The hash function used to map incoming data into the hashmap
hash: F,
/// The time-to-live of pieces of data in the hashmap
ttl: Duration,
}

impl<Data: Clone, K: Eq + Hash, F: Fn(&Data) -> K> Subscriber
impl<Data, K: Eq + Hash, F: Fn(&Option<Data>) -> K> Subscriber
for LocalMappedTTLSubscriber<Data, K, F>
{
type Target = HashMap<K, (Data, Instant)>;
type Target = HashMap<K, (Arc<Option<Data>>, Instant)>;

fn get(&mut self) -> &Self::Target {
for data in self.rx.try_iter() {
Expand All @@ -154,15 +154,17 @@ impl<Data: Clone, K: Eq + Hash, F: Fn(&Data) -> K> Subscriber

/// Local Publisher that utilizes a crossbeam multi publisher multi
/// subscriber to send data
pub struct LocalPublisher<Data: Clone> {
pub struct LocalPublisher<Data> {
/// The transmit pipe that is used to send data to the subscriber
txs: Arc<Mutex<Vec<Sender<Data>>>>,
#[allow(clippy::type_complexity)]
txs: Arc<Mutex<Vec<Sender<Arc<Option<Data>>>>>>,
/// The most recent data sent over the tx pipes so new subscribers will
/// automatically have the most recent data
data: Arc<Mutex<Option<(Data, Instant)>>>,
#[allow(clippy::type_complexity)]
data: Arc<Mutex<Option<(Arc<Option<Data>>, Instant)>>>,
}

impl<Data: Clone> Default for LocalPublisher<Data> {
impl<Data> Default for LocalPublisher<Data> {
fn default() -> Self {
Self {
txs: Arc::new(Mutex::new(Vec::new())),
Expand All @@ -171,7 +173,7 @@ impl<Data: Clone> Default for LocalPublisher<Data> {
}
}

impl<Data: Clone> LocalPublisher<Data> {
impl<Data> LocalPublisher<Data> {
/// Create a new local publisher
pub fn new() -> Self {
Self::default()
Expand All @@ -190,7 +192,14 @@ impl<Data: Clone> LocalPublisher<Data> {
.as_ref()
.map(|data| data.0.clone());

LocalSubscriber { rx, data }
if let Some(data) = data {
LocalSubscriber { rx, data }
} else {
LocalSubscriber {
rx,
data: Arc::new(None),
}
}
}

/// Create a local buffered subscriber
Expand Down Expand Up @@ -235,7 +244,7 @@ impl<Data: Clone> LocalPublisher<Data> {
///
/// Note: This subscriber will only have access to them most recent piece of data so
/// do not expect that data sent a long time ago will be present in this subscriber's data
pub fn subscribe_mapped<K: Eq + Hash, F: Fn(&Data) -> K>(
pub fn subscribe_mapped<K: Eq + Hash, F: Fn(&Option<Data>) -> K>(
&mut self,
map: F,
) -> LocalMappedSubscriber<Data, K, F> {
Expand All @@ -262,7 +271,7 @@ impl<Data: Clone> LocalPublisher<Data> {
///
/// Note: This subscriber will only have access to the most recent piece of data so
/// do not expect that data sent a long time ago will be present in this subscriber's data
pub fn subscribe_mapped_ttl<K: Eq + Hash, F: Fn(&Data) -> K>(
pub fn subscribe_mapped_ttl<K: Eq + Hash, F: Fn(&Option<Data>) -> K>(
&mut self,
map: F,
ttl: Duration,
Expand All @@ -289,7 +298,7 @@ impl<Data: Clone> LocalPublisher<Data> {
}
}

impl<Data: Clone> Clone for LocalPublisher<Data> {
impl<Data> Clone for LocalPublisher<Data> {
fn clone(&self) -> Self {
Self {
txs: self.txs.clone(),
Expand All @@ -298,11 +307,12 @@ impl<Data: Clone> Clone for LocalPublisher<Data> {
}
}

impl<Data: Clone> Publisher for LocalPublisher<Data> {
impl<Data> Publisher for LocalPublisher<Data> {
type Data = Data;
type Error = SendError<Data>;
type Error = SendError<Arc<Option<Data>>>;

fn publish(&mut self, data: Self::Data) -> Result<(), Self::Error> {
let data = Arc::new(Some(data));
let txs = self.txs.lock().unwrap();
for tx in txs.iter() {
tx.send(data.clone())?;
Expand Down Expand Up @@ -351,7 +361,14 @@ mod tests {
publisher.publish(data.clone()).unwrap();
}

assert_eq!(*subscriber.get(), datas);
assert_eq!(
subscriber
.get()
.iter()
.map(|v| v.unwrap().clone())
.collect::<Vec<TestData>>(),
datas
);
}

#[test]
Expand All @@ -366,32 +383,59 @@ mod tests {
std::thread::sleep(Duration::from_millis(100));

assert_eq!(*short_subscriber.get(), None);
assert_eq!(long_subscriber.get().unwrap().0, data);
assert_eq!(
long_subscriber.get().as_ref().unwrap().0.clone().unwrap(),
data
);
}

#[test]
fn test_publish_mapped_subscriber() {
let mut publisher = LocalPublisher::new();
let mut subscriber = publisher.subscribe_mapped(|data: &TestData| data.num);
let mut subscriber =
publisher.subscribe_mapped(|data: &Option<TestData>| data.unwrap().num);

let data = TestData::new();
publisher.publish(data.clone()).unwrap();

assert_eq!(*subscriber.get().get(&data.num).unwrap(), data);
assert_eq!(
subscriber
.get()
.get(&data.num)
.unwrap()
.as_ref()
.unwrap()
.clone(),
data
);
}

#[test]
fn test_publish_mapped_ttl_subscriber() {
let mut publisher = LocalPublisher::new();
let mut short_subscriber =
publisher.subscribe_mapped_ttl(|data: &TestData| data.num, Duration::from_nanos(1));
let mut long_subscriber =
publisher.subscribe_mapped_ttl(|data: &TestData| data.num, Duration::from_secs(5));
let mut short_subscriber = publisher.subscribe_mapped_ttl(
|data: &Option<TestData>| data.unwrap().num,
Duration::from_nanos(1),
);
let mut long_subscriber = publisher.subscribe_mapped_ttl(
|data: &Option<TestData>| data.unwrap().num,
Duration::from_secs(5),
);

let data = TestData::new();
publisher.publish(data.clone()).unwrap();

assert_eq!(short_subscriber.get().get(&data.num), None);
assert_eq!(long_subscriber.get().get(&data.num).unwrap().0, data);
assert_eq!(
long_subscriber
.get()
.get(&data.num)
.unwrap()
.0
.as_ref()
.unwrap()
.clone(),
data
);
}
}

0 comments on commit 48bac71

Please sign in to comment.