Skip to content

Commit

Permalink
Fix a deadlock in channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Stjepan Glavina committed Nov 4, 2019
1 parent 20cdf73 commit e9edadf
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 182 deletions.
95 changes: 43 additions & 52 deletions src/sync/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,41 +138,34 @@ impl<T> Sender<T> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let msg = self.msg.take().unwrap();

// Try sending the message.
let poll = match self.channel.try_send(msg) {
Ok(()) => Poll::Ready(()),
Err(TrySendError::Disconnected(msg)) => {
self.msg = Some(msg);
Poll::Pending
loop {
let msg = self.msg.take().unwrap();

// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.channel.send_wakers.remove(key);
}
Err(TrySendError::Full(msg)) => {
// Insert this send operation.
match self.opt_key {
None => self.opt_key = Some(self.channel.send_wakers.insert(cx)),
Some(key) => self.channel.send_wakers.update(key, cx),

// Try sending the message.
match self.channel.try_send(msg) {
Ok(()) => return Poll::Ready(()),
Err(TrySendError::Disconnected(msg)) => {
self.msg = Some(msg);
return Poll::Pending;
}
Err(TrySendError::Full(msg)) => {
self.msg = Some(msg);

// Insert this send operation.
self.opt_key = Some(self.channel.send_wakers.insert(cx));

// Try sending the message again.
match self.channel.try_send(msg) {
Ok(()) => Poll::Ready(()),
Err(TrySendError::Disconnected(msg)) | Err(TrySendError::Full(msg)) => {
self.msg = Some(msg);
Poll::Pending
// If the channel is still full and not disconnected, return.
if self.channel.is_full() && !self.channel.is_disconnected() {
return Poll::Pending;
}
}
}
};

if poll.is_ready() {
// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.channel.send_wakers.complete(key);
}
}

poll
}
}

Expand Down Expand Up @@ -543,34 +536,27 @@ fn poll_recv<T>(
opt_key: &mut Option<usize>,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
// Try receiving a message.
let poll = match channel.try_recv() {
Ok(msg) => Poll::Ready(Some(msg)),
Err(TryRecvError::Disconnected) => Poll::Ready(None),
Err(TryRecvError::Empty) => {
// Insert this receive operation.
match *opt_key {
None => *opt_key = Some(wakers.insert(cx)),
Some(key) => wakers.update(key, cx),
}

// Try receiving a message again.
match channel.try_recv() {
Ok(msg) => Poll::Ready(Some(msg)),
Err(TryRecvError::Disconnected) => Poll::Ready(None),
Err(TryRecvError::Empty) => Poll::Pending,
}
}
};

if poll.is_ready() {
loop {
// If the current task is in the set, remove it.
if let Some(key) = opt_key.take() {
wakers.complete(key);
wakers.remove(key);
}
}

poll
// Try receiving a message.
match channel.try_recv() {
Ok(msg) => return Poll::Ready(Some(msg)),
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
Err(TryRecvError::Empty) => {
// Insert this receive operation.
*opt_key = Some(wakers.insert(cx));

// If the channel is still empty and not disconnected, return.
if channel.is_empty() && !channel.is_disconnected() {
return Poll::Pending;
}
}
}
}
}

/// A slot in a channel.
Expand Down Expand Up @@ -862,6 +848,11 @@ impl<T> Channel<T> {
}
}

/// Returns `true` if the channel is disconnected.
pub fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
}

/// Returns `true` if the channel is empty.
fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
Expand Down
42 changes: 18 additions & 24 deletions src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,32 +104,26 @@ impl<T> Mutex<T> {
type Output = MutexGuard<'a, T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll = match self.mutex.try_lock() {
Some(guard) => Poll::Ready(guard),
None => {
// Insert this lock operation.
match self.opt_key {
None => self.opt_key = Some(self.mutex.wakers.insert(cx)),
Some(key) => self.mutex.wakers.update(key, cx),
}

// Try locking again because it's possible the mutex got unlocked just
// before the current task was inserted into the waker set.
match self.mutex.try_lock() {
Some(guard) => Poll::Ready(guard),
None => Poll::Pending,
}
}
};

if poll.is_ready() {
loop {
// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.mutex.wakers.complete(key);
self.mutex.wakers.remove(key);
}
}

poll
// Try acquiring the lock.
match self.mutex.try_lock() {
Some(guard) => return Poll::Ready(guard),
None => {
// Insert this lock operation.
self.opt_key = Some(self.mutex.wakers.insert(cx));

// If the mutex is still locked, return.
if self.mutex.locked.load(Ordering::SeqCst) {
return Poll::Pending;
}
}
}
}
}
}

Expand Down Expand Up @@ -266,8 +260,8 @@ impl<T> Drop for MutexGuard<'_, T> {
// Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`.
self.0.locked.store(false, Ordering::SeqCst);

// Notify one blocked `lock()` operation.
self.0.wakers.notify_one();
// Notify a blocked `lock()` operation if none were notified already.
self.0.wakers.notify_any();
}
}

Expand Down
90 changes: 40 additions & 50 deletions src/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,26 @@ impl<T> RwLock<T> {
type Output = RwLockReadGuard<'a, T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll = match self.lock.try_read() {
Some(guard) => Poll::Ready(guard),
None => {
// Insert this lock operation.
match self.opt_key {
None => self.opt_key = Some(self.lock.read_wakers.insert(cx)),
Some(key) => self.lock.read_wakers.update(key, cx),
}

// Try locking again because it's possible the lock got unlocked just
// before the current task was inserted into the waker set.
match self.lock.try_read() {
Some(guard) => Poll::Ready(guard),
None => Poll::Pending,
}
}
};

if poll.is_ready() {
loop {
// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.lock.read_wakers.complete(key);
self.lock.read_wakers.remove(key);
}
}

poll
// Try acquiring a read lock.
match self.lock.try_read() {
Some(guard) => return Poll::Ready(guard),
None => {
// Insert this lock operation.
self.opt_key = Some(self.lock.read_wakers.insert(cx));

// If the lock is still acquired for writing, return.
if self.lock.state.load(Ordering::SeqCst) & WRITE_LOCK != 0 {
return Poll::Pending;
}
}
}
}
}
}

Expand All @@ -143,9 +137,10 @@ impl<T> RwLock<T> {
if let Some(key) = self.opt_key {
self.lock.read_wakers.cancel(key);

// If there are no active readers, wake one of the writers.
// If there are no active readers, notify a blocked writer if none were
// notified already.
if self.lock.state.load(Ordering::SeqCst) & READ_COUNT_MASK == 0 {
self.lock.write_wakers.notify_one();
self.lock.write_wakers.notify_any();
}
}
}
Expand Down Expand Up @@ -238,32 +233,26 @@ impl<T> RwLock<T> {
type Output = RwLockWriteGuard<'a, T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll = match self.lock.try_write() {
Some(guard) => Poll::Ready(guard),
None => {
// Insert this lock operation.
match self.opt_key {
None => self.opt_key = Some(self.lock.write_wakers.insert(cx)),
Some(key) => self.lock.write_wakers.update(key, cx),
}

// Try locking again because it's possible the lock got unlocked just
// before the current task was inserted into the waker set.
match self.lock.try_write() {
Some(guard) => Poll::Ready(guard),
None => Poll::Pending,
}
}
};

if poll.is_ready() {
loop {
// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.lock.write_wakers.complete(key);
self.lock.write_wakers.remove(key);
}
}

poll
// Try acquiring a write lock.
match self.lock.try_write() {
Some(guard) => return Poll::Ready(guard),
None => {
// Insert this lock operation.
self.opt_key = Some(self.lock.write_wakers.insert(cx));

// If the lock is still acquired for reading or writing, return.
if self.lock.state.load(Ordering::SeqCst) != 0 {
return Poll::Pending;
}
}
}
}
}
}

Expand Down Expand Up @@ -392,9 +381,9 @@ impl<T> Drop for RwLockReadGuard<'_, T> {
fn drop(&mut self) {
let state = self.0.state.fetch_sub(ONE_READ, Ordering::SeqCst);

// If this was the last read, wake one of the writers.
// If this was the last reader, notify a blocked writer if none were notified already.
if state & READ_COUNT_MASK == ONE_READ {
self.0.write_wakers.notify_one();
self.0.write_wakers.notify_any();
}
}
}
Expand Down Expand Up @@ -431,8 +420,9 @@ impl<T> Drop for RwLockWriteGuard<'_, T> {

// Notify all blocked readers.
if !self.0.read_wakers.notify_all() {
// If there were no blocked readers, notify a blocked writer.
self.0.write_wakers.notify_one();
// If there were no blocked readers, notify a blocked writer if none were notified
// already.
self.0.write_wakers.notify_any();
}
}
}
Expand Down
Loading

0 comments on commit e9edadf

Please sign in to comment.