From 450be15860ced43391d4ebe84785251bb9211a10 Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Fri, 13 Sep 2024 02:39:10 +0900 Subject: [PATCH 1/4] Wake previous message receiver on clone --- rust_crate/src/channel.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust_crate/src/channel.rs b/rust_crate/src/channel.rs index 7dada693..eeef4c69 100644 --- a/rust_crate/src/channel.rs +++ b/rust_crate/src/channel.rs @@ -53,7 +53,10 @@ impl Clone for MessageReceiver { inner: self.inner.clone(), id: inner.active_receiver_id + 1, // Increment ID for new receiver }; - inner.active_receiver_id = new_receiver.id; // Update active receiver + inner.active_receiver_id = new_receiver.id; + if let Some(waker) = inner.waker.take() { + waker.wake(); + } new_receiver } } From daa5e4dbed5cb11315dc3f0ce98bb1aed6b2846a Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Fri, 13 Sep 2024 02:42:01 +0900 Subject: [PATCH 2/4] Add a comment --- flutter_package/lib/src/load_os.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/flutter_package/lib/src/load_os.dart b/flutter_package/lib/src/load_os.dart index 46f08666..29b23419 100644 --- a/flutter_package/lib/src/load_os.dart +++ b/flutter_package/lib/src/load_os.dart @@ -165,6 +165,7 @@ class RustLibraryNew extends RustLibrary { /// Class for local native library symbols loaded with `RTLD_LOCAL`. /// This is relatively inefficient because `malloc.allocate` is required. +/// It involves extra memory copy before sending the data to Rust. class RustLibraryOld extends RustLibrary { late DynamicLibrary lib; late void Function() startRustLogicExtern; From bfb375749bcf2e10dfa301bf436113effa6a3321 Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Fri, 13 Sep 2024 02:44:29 +0900 Subject: [PATCH 3/4] Organize code --- rust_crate/src/channel.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/rust_crate/src/channel.rs b/rust_crate/src/channel.rs index eeef4c69..995a2447 100644 --- a/rust_crate/src/channel.rs +++ b/rust_crate/src/channel.rs @@ -97,15 +97,12 @@ pub fn message_channel() -> (MessageSender, MessageReceiver) { active_receiver_id: 0, // Start with receiver ID 0 })); - let receiver = MessageReceiver { + let sender = MessageSender { inner: channel.clone(), + }; + let receiver = MessageReceiver { + inner: channel, id: 0, }; - - ( - MessageSender { - inner: channel.clone(), - }, - receiver, - ) + (sender, receiver) } From 4404f8cdc738228b0c9d1ae5dc23a2feef9a7752 Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Fri, 13 Sep 2024 02:50:10 +0900 Subject: [PATCH 4/4] Add doc comments --- rust_crate/src/channel.rs | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/rust_crate/src/channel.rs b/rust_crate/src/channel.rs index 995a2447..6619fe8d 100644 --- a/rust_crate/src/channel.rs +++ b/rust_crate/src/channel.rs @@ -4,16 +4,27 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; +/// The `MessageSender` is used to send messages into a shared message queue. +/// It is clonable, and multiple senders can be created to send messages into +/// the same queue. Each message is sent to a receiver, but only the currently +/// active receiver can receive messages. #[derive(Clone)] pub struct MessageSender { inner: Arc>>, } +/// The `MessageReceiver` is used to asynchronously receive messages from the +/// shared message queue. Only one receiver can be active at a time; new +/// receivers are created by cloning the original. When a receiver is cloned, +/// it becomes the active receiver, and the previous receiver will no longer +/// receive messages. pub struct MessageReceiver { inner: Arc>>, id: usize, // Each receiver has a unique ID } +/// A channel holding a message queue and managing the current active receiver. +/// Only the active receiver can receive messages. struct MessageChannel { queue: VecDeque, waker: Option, @@ -21,6 +32,9 @@ struct MessageChannel { } impl MessageSender { + /// Sends a message to the shared queue. If a receiver is waiting for a + /// message, it will be woken up. This method does not fail if the mutex + /// is poisoned but simply ignores the failure. pub fn send(&self, msg: T) { let mut inner = match self.inner.lock() { Ok(inner) => inner, @@ -29,6 +43,7 @@ impl MessageSender { // Enqueue the message inner.queue.push_back(msg); + // Wake up the previous receiver making it receive `None`, if any if let Some(waker) = inner.waker.take() { waker.wake(); } @@ -36,6 +51,10 @@ impl MessageSender { } impl MessageReceiver { + /// Asynchronously receives the next message from the queue. Only the active + /// receiver is allowed to receive messages. If there are no messages in the + /// queue, the receiver will wait until a new message is sent. If this receiver + /// is not active, it will return `None`. pub async fn recv(&self) -> Option { RecvFuture { inner: self.inner.clone(), @@ -47,6 +66,9 @@ impl MessageReceiver { // Automatically make the cloned receiver the active one impl Clone for MessageReceiver { + /// Clones the receiver and makes the new receiver the active one. The + /// original receiver will no longer receive messages after this clone. + /// This ensures only the most recent receiver can access the message queue. fn clone(&self) -> Self { let mut inner = self.inner.lock().unwrap(); let new_receiver = MessageReceiver { @@ -61,6 +83,9 @@ impl Clone for MessageReceiver { } } +/// A future that represents the attempt of a `MessageReceiver` to receive a +/// message. This future is only completed when the active receiver receives +/// a message from the queue. struct RecvFuture { inner: Arc>>, receiver_id: usize, // Track which receiver is polling @@ -69,6 +94,10 @@ struct RecvFuture { impl Future for RecvFuture { type Output = Option; + /// Polls the future to check if the active receiver has a message in the + /// queue. If no message is available, the task will be put to sleep until + /// a message is sent. If this receiver is not the active receiver, it will + /// return `None`. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut inner = match self.inner.lock() { Ok(inner) => inner, @@ -89,7 +118,10 @@ impl Future for RecvFuture { } } -// Create the message channel with a message queue +/// Creates a message channel with a sender and a receiver. The sender can be +/// used to send messages, and the receiver can be used to receive them +/// asynchronously. Only one receiver is active at a time, and new receivers +/// are created by cloning the original receiver. pub fn message_channel() -> (MessageSender, MessageReceiver) { let channel = Arc::new(Mutex::new(MessageChannel { queue: VecDeque::new(),