Skip to content

Commit

Permalink
Add timesource abstration to aws-smithy-async (#2721)
Browse files Browse the repository at this point in the history
## Motivation and Context
- Controlling time is required for several testing use cases
- #2087 
- #2262 

## Description
Introduce `TimeSource` trait, a real implementation, and a test
implementation.

## Testing
These changes are used in the timestream PR

## Checklist
No changelog, these changes have no impact since the code is not yet
utilized

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
rcoh authored May 23, 2023
1 parent 64fb3dd commit c0345a5
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 1 deletion.
1 change: 1 addition & 0 deletions rust-runtime/aws-smithy-async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ repository = "https://github.com/awslabs/smithy-rs"

[features]
rt-tokio = ["tokio/time"]
test-util = []

[dependencies]
pin-project-lite = "0.2"
Expand Down
3 changes: 3 additions & 0 deletions rust-runtime/aws-smithy-async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

pub mod future;
pub mod rt;
#[cfg(feature = "test-util")]
pub mod test_util;
pub mod time;

/// Given an `Instant` and a `Duration`, assert time elapsed since `Instant` is equal to `Duration`.
/// This macro allows for a 5ms margin of error.
Expand Down
1 change: 1 addition & 0 deletions rust-runtime/aws-smithy-async/src/rt/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub fn default_async_sleep() -> Option<Arc<dyn AsyncSleep>> {

/// Future returned by [`AsyncSleep`].
#[non_exhaustive]
#[must_use]
pub struct Sleep(Pin<Box<dyn Future<Output = ()> + Send + 'static>>);

impl Debug for Sleep {
Expand Down
241 changes: 241 additions & 0 deletions rust-runtime/aws-smithy-async/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Test utilities for time and sleep

use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use tokio::sync::oneshot;
use tokio::sync::Barrier;
use tokio::time::timeout;

use crate::rt::sleep::{AsyncSleep, Sleep};
use crate::time::TimeSource;

/// Manually controlled time source
#[derive(Debug, Clone)]
pub struct ManualTimeSource {
start_time: SystemTime,
log: Arc<Mutex<Vec<Duration>>>,
}

impl TimeSource for ManualTimeSource {
fn now(&self) -> SystemTime {
self.start_time + dbg!(self.log.lock().unwrap()).iter().sum()
}
}

/// A sleep implementation where calls to [`AsyncSleep::sleep`] block until [`SleepGate::expect_sleep`] is called
///
/// Create a [`ControlledSleep`] with [`controlled_time_and_sleep`]
#[derive(Debug, Clone)]
pub struct ControlledSleep {
barrier: Arc<Barrier>,
log: Arc<Mutex<Vec<Duration>>>,
duration: Arc<Mutex<Option<Duration>>>,
advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}

/// Gate that allows [`ControlledSleep`] to advance.
///
/// See [`controlled_time_and_sleep`] for more details
pub struct SleepGate {
gate: Arc<Barrier>,
pending: Arc<Mutex<Option<Duration>>>,
advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}

impl ControlledSleep {
fn new(log: Arc<Mutex<Vec<Duration>>>) -> (ControlledSleep, SleepGate) {
let gate = Arc::new(Barrier::new(2));
let pending = Arc::new(Mutex::new(None));
let advance_guard: Arc<Mutex<Option<oneshot::Sender<()>>>> = Default::default();
(
ControlledSleep {
barrier: gate.clone(),
log,
duration: pending.clone(),
advance_guard: advance_guard.clone(),
},
SleepGate {
gate,
pending,
advance_guard,
},
)
}
}

/// Guard returned from [`SleepGate::expect_sleep`]
///
/// # Examples
/// ```rust
/// # use std::sync::Arc;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// # async {
/// use std::time::{Duration, UNIX_EPOCH};
/// use aws_smithy_async::rt::sleep::AsyncSleep;
/// use aws_smithy_async::test_util::controlled_time_and_sleep;
/// let (time, sleep, mut gate) = controlled_time_and_sleep(UNIX_EPOCH);
/// let progress = Arc::new(AtomicUsize::new(0));
/// let task_progress = progress.clone();
/// let task = tokio::spawn(async move {
/// let progress = task_progress;
/// progress.store(1, Ordering::Release);
/// sleep.sleep(Duration::from_secs(1)).await;
/// progress.store(2, Ordering::Release);
/// sleep.sleep(Duration::from_secs(2)).await;
/// });
/// while progress.load(Ordering::Acquire) != 1 {}
/// let guard = gate.expect_sleep().await;
/// assert_eq!(guard.duration(), Duration::from_secs(1));
/// assert_eq!(progress.load(Ordering::Acquire), 1);
/// guard.allow_progress();
///
/// let guard = gate.expect_sleep().await;
/// assert_eq!(progress.load(Ordering::Acquire), 2);
/// assert_eq!(task.is_finished(), false);
/// guard.allow_progress();
/// task.await.expect("successful completion");
/// # };
/// ```
pub struct CapturedSleep<'a>(oneshot::Sender<()>, &'a SleepGate, Duration);
impl CapturedSleep<'_> {
/// Allow the calling code to advance past the call to [`AsyncSleep::sleep`]
///
/// In order to facilitate testing with no flakiness, the future returned by the call to `sleep`
/// will not resolve until [`CapturedSleep`] is dropped or this method is called.
///
/// ```rust
/// use std::time::Duration;
/// use aws_smithy_async::rt::sleep::AsyncSleep;
/// fn do_something(sleep: &dyn AsyncSleep) {
/// println!("before sleep");
/// sleep.sleep(Duration::from_secs(1));
/// println!("after sleep");
/// }
/// ```
///
/// To be specific, when `do_something` is called, the code will advance to `sleep.sleep`.
/// When [`SleepGate::expect_sleep`] is called, the 1 second sleep will be captured, but `after sleep`
/// WILL NOT be printed, until `allow_progress` is called.
pub fn allow_progress(self) {
drop(self)
}

/// Duration in the call to [`AsyncSleep::sleep`]
pub fn duration(&self) -> Duration {
self.2
}
}

impl AsRef<Duration> for CapturedSleep<'_> {
fn as_ref(&self) -> &Duration {
&self.2
}
}

impl SleepGate {
/// Expect the time source to sleep
///
/// This returns the duration that was slept and a [`CapturedSleep`]. The drop guard is used
/// to precisely control
pub async fn expect_sleep(&mut self) -> CapturedSleep<'_> {
timeout(Duration::from_secs(1), self.gate.wait())
.await
.expect("timeout");
let dur = self
.pending
.lock()
.unwrap()
.take()
.unwrap_or(Duration::from_secs(123456));
let guard = CapturedSleep(
self.advance_guard.lock().unwrap().take().unwrap(),
self,
dur,
);
guard
}
}

impl AsyncSleep for ControlledSleep {
fn sleep(&self, duration: Duration) -> Sleep {
let barrier = self.barrier.clone();
let log = self.log.clone();
let pending = self.duration.clone();
let drop_guard = self.advance_guard.clone();
Sleep::new(async move {
// 1. write the duration into the shared mutex
assert!(pending.lock().unwrap().is_none());
*pending.lock().unwrap() = Some(duration);
let (tx, rx) = oneshot::channel();
*drop_guard.lock().unwrap() = Some(tx);
// 2. first wait on the barrier—this is how we wait for an invocation of `expect_sleep`
barrier.wait().await;
log.lock().unwrap().push(duration);
let _ = dbg!(rx.await);
})
}
}

/// Returns a trio of tools to test interactions with time
///
/// 1. [`ManualTimeSource`] which starts at a specific time and only advances when `sleep` is called.
/// It MUST be paired with [`ControlledSleep`] in order to function.
pub fn controlled_time_and_sleep(
start_time: SystemTime,
) -> (ManualTimeSource, ControlledSleep, SleepGate) {
let log = Arc::new(Mutex::new(vec![]));
let (sleep, gate) = ControlledSleep::new(log.clone());
(ManualTimeSource { start_time, log }, sleep, gate)
}

#[cfg(test)]
mod test {
use crate::rt::sleep::AsyncSleep;
use crate::test_util::controlled_time_and_sleep;
use crate::time::TimeSource;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::task::yield_now;
use tokio::time::timeout;

#[tokio::test]
async fn test_sleep_gate() {
use std::time::{Duration, UNIX_EPOCH};
let start = UNIX_EPOCH;
let (time, sleep, mut gate) = controlled_time_and_sleep(UNIX_EPOCH);
let progress = Arc::new(AtomicUsize::new(0));
let task_progress = progress.clone();
let task = tokio::spawn(async move {
assert_eq!(time.now(), start);
let progress = task_progress;
progress.store(1, Ordering::Release);
sleep.sleep(Duration::from_secs(1)).await;
assert_eq!(time.now(), start + Duration::from_secs(1));
progress.store(2, Ordering::Release);
sleep.sleep(Duration::from_secs(2)).await;
assert_eq!(time.now(), start + Duration::from_secs(3));
});
while progress.load(Ordering::Acquire) != 1 {
yield_now().await
}
let guard = gate.expect_sleep().await;
assert_eq!(guard.duration(), Duration::from_secs(1));
assert_eq!(progress.load(Ordering::Acquire), 1);
guard.allow_progress();

let guard = gate.expect_sleep().await;
assert_eq!(progress.load(Ordering::Acquire), 2);
assert_eq!(task.is_finished(), false);
guard.allow_progress();
timeout(Duration::from_secs(1), task)
.await
.expect("no timeout")
.expect("successful completion");
}
}
32 changes: 32 additions & 0 deletions rust-runtime/aws-smithy-async/src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Time source abstraction to support WASM and testing
use std::fmt::Debug;
use std::time::SystemTime;

/// Trait with a `now()` function returning the current time
pub trait TimeSource: Debug + Send + Sync {
/// Returns the current time
fn now(&self) -> SystemTime;
}

/// Time source that delegates to [`SystemTime::now`]
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct SystemTimeSource;

impl SystemTimeSource {
/// Creates a new SystemTimeSource
pub fn new() -> Self {
SystemTimeSource
}
}

impl TimeSource for SystemTimeSource {
fn now(&self) -> SystemTime {
SystemTime::now()
}
}
2 changes: 1 addition & 1 deletion tools/ci-scripts/codegen-diff/semver-checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def main(skip_generation=False):
# package and manifest path explicitly
f'--manifest-path {path}/Cargo.toml '
f'-p {path} '
f'--release-type patch', check=False, quiet=True)
f'--release-type minor', check=False, quiet=True)
if status == 0:
eprint('ok!')
else:
Expand Down

0 comments on commit c0345a5

Please sign in to comment.