Skip to content

Commit

Permalink
Improve Gate::close to support repeated calls (#662)
Browse files Browse the repository at this point in the history
* Improve Gate::close to support repeated calls

If Gate::close has a timeout, closing the gate is left in a dangling
state with no way to safely resume waiting. This change makes it safe in
that calling close on a closing Gate still waits for any outstanding
tasks. This way you can repeatedly try calling gate close with a timeout
set while safely waiting for all tasks to complete.

This is technically an observable change in that a previous call to close
when still in "closing" would have returned an error but now waits for the
currently running tasks instead. I can't imagine anyone actually relies
too much on this.

* Make the Gate::close synchronous returning future

It's useful if the pattern you want to do is to initiate a close and
then wait for it completing later. Otherwise you sometimes have to jump
through hoops.
  • Loading branch information
vlovich authored May 13, 2024
1 parent 83d3023 commit c373d35
Showing 1 changed file with 160 additions and 18 deletions.
178 changes: 160 additions & 18 deletions glommio/src/sync/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use futures_lite::Future;

use crate::{
channels::local_channel::{self, LocalSender},
channels::local_channel::{self, LocalReceiver, LocalSender},
GlommioError, ResourceType, Task, TaskQueueHandle,
};

Expand Down Expand Up @@ -83,17 +83,35 @@ impl Gate {
)
}

/// Close the gate, and wait for all spawned tasks to complete
pub async fn close(&self) -> Result<(), GlommioError<()>> {
self.inner.close().await
/// Close the gate, and return a waiter for all spawned tasks to complete. If the gate is currently closing, the
/// returned future will wait for it to close before returning a success. This is particularly useful if you might
/// have a timeout on the close - the would otherwise be no safe way to retry & wait for remaining tasks to finish.
///
/// NOTE: After this function returns, [is_open](Self::is_open) returns false and any subsequent attempts to acquire
/// a pass will fail, even if you drop the future. The future will return an error if and only if the gate is
/// already fully closed
pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
self.inner.close()
}

/// Whether the gate is open or not
/// Whether the gate is open or not.
pub fn is_open(&self) -> bool {
self.inner.is_open()
}

/// This returns true only if [Self::close] has been called and all spawned tasks are complete. If it returns false,
/// you may call [Self::close] without it returning an error and it'll wait for all spawned tasks to complete.
///
/// NOTE: multiple concurrent calls to [Self::close] may be a performance issue since each invocation to close will
/// allocate some nominal amount of memory for the channel underneath.
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}

type PreviousWaiter = Option<LocalSender<bool>>;
type CurrentClosure = LocalReceiver<bool>;

#[derive(Debug)]
struct GateInner {
count: Cell<usize>,
Expand Down Expand Up @@ -124,25 +142,58 @@ impl GateInner {
}
}

pub async fn close(&self) -> Result<(), GlommioError<()>> {
if self.is_open() {
if self.count.get() == 0 {
*self.state.borrow_mut() = State::Closed;
} else {
async fn wait_for_closure(
waiter: Result<Option<(CurrentClosure, PreviousWaiter)>, GlommioError<()>>,
) -> Result<(), GlommioError<()>> {
if let Some((waiter, previous_closer)) = waiter? {
waiter.recv().await;
if let Some(previous_closer) = previous_closer {
// Previous channel may be dropped so ignore the result.
let _ = previous_closer.try_send(true);
}
}

Ok(())
}

pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
match self.state.replace(State::Closed) {
State::Open => {
if self.count.get() != 0 {
let (sender, receiver) = local_channel::new_bounded(1);
self.state.replace(State::Closing(sender));
Self::wait_for_closure(Ok(Some((receiver, None))))
} else {
Self::wait_for_closure(Ok(None))
}
}
State::Closing(previous_closer) => {
assert!(
self.count.get() != 0,
"If count is 0 then the state should have been marked as closed"
);
assert!(
!previous_closer.is_full(),
"Already notified that the gate is closed!"
);

let (sender, receiver) = local_channel::new_bounded(1);
*self.state.borrow_mut() = State::Closing(sender);
receiver.recv().await;
self.state.replace(State::Closing(sender));

Self::wait_for_closure(Ok(Some((receiver, Some(previous_closer)))))
}
Ok(())
} else {
Err(GlommioError::Closed(ResourceType::Gate))
State::Closed => Self::wait_for_closure(Err(GlommioError::Closed(ResourceType::Gate))),
}
}

pub fn is_open(&self) -> bool {
matches!(*self.state.borrow(), State::Open)
}

pub fn is_closed(&self) -> bool {
matches!(*self.state.borrow(), State::Closed)
}

pub fn notify_closed(&self) {
if let State::Closing(sender) = self.state.replace(State::Closed) {
sender.try_send(true).unwrap();
Expand All @@ -154,10 +205,11 @@ impl GateInner {

#[cfg(test)]
mod tests {
use crate::{enclose, LocalExecutor};

use super::*;
use crate::sync::Semaphore;
use crate::{enclose, timer::timeout, LocalExecutor};
use futures::join;
use std::time::Duration;

#[test]
fn test_immediate_close() {
Expand All @@ -168,7 +220,9 @@ mod tests {
gate.close().await.unwrap();
assert!(!gate.is_open());

assert!(gate.spawn(async {}).is_err())
assert!(gate.spawn(async {}).is_err());

assert!(gate.close().await.is_err());
})
}

Expand Down Expand Up @@ -229,4 +283,92 @@ mod tests {
assert!(!running.get());
})
}

#[test]
fn test_concurrent_close() {
LocalExecutor::default().run(async {
let gate = &Gate::new();
let gate_closures = &Semaphore::new(0);
let closed = &RefCell::new(false);

let pass = gate.enter().unwrap();

join!(
async {
gate_closures.signal(1);
gate.close().await.unwrap();
assert!(*closed.borrow());
},
async {
gate_closures.signal(1);
gate.close().await.unwrap();
assert!(*closed.borrow());
},
async {
gate_closures.acquire(2).await.unwrap();
drop(pass);
closed.replace(true);
},
);
})
}

#[test]
fn test_close_after_timed_out_close() {
LocalExecutor::default().run(async {
let gate = Gate::new();
let gate = &gate;
let gate_closed_once = Rc::new(Semaphore::new(0));
let task_gate = gate_closed_once.clone();

let _task = gate
.spawn(async move {
task_gate.acquire(1).await.unwrap();
})
.unwrap();

timeout(Duration::from_millis(1), async move {
gate.close().await.unwrap();
Ok(())
})
.await
.expect_err("Should have timed out");

assert!(
!gate.is_closed(),
"Should still be waiting for a task that hasn't finished"
);

gate_closed_once.signal(1);

gate.close().await.unwrap();
})
}

#[test]
fn test_marked_closed_without_waiting() {
LocalExecutor::default().run(async {
let gate = Gate::new();
// Even if task is immediately cancelled, the gate still closes.
drop(gate.close());
assert!(gate.is_closed());

let gate = Gate::new();
let pass = gate.enter().unwrap();
// Even if task is cancelled, the gate is still marked as closing.
drop(gate.close());
assert!(!gate.is_open());
assert!(!gate.is_closed());
// Here we install a waiter after the aborted cancel.
let wait_for_closure = gate.close();
join!(
async move {
drop(pass);
},
async move {
wait_for_closure.await.unwrap();
}
);
})
}
}

0 comments on commit c373d35

Please sign in to comment.