-
Notifications
You must be signed in to change notification settings - Fork 2
/
channel.rs
178 lines (154 loc) · 5.31 KB
/
channel.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::task::{Poll, Waker};
/// Create a naive but fast enough async->sync bounded channel.
/// - The sender is non-blocking and can operate in an async context.
/// - The receiver is blocking and expected to operate in a sync context.
pub fn new<T>(size: usize) -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
size,
waiter: Condvar::new(),
next_sender_id: AtomicU64::new(1),
senders: AtomicU64::new(1),
locked: Mutex::new(Shared {
items: Default::default(),
wakers: Default::default(),
}),
});
let sender = Sender {
id: 0,
inner: Arc::downgrade(&inner),
};
let receiver = Receiver {
inner,
items: Default::default(),
};
(sender, receiver)
}
/// Error produced by the future returned from `Sender::send`.
///
/// `Debug`, `Display` and `std::error::Error` are implemented without
/// requiring anything of `T`, so the error can be unwrapped, logged or boxed
/// regardless of the payload type. The payload itself is never printed.
pub enum SendError<T> {
    /// If we try to send an item, but the receiver is
    /// dropped before we can, we'll be given the item
    /// back in this error.
    ReceiverDropped(T),
    /// If we try polling the `send` future again after
    /// it's completed, we'll get this back.
    AlreadyReady,
}

impl<T> std::fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is elided so that `T` does not need to be `Debug`.
        match self {
            SendError::ReceiverDropped(_) => f.write_str("ReceiverDropped(..)"),
            SendError::AlreadyReady => f.write_str("AlreadyReady"),
        }
    }
}

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SendError::ReceiverDropped(_) => {
                f.write_str("receiver dropped before the item could be sent")
            }
            SendError::AlreadyReady => {
                f.write_str("send future polled again after it already completed")
            }
        }
    }
}

impl<T> std::error::Error for SendError<T> {}
/// Sending half of the channel. The first one comes from `new`; further
/// ones are made with `Clone`.
pub struct Sender<T> {
/// Key under which this sender's `send` futures store their waker.
/// `new` assigns 0; clones take the next value of `Inner::next_sender_id`
/// (or 0 when the shared state can no longer be upgraded).
id: u64,
/// Weak handle to the shared state. The `Receiver` holds the strong
/// reference, so `upgrade` fails once the receiver has been dropped.
inner: Weak<Inner<T>>,
}
impl<T: Send + 'static> Sender<T> {
    /// Build a future that places `item` on the channel's shared queue.
    ///
    /// Nothing happens until the future is polled. On each poll it:
    ///
    /// - resolves to `Err(SendError::AlreadyReady)` if a previous poll
    ///   already produced a result;
    /// - resolves to `Err(SendError::ReceiverDropped(item))` if the shared
    ///   state can no longer be upgraded, handing the item back;
    /// - returns `Pending` while the queue is at capacity, after storing the
    ///   task's waker under this sender's ID so the receiver can wake it;
    /// - otherwise enqueues the item, notifies the receiver and resolves to
    ///   `Ok(())`.
    ///
    /// The waker slot is keyed by sender ID, so futures created from the
    /// same `Sender` value share a single slot.
    ///
    /// # Panics
    ///
    /// Polling panics if the channel mutex has been poisoned.
    pub fn send(&self, item: T) -> impl Future<Output = Result<(), SendError<T>>> + 'static {
        let sender_id = self.id;
        let weak = self.inner.clone();
        // Holds the item between polls; `None` marks a finished future.
        let mut pending = Some(item);

        std::future::poll_fn(move |cx| {
            // An empty slot means we already resolved on an earlier poll.
            let payload = match pending.take() {
                Some(payload) => payload,
                None => return Poll::Ready(Err(SendError::AlreadyReady)),
            };

            // A failed upgrade means the receiver is gone: return the item.
            let channel = match weak.upgrade() {
                Some(channel) => channel,
                None => return Poll::Ready(Err(SendError::ReceiverDropped(payload))),
            };

            let mut shared = channel.locked.lock().unwrap();

            if shared.items.len() >= channel.size {
                // Queue is full: park our waker for the receiver to fire,
                // and keep the item for the next attempt.
                shared.wakers.insert(sender_id, cx.waker().clone());
                drop(shared);
                pending = Some(payload);
                return Poll::Pending;
            }

            // There is room: enqueue, release the lock, then signal the
            // (possibly waiting) receiver.
            shared.items.push_back(payload);
            drop(shared);
            channel.waiter.notify_one();
            Poll::Ready(Ok(()))
        })
    }
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let mut id = 0;
// If the receiver still exists, make sure to
// tell it about the new Sender, and give the sender
// a unique ID.
if let Some(inner) = self.inner.upgrade() {
id = inner.next_sender_id.fetch_add(1, Ordering::Relaxed);
inner.senders.fetch_add(1, Ordering::Relaxed);
}
Self {
id,
inner: self.inner.clone(),
}
}
}
impl<T> std::fmt::Debug for Sender<T> {
    /// Print the sender's ID; the shared state is shown only as the
    /// placeholder string `"<inner>"`, so `T` needs no `Debug` impl.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Sender");
        out.field("id", &self.id);
        out.field("inner", &"<inner>");
        out.finish()
    }
}
impl<T> Drop for Sender<T> {
    /// Remove this sender from the live-sender count and, if it was the last
    /// one, wake the receiver so that it can stop waiting for items.
    fn drop(&mut self) {
        // If the shared state is already gone there is nobody to tell.
        let Some(inner) = self.inner.upgrade() else {
            return;
        };

        // The decrement must happen while holding the mutex the receiver
        // waits with. Otherwise the receiver could observe `senders > 0`
        // under the lock, we could decrement and notify before it starts
        // waiting, and it would then sleep forever (a lost wakeup).
        //
        // A poisoned lock is tolerated: we only need mutual exclusion here,
        // and panicking inside `drop` could abort the process.
        let guard = inner
            .locked
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // `fetch_sub` returns the _previous_ value, so 1 means the count is
        // now 0.
        let previous = inner.senders.fetch_sub(1, Ordering::Relaxed);
        drop(guard);

        if previous == 1 {
            inner.waiter.notify_one();
        }
    }
}
/// Receiving half of the channel; `recv` blocks the calling thread.
///
/// NOTE(review): no `Drop` impl for `Receiver` is visible in this file, so
/// wakers still stored in `Shared::wakers` appear to be dropped without
/// being woken when the receiver goes away. A `send` future that is pending
/// at capacity would then never be polled again to observe
/// `ReceiverDropped` — confirm, and consider waking parked senders on drop.
pub struct Receiver<T> {
/// Strong reference keeping the shared state alive; senders only hold
/// `Weak` handles to it.
inner: Arc<Inner<T>>,
/// Items already moved out of the shared queue. `recv` serves from here
/// first and only takes the lock when this buffer is empty.
items: VecDeque<T>,
}
impl<T> Receiver<T> {
pub fn recv(&mut self) -> Option<T> {
if self.items.is_empty() {
let mut shared = self.inner.locked.lock().unwrap();
let senders = &self.inner.senders;
// Wait until notified that more items have been put in the queue.
// this may wake up spuriously, so loop until items. Stop waiting
// if no more senders.
while shared.items.is_empty() && senders.load(Ordering::Relaxed) > 0 {
shared = self.inner.waiter.wait(shared).unwrap();
}
// Move the items to our local buffer, and wake the waker so that
// futures waiting to send are woken, too.
self.items.append(&mut shared.items);
let wakers_iter = shared.wakers.drain();
for (_, waker) in wakers_iter {
waker.wake();
}
}
// We'll start returning None when we're closed
// and we run out of items.
self.items.pop_front()
}
}
/// State shared between the `Receiver` (strong owner) and every `Sender`
/// (weak handles).
struct Inner<T> {
/// Capacity of the shared queue: `send` returns `Pending` while
/// `Shared::items` holds this many items or more.
size: usize,
/// Condition variable the receiver blocks on in `recv`. It is notified
/// when an item is pushed and when the last sender is dropped.
waiter: Condvar,
/// Next ID to hand to a cloned sender (incremented on each clone).
next_sender_id: AtomicU64,
/// Number of live senders; `recv` stops waiting once this reaches zero.
senders: AtomicU64,
/// Queue and waker table behind the mutex that `waiter` is used with.
locked: Mutex<Shared<T>>,
}
/// Mutable channel state guarded by `Inner::locked`.
struct Shared<T> {
/// Wakers of `send` futures that found the queue full, keyed by sender
/// ID. Registering again under the same ID replaces the earlier waker.
/// `recv` drains and wakes all of them after emptying the queue.
wakers: HashMap<u64, Waker>,
/// Items that have been sent but not yet moved into the receiver's local
/// buffer.
items: VecDeque<T>,
}