diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index 9d955b60e..266c23301 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -1,6 +1,3 @@ -use std::pin::Pin; -use std::time::Duration; - use super::group_metadata::ConversationType; use super::{GroupError, MlsGroup}; use crate::configuration::NS_IN_HOUR; @@ -27,12 +24,13 @@ use aes_gcm::{ aead::{Aead, KeyInit}, Aes256Gcm, }; -use futures::{Stream, StreamExt}; +use futures::{pin_mut, Stream, StreamExt}; use rand::{ distributions::{Alphanumeric, DistString}, Rng, RngCore, }; use serde::{Deserialize, Serialize}; +use std::time::Duration; use thiserror::Error; use tracing::warn; use xmtp_cryptography::utils as crypto_utils; @@ -132,9 +130,11 @@ where let client = self.clone(); let receiver = client.local_events.subscribe(); - let mut sync_stream = Box::pin(receiver.stream_sync_messages()); + let sync_stream = receiver.stream_sync_messages(); async move { + pin_mut!(sync_stream); + while let Err(err) = client.sync_worker(&mut sync_stream).await { tracing::error!("Sync worker error: {err}"); } @@ -152,10 +152,8 @@ where { pub(crate) async fn sync_worker( &self, - sync_stream: &mut Pin>>>, + sync_stream: &mut (impl Stream> + Unpin), ) -> Result<(), DeviceSyncError> { - let mut sync_stream = sync_stream.as_mut(); - let provider = self.mls_provider()?; let query_retry = RetryBuilder::default() diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index e10a55d80..664ba318d 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -230,10 +230,6 @@ pub struct MlsGroup { mutex: Arc>, } -// C is wrapped in an Arc, and so is the Mutex, so this is fine. -// unsafe impl Send for MlsGroup {} -// unsafe impl Sync for MlsGroup {} - #[derive(Default)] pub struct GroupMetadataOptions { pub name: Option,