From 4404f8cdc738228b0c9d1ae5dc23a2feef9a7752 Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Fri, 13 Sep 2024 02:50:10 +0900 Subject: [PATCH] Add doc comments --- rust_crate/src/channel.rs | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/rust_crate/src/channel.rs b/rust_crate/src/channel.rs index 995a2447..6619fe8d 100644 --- a/rust_crate/src/channel.rs +++ b/rust_crate/src/channel.rs @@ -4,16 +4,27 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; +/// The `MessageSender` is used to send messages into a shared message queue. +/// It is clonable, and multiple senders can be created to send messages into +/// the same queue. Each message is sent to a receiver, but only the currently +/// active receiver can receive messages. #[derive(Clone)] pub struct MessageSender { inner: Arc>>, } +/// The `MessageReceiver` is used to asynchronously receive messages from the +/// shared message queue. Only one receiver can be active at a time; new +/// receivers are created by cloning the original. When a receiver is cloned, +/// it becomes the active receiver, and the previous receiver will no longer +/// receive messages. pub struct MessageReceiver { inner: Arc>>, id: usize, // Each receiver has a unique ID } +/// A channel holding a message queue and managing the current active receiver. +/// Only the active receiver can receive messages. struct MessageChannel { queue: VecDeque, waker: Option, @@ -21,6 +32,9 @@ struct MessageChannel { } impl MessageSender { + /// Sends a message to the shared queue. If a receiver is waiting for a + /// message, it will be woken up. This method does not fail if the mutex + /// is poisoned but simply ignores the failure. pub fn send(&self, msg: T) { let mut inner = match self.inner.lock() { Ok(inner) => inner, @@ -29,6 +43,7 @@ impl MessageSender { // Enqueue the message inner.queue.push_back(msg); + // Wake up the previous receiver making it receive `None`, if any if let Some(waker) = inner.waker.take() { waker.wake(); } @@ -36,6 +51,10 @@ impl MessageSender { } impl MessageReceiver { + /// Asynchronously receives the next message from the queue. Only the active + /// receiver is allowed to receive messages. If there are no messages in the + /// queue, the receiver will wait until a new message is sent. If this receiver + /// is not active, it will return `None`. pub async fn recv(&self) -> Option { RecvFuture { inner: self.inner.clone(), @@ -47,6 +66,9 @@ impl MessageReceiver { // Automatically make the cloned receiver the active one impl Clone for MessageReceiver { + /// Clones the receiver and makes the new receiver the active one. The + /// original receiver will no longer receive messages after this clone. + /// This ensures only the most recent receiver can access the message queue. fn clone(&self) -> Self { let mut inner = self.inner.lock().unwrap(); let new_receiver = MessageReceiver { @@ -61,6 +83,9 @@ impl Clone for MessageReceiver { } } +/// A future that represents the attempt of a `MessageReceiver` to receive a +/// message. This future is only completed when the active receiver receives +/// a message from the queue. struct RecvFuture { inner: Arc>>, receiver_id: usize, // Track which receiver is polling @@ -69,6 +94,10 @@ struct RecvFuture { impl Future for RecvFuture { type Output = Option; + /// Polls the future to check if the active receiver has a message in the + /// queue. If no message is available, the task will be put to sleep until + /// a message is sent. If this receiver is not the active receiver, it will + /// return `None`. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut inner = match self.inner.lock() { Ok(inner) => inner, @@ -89,7 +118,10 @@ impl Future for RecvFuture { } } -// Create the message channel with a message queue +/// Creates a message channel with a sender and a receiver. The sender can be +/// used to send messages, and the receiver can be used to receive them +/// asynchronously. Only one receiver is active at a time, and new receivers +/// are created by cloning the original receiver. pub fn message_channel() -> (MessageSender, MessageReceiver) { let channel = Arc::new(Mutex::new(MessageChannel { queue: VecDeque::new(),