Skip to content

Commit

Permalink
remove retry_sync (#1032)
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx authored Sep 4, 2024
1 parent 428826e commit 3af97cb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 86 deletions.
9 changes: 4 additions & 5 deletions xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::storage::group_message::StoredGroupMessage;
use crate::storage::refresh_state::EntityKind;
use crate::storage::StorageError;
use crate::subscriptions::{MessagesStreamInfo, StreamHandle};
use crate::XmtpApi;
use crate::{retry::Retry, retry_async, Client};
use crate::{retry_sync, XmtpApi};
use prost::Message;
use xmtp_proto::xmtp::mls::api::v1::GroupMessage;

Expand All @@ -33,7 +33,7 @@ impl MlsGroup {
);
let created_ns = msgv1.created_ns;

if !self.has_already_synced(msg_id)? {
if !self.has_already_synced(msg_id).await? {
let client_pointer = client.clone();
let process_result = retry_async!(
Retry::default(),
Expand Down Expand Up @@ -96,14 +96,13 @@ impl MlsGroup {
}

// Checks if a message has already been processed through a sync
fn has_already_synced(&self, id: u64) -> Result<bool, GroupError> {
async fn has_already_synced(&self, id: u64) -> Result<bool, GroupError> {
let check_for_last_cursor = || -> Result<i64, StorageError> {
let conn = self.context.store.conn()?;
conn.get_last_cursor_for_id(&self.group_id, EntityKind::Group)
};

let last_id = retry_sync!(Retry::default(), check_for_last_cursor)?;

let last_id = retry_async!(Retry::default(), (async { check_for_last_cursor() }))?;
Ok(last_id >= id as i64)
}

Expand Down
93 changes: 12 additions & 81 deletions xmtp_mls/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,75 +115,6 @@ impl Retry {
}
}

/// Retry a function, specifying the strategy with $retry.
///
/// # Example
/// ```
/// use thiserror::Error;
/// use xmtp_mls::{retry_sync, retry::{RetryableError, Retry}};
///
/// #[derive(Debug, Error)]
/// enum MyError {
/// #[error("A retryable error")]
/// Retryable,
/// #[error("An error we don't want to retry")]
/// NotRetryable
/// }
///
/// impl RetryableError for MyError {
/// fn is_retryable(&self) -> bool {
/// match self {
/// Self::Retryable => true,
/// _=> false,
/// }
/// }
/// }
///
/// fn fallable_fn(i: usize) -> Result<(), MyError> {
/// if i == 2 {
/// return Ok(());
/// }
///
/// Err(MyError::Retryable)
/// }
///
/// fn main() {
/// let mut i = 0;
/// retry_sync!(Retry::default(), (|| -> Result<(), MyError> {
/// let res = fallable_fn(i);
/// i += 1;
/// res
/// })).unwrap();
///
/// }
/// ```
#[macro_export]
macro_rules! retry_sync {
($retry: expr, $code: tt) => {{
#[allow(unused)]
use $crate::retry::RetryableError;
let mut attempts = 0;
tracing::trace_span!("retry").in_scope(|| loop {
#[allow(clippy::redundant_closure_call)]
match $code() {
Ok(v) => break Ok(v),
Err(e) => {
if (&e).is_retryable() && attempts < $retry.retries() {
log::info!(
"retrying function that failed with error=`{}`",
e.to_string()
);
attempts += 1;
std::thread::sleep($retry.duration(attempts));
} else {
break Err(e);
}
}
}
})
}};
}

/// Retry but for an async context
/// ```
/// use xmtp_mls::{retry_async, retry::{RetryableError, Retry}};
Expand Down Expand Up @@ -304,8 +235,8 @@ mod tests {
Err(SomeError::ARetryableError)
}

#[test]
fn it_retries_twice_and_succeeds() {
#[tokio::test]
async fn it_retries_twice_and_succeeds() {
let mut i = 0;
let mut test_fn = || -> Result<(), SomeError> {
if i == 2 {
Expand All @@ -316,11 +247,11 @@ mod tests {
Ok(())
};

retry_sync!(Retry::default(), test_fn).unwrap();
retry_async!(Retry::default(), (async { test_fn() })).unwrap();
}

#[test]
fn it_works_with_random_args() {
#[tokio::test]
async fn it_works_with_random_args() {
let mut i = 0;
let list = vec!["String".into(), "Foo".into()];
let mut test_fn = || -> Result<(), SomeError> {
Expand All @@ -331,29 +262,29 @@ mod tests {
retryable_with_args(i, "Hello".to_string(), &list)
};

retry_sync!(Retry::default(), test_fn).unwrap();
retry_async!(Retry::default(), (async { test_fn() })).unwrap();
}

#[test]
fn it_fails_on_three_retries() {
#[tokio::test]
async fn it_fails_on_three_retries() {
let closure = || -> Result<(), SomeError> {
retry_error_fn()?;
Ok(())
};
let result: Result<(), SomeError> = retry_sync!(Retry::default(), (closure));
let result: Result<(), SomeError> = retry_async!(Retry::default(), (async { closure() }));

assert!(result.is_err())
}

#[test]
fn it_only_runs_non_retryable_once() {
#[tokio::test]
async fn it_only_runs_non_retryable_once() {
let mut attempts = 0;
let mut test_fn = || -> Result<(), SomeError> {
attempts += 1;
Err(SomeError::DontRetryThis)
};

let _r = retry_sync!(Retry::default(), test_fn);
let _r = retry_async!(Retry::default(), (async { test_fn() }));

assert_eq!(attempts, 1);
}
Expand Down

0 comments on commit 3af97cb

Please sign in to comment.