From 3af97cb69435e5b9daa4577bad6a3bd187834d97 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 4 Sep 2024 17:33:33 -0400 Subject: [PATCH] remove retry_sync (#1032) --- xmtp_mls/src/groups/subscriptions.rs | 9 ++- xmtp_mls/src/retry.rs | 93 ++++------------------------ 2 files changed, 16 insertions(+), 86 deletions(-) diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index 88cf2faa6..cb8894271 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -9,8 +9,8 @@ use crate::storage::group_message::StoredGroupMessage; use crate::storage::refresh_state::EntityKind; use crate::storage::StorageError; use crate::subscriptions::{MessagesStreamInfo, StreamHandle}; +use crate::XmtpApi; use crate::{retry::Retry, retry_async, Client}; -use crate::{retry_sync, XmtpApi}; use prost::Message; use xmtp_proto::xmtp::mls::api::v1::GroupMessage; @@ -33,7 +33,7 @@ impl MlsGroup { ); let created_ns = msgv1.created_ns; - if !self.has_already_synced(msg_id)? { + if !self.has_already_synced(msg_id).await? { let client_pointer = client.clone(); let process_result = retry_async!( Retry::default(), @@ -96,14 +96,13 @@ impl MlsGroup { } // Checks if a message has already been processed through a sync - fn has_already_synced(&self, id: u64) -> Result { + async fn has_already_synced(&self, id: u64) -> Result { let check_for_last_cursor = || -> Result { let conn = self.context.store.conn()?; conn.get_last_cursor_for_id(&self.group_id, EntityKind::Group) }; - let last_id = retry_sync!(Retry::default(), check_for_last_cursor)?; - + let last_id = retry_async!(Retry::default(), (async { check_for_last_cursor() }))?; Ok(last_id >= id as i64) } diff --git a/xmtp_mls/src/retry.rs b/xmtp_mls/src/retry.rs index fed9020a1..6bab96703 100644 --- a/xmtp_mls/src/retry.rs +++ b/xmtp_mls/src/retry.rs @@ -115,75 +115,6 @@ impl Retry { } } -/// Retry a function, specifying the strategy with $retry. -/// -/// # Example -/// ``` -/// use thiserror::Error; -/// use xmtp_mls::{retry_sync, retry::{RetryableError, Retry}}; -/// -/// #[derive(Debug, Error)] -/// enum MyError { -/// #[error("A retryable error")] -/// Retryable, -/// #[error("An error we don't want to retry")] -/// NotRetryable -/// } -/// -/// impl RetryableError for MyError { -/// fn is_retryable(&self) -> bool { -/// match self { -/// Self::Retryable => true, -/// _=> false, -/// } -/// } -/// } -/// -/// fn fallable_fn(i: usize) -> Result<(), MyError> { -/// if i == 2 { -/// return Ok(()); -/// } -/// -/// Err(MyError::Retryable) -/// } -/// -/// fn main() { -/// let mut i = 0; -/// retry_sync!(Retry::default(), (|| -> Result<(), MyError> { -/// let res = fallable_fn(i); -/// i += 1; -/// res -/// })).unwrap(); -/// -/// } -/// ``` -#[macro_export] -macro_rules! retry_sync { - ($retry: expr, $code: tt) => {{ - #[allow(unused)] - use $crate::retry::RetryableError; - let mut attempts = 0; - tracing::trace_span!("retry").in_scope(|| loop { - #[allow(clippy::redundant_closure_call)] - match $code() { - Ok(v) => break Ok(v), - Err(e) => { - if (&e).is_retryable() && attempts < $retry.retries() { - log::info!( - "retrying function that failed with error=`{}`", - e.to_string() - ); - attempts += 1; - std::thread::sleep($retry.duration(attempts)); - } else { - break Err(e); - } - } - } - }) - }}; -} - /// Retry but for an async context /// ``` /// use xmtp_mls::{retry_async, retry::{RetryableError, Retry}}; @@ -304,8 +235,8 @@ mod tests { Err(SomeError::ARetryableError) } - #[test] - fn it_retries_twice_and_succeeds() { + #[tokio::test] + async fn it_retries_twice_and_succeeds() { let mut i = 0; let mut test_fn = || -> Result<(), SomeError> { if i == 2 { @@ -316,11 +247,11 @@ mod tests { Ok(()) }; - retry_sync!(Retry::default(), test_fn).unwrap(); + retry_async!(Retry::default(), (async { test_fn() })).unwrap(); } - #[test] - fn it_works_with_random_args() { + #[tokio::test] + async fn it_works_with_random_args() { let mut i = 0; let list = vec!["String".into(), "Foo".into()]; let mut test_fn = || -> Result<(), SomeError> { @@ -331,29 +262,29 @@ mod tests { retryable_with_args(i, "Hello".to_string(), &list) }; - retry_sync!(Retry::default(), test_fn).unwrap(); + retry_async!(Retry::default(), (async { test_fn() })).unwrap(); } - #[test] - fn it_fails_on_three_retries() { + #[tokio::test] + async fn it_fails_on_three_retries() { let closure = || -> Result<(), SomeError> { retry_error_fn()?; Ok(()) }; - let result: Result<(), SomeError> = retry_sync!(Retry::default(), (closure)); + let result: Result<(), SomeError> = retry_async!(Retry::default(), (async { closure() })); assert!(result.is_err()) } - #[test] - fn it_only_runs_non_retryable_once() { + #[tokio::test] + async fn it_only_runs_non_retryable_once() { let mut attempts = 0; let mut test_fn = || -> Result<(), SomeError> { attempts += 1; Err(SomeError::DontRetryThis) }; - let _r = retry_sync!(Retry::default(), test_fn); + let _r = retry_async!(Retry::default(), (async { test_fn() })); assert_eq!(attempts, 1); }