From 48bac714f699fdea75900dec80131828cc01cfe5 Mon Sep 17 00:00:00 2001 From: N8BWert Date: Wed, 11 Sep 2024 20:09:27 -0400 Subject: [PATCH] use Arc for LocalPublisher to make local publishers not clone data --- ncomm-publishers-and-subscribers/src/local.rs | 142 ++++++++++++------ 1 file changed, 93 insertions(+), 49 deletions(-) diff --git a/ncomm-publishers-and-subscribers/src/local.rs b/ncomm-publishers-and-subscribers/src/local.rs index 5ca93df..b13863d 100644 --- a/ncomm-publishers-and-subscribers/src/local.rs +++ b/ncomm-publishers-and-subscribers/src/local.rs @@ -19,46 +19,46 @@ use ncomm_core::{Publisher, Subscriber}; /// Local Subscriber that utilizes a crossbeam multi subscriber channel /// to receive data from a local publisher -pub struct LocalSubscriber { +pub struct LocalSubscriber { /// The receiver end of a crossbeam channel - rx: Receiver, + rx: Receiver>>, /// The current data stored in the local subscriber - data: Option, + data: Arc>, } -impl Subscriber for LocalSubscriber { +impl Subscriber for LocalSubscriber { type Target = Option; fn get(&mut self) -> &Self::Target { if let Some(data) = self.rx.try_iter().last() { - self.data = Some(data); + self.data = data; } - &self.data + self.data.as_ref() } } /// Local Subscriber that stores incoming data into a buffer for processing all at once -pub struct LocalBufferedSubscriber { +pub struct LocalBufferedSubscriber { /// The receiver end of a crossbeam channel - rx: Receiver, + rx: Receiver>>, /// The buffer of data stored in the subscriber - buffer: Vec, + buffer: Vec>>, } -impl LocalBufferedSubscriber { +impl LocalBufferedSubscriber { /// Clear the data buffer pub fn clear(&mut self) { self.buffer.clear(); } } -impl Subscriber for LocalBufferedSubscriber { - type Target = Vec; +impl Subscriber for LocalBufferedSubscriber { + type Target = Vec>>; fn get(&mut self) -> &Self::Target { for data in self.rx.try_iter() { - self.buffer.push(data); + self.buffer.push(data) } &self.buffer @@ -67,24 +67,24 @@ impl Subscriber for LocalBufferedSubscriber { /// Local subscriber where data has a specific time-to-live and will decay /// after the lifetime has passed -pub struct LocalTTLSubscriber { +pub struct LocalTTLSubscriber { /// The receiver end of a crossbeam channel - rx: Receiver, + rx: Receiver>>, /// The current data stored in the local subscriber - data: Option<(Data, Instant)>, + data: Option<(Arc>, Instant)>, /// The time-to-live of a piece of data ttl: Duration, } -impl Subscriber for LocalTTLSubscriber { - type Target = Option<(Data, Instant)>; +impl Subscriber for LocalTTLSubscriber { + type Target = Option<(Arc>, Instant)>; fn get(&mut self) -> &Self::Target { if let Some(data) = self.rx.try_iter().last() { self.data = Some((data, Instant::now())); } - if self.data.is_some() + if self.data.as_ref().is_some() && Instant::now().duration_since(self.data.as_ref().unwrap().1) > self.ttl { self.data = None; @@ -96,19 +96,19 @@ impl Subscriber for LocalTTLSubscriber { /// Local subscriber that maps incoming data to into a location in a hashmap /// allowing the subscriber to maintain a number of pieces of data at once. -pub struct LocalMappedSubscriber K> { +pub struct LocalMappedSubscriber) -> K> { /// The receiver end of a crossbeam channel - rx: Receiver, + rx: Receiver>>, /// The current data stored in the local hashmap - data: HashMap, + data: HashMap>>, /// The hash function used to map incoming data into the hashmap hash: F, } -impl K> Subscriber +impl) -> K> Subscriber for LocalMappedSubscriber { - type Target = HashMap; + type Target = HashMap>>; fn get(&mut self) -> &Self::Target { for data in self.rx.try_iter() { @@ -123,21 +123,21 @@ impl K> Subscriber /// Local subscriber that maps incoming data to into a location in a hashmap /// while specifying a time-to-live for pieces of data contained in the /// hashmap -pub struct LocalMappedTTLSubscriber K> { +pub struct LocalMappedTTLSubscriber) -> K> { /// The receiver end of a crossbeam channel - rx: Receiver, + rx: Receiver>>, /// The current data stored in a hashmap - data: HashMap, + data: HashMap>, Instant)>, /// The hash function used to map incoming data into the hashmap hash: F, /// The time-to-live of pieces of data in the hashmap ttl: Duration, } -impl K> Subscriber +impl) -> K> Subscriber for LocalMappedTTLSubscriber { - type Target = HashMap; + type Target = HashMap>, Instant)>; fn get(&mut self) -> &Self::Target { for data in self.rx.try_iter() { @@ -154,15 +154,17 @@ impl K> Subscriber /// Local Publisher that utilizes a crossbeam multi publisher multi /// subscriber to send data -pub struct LocalPublisher { +pub struct LocalPublisher { /// The transmit pipe that is used to send data to the subscriber - txs: Arc>>>, + #[allow(clippy::type_complexity)] + txs: Arc>>>>>, /// The most recent data sent over the tx pipes so new subscribers will /// automatically have the most recent data - data: Arc>>, + #[allow(clippy::type_complexity)] + data: Arc>, Instant)>>>, } -impl Default for LocalPublisher { +impl Default for LocalPublisher { fn default() -> Self { Self { txs: Arc::new(Mutex::new(Vec::new())), @@ -171,7 +173,7 @@ impl Default for LocalPublisher { } } -impl LocalPublisher { +impl LocalPublisher { /// Create a new local publisher pub fn new() -> Self { Self::default() @@ -190,7 +192,14 @@ impl LocalPublisher { .as_ref() .map(|data| data.0.clone()); - LocalSubscriber { rx, data } + if let Some(data) = data { + LocalSubscriber { rx, data } + } else { + LocalSubscriber { + rx, + data: Arc::new(None), + } + } } /// Create a local buffered subscriber @@ -235,7 +244,7 @@ impl LocalPublisher { /// /// Note: This subscriber will only have access to them most recent piece of data so /// do not expect that data sent a long time ago will be present in this subscriber's data - pub fn subscribe_mapped K>( + pub fn subscribe_mapped) -> K>( &mut self, map: F, ) -> LocalMappedSubscriber { @@ -262,7 +271,7 @@ impl LocalPublisher { /// /// Note: This subscriber will only have access to the most recent piece of data so /// do not expect that data sent a long time ago will be present in this subscriber's data - pub fn subscribe_mapped_ttl K>( + pub fn subscribe_mapped_ttl) -> K>( &mut self, map: F, ttl: Duration, @@ -289,7 +298,7 @@ impl LocalPublisher { } } -impl Clone for LocalPublisher { +impl Clone for LocalPublisher { fn clone(&self) -> Self { Self { txs: self.txs.clone(), @@ -298,11 +307,12 @@ impl Clone for LocalPublisher { } } -impl Publisher for LocalPublisher { +impl Publisher for LocalPublisher { type Data = Data; - type Error = SendError; + type Error = SendError>>; fn publish(&mut self, data: Self::Data) -> Result<(), Self::Error> { + let data = Arc::new(Some(data)); let txs = self.txs.lock().unwrap(); for tx in txs.iter() { tx.send(data.clone())?; @@ -351,7 +361,14 @@ mod tests { publisher.publish(data.clone()).unwrap(); } - assert_eq!(*subscriber.get(), datas); + assert_eq!( + subscriber + .get() + .iter() + .map(|v| v.unwrap().clone()) + .collect::>(), + datas + ); } #[test] @@ -366,32 +383,59 @@ mod tests { std::thread::sleep(Duration::from_millis(100)); assert_eq!(*short_subscriber.get(), None); - assert_eq!(long_subscriber.get().unwrap().0, data); + assert_eq!( + long_subscriber.get().as_ref().unwrap().0.clone().unwrap(), + data + ); } #[test] fn test_publish_mapped_subscriber() { let mut publisher = LocalPublisher::new(); - let mut subscriber = publisher.subscribe_mapped(|data: &TestData| data.num); + let mut subscriber = + publisher.subscribe_mapped(|data: &Option| data.unwrap().num); let data = TestData::new(); publisher.publish(data.clone()).unwrap(); - assert_eq!(*subscriber.get().get(&data.num).unwrap(), data); + assert_eq!( + subscriber + .get() + .get(&data.num) + .unwrap() + .as_ref() + .unwrap() + .clone(), + data + ); } #[test] fn test_publish_mapped_ttl_subscriber() { let mut publisher = LocalPublisher::new(); - let mut short_subscriber = - publisher.subscribe_mapped_ttl(|data: &TestData| data.num, Duration::from_nanos(1)); - let mut long_subscriber = - publisher.subscribe_mapped_ttl(|data: &TestData| data.num, Duration::from_secs(5)); + let mut short_subscriber = publisher.subscribe_mapped_ttl( + |data: &Option| data.unwrap().num, + Duration::from_nanos(1), + ); + let mut long_subscriber = publisher.subscribe_mapped_ttl( + |data: &Option| data.unwrap().num, + Duration::from_secs(5), + ); let data = TestData::new(); publisher.publish(data.clone()).unwrap(); assert_eq!(short_subscriber.get().get(&data.num), None); - assert_eq!(long_subscriber.get().get(&data.num).unwrap().0, data); + assert_eq!( + long_subscriber + .get() + .get(&data.num) + .unwrap() + .0 + .as_ref() + .unwrap() + .clone(), + data + ); } }