Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatic sync - sync worker #1207

Merged
merged 51 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
baa9786
make local events available
codabrink Oct 31, 2024
ef8004e
wip
codabrink Oct 31, 2024
9974979
wip
codabrink Oct 31, 2024
c2a0f3d
bindings
codabrink Oct 31, 2024
83cfac3
node bindings
codabrink Oct 31, 2024
f2dc043
wasm bindings
codabrink Oct 31, 2024
9b31a2e
cli
codabrink Oct 31, 2024
d4fcc08
lint
codabrink Oct 31, 2024
8d9063d
cleanup
codabrink Oct 31, 2024
511ce8a
Merge branch 'coda/sync-bindings' into coda/sync-auto
codabrink Oct 31, 2024
d718282
Merge remote-tracking branch 'origin/main' into coda/sync-auto
codabrink Oct 31, 2024
2e3a581
stream
codabrink Nov 1, 2024
e0c18cf
Merge remote-tracking branch 'origin/main' into coda/sync-auto
codabrink Nov 1, 2024
b82f374
process replies, and respond to requests
codabrink Nov 1, 2024
62ce1d4
wip
codabrink Nov 4, 2024
c4af0d1
wip
codabrink Nov 4, 2024
3f5a8b5
lifetime pollution
codabrink Nov 4, 2024
d908a2b
wip
codabrink Nov 4, 2024
f6f77db
consent test passes
codabrink Nov 4, 2024
958ad90
complete the test
codabrink Nov 4, 2024
9be90d6
improve consent sync test
codabrink Nov 5, 2024
61fbfd4
cleanup
codabrink Nov 5, 2024
9daac7a
update history sync test
codabrink Nov 5, 2024
6031e69
message history server
codabrink Nov 5, 2024
284874e
flaky tests
codabrink Nov 5, 2024
dd213ad
cleanup
codabrink Nov 5, 2024
dba83ed
cleanup
codabrink Nov 5, 2024
e3e6049
cleanup
codabrink Nov 5, 2024
02065ac
worker don't die
codabrink Nov 5, 2024
1fbc22b
Merge branch 'main' into coda/sync-auto
codabrink Nov 5, 2024
7b2cf1e
debounce
codabrink Nov 5, 2024
edcae32
Merge branch 'coda/sync-auto' of github.com:xmtp/libxmtp into coda/sy…
codabrink Nov 5, 2024
1cd72f8
lint
codabrink Nov 5, 2024
e0d839f
fix
codabrink Nov 6, 2024
646a479
remove unwraps
codabrink Nov 6, 2024
07ea0d0
start the sync worker after registering
codabrink Nov 6, 2024
8ddabe7
fix wasm tests
codabrink Nov 6, 2024
ff9dd61
add streaming cfg for wasm
codabrink Nov 6, 2024
3680fa8
wip
codabrink Nov 6, 2024
3d3a130
wip
codabrink Nov 6, 2024
715df1e
move the constraints up a level
codabrink Nov 6, 2024
c064da3
cleanup
codabrink Nov 6, 2024
8f4477c
Merge branch 'main' into coda/sync-auto
mchenani Nov 6, 2024
9f752f3
rename
codabrink Nov 6, 2024
59f3959
Merge branch 'coda/sync-auto' of github.com:xmtp/libxmtp into coda/sy…
codabrink Nov 6, 2024
a136396
unreachable
codabrink Nov 6, 2024
ab05b62
error update
codabrink Nov 6, 2024
d4b47f8
wasm send theory
codabrink Nov 7, 2024
0a2fcc8
Merge remote-tracking branch 'origin/main' into coda/sync-auto
codabrink Nov 7, 2024
d7aaf40
wip
codabrink Nov 7, 2024
3d875fa
cleanup
codabrink Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,15 @@ pub async fn create_client(
legacy_signed_private_key_proto,
);

let xmtp_client: RustXmtpClient = match history_sync_url {
Some(url) => {
ClientBuilder::new(identity_strategy)
.api_client(api_client)
.store(store)
.history_sync_url(&url)
.build()
.await?
}
None => {
ClientBuilder::new(identity_strategy)
.api_client(api_client)
.store(store)
.build()
.await?
}
};
let mut builder = ClientBuilder::new(identity_strategy)
.api_client(api_client)
.store(store);

if let Some(url) = &history_sync_url {
builder = builder.history_sync_url(url);
}

let xmtp_client = builder.build().await?;

log::info!(
"Created XMTP client for inbox_id: {}",
Expand Down Expand Up @@ -397,6 +389,26 @@ impl FfiXmtpClient {
.register_identity(signature_request.clone())
.await?;

self.maybe_start_sync_worker().await?;

Ok(())
}

/// Starts the sync worker if the history sync url is present.
async fn maybe_start_sync_worker(&self) -> Result<(), GenericError> {
if self.inner_client.history_sync_url().is_none() {
return Ok(());
}

let provider = self
.inner_client
.mls_provider()
.map_err(GenericError::from_error)?;
self.inner_client
.start_sync_worker(&provider)
.await
.map_err(GenericError::from_error)?;

Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions dev/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ services:
ports:
- 8545:8545

history-server:
image: ghcr.io/xmtp/message-history-server:main
platform: linux/amd64
ports:
- 5558:5558

db:
image: postgres:13
environment:
Expand Down
64 changes: 10 additions & 54 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ enum Commands {
account_addresses: Vec<String>,
},
RequestHistorySync {},
ReplyToHistorySyncRequest {},
ProcessHistorySyncReply {},
ProcessConsentSyncReply {},
ListHistorySyncMessages {},
/// Information about the account that owns the DB
Info {},
Expand Down Expand Up @@ -415,60 +412,19 @@ async fn main() -> color_eyre::eyre::Result<()> {
);
}
Commands::RequestHistorySync {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
let (group_id, _) = client
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await?;
let group_id_str = hex::encode(group_id);
info!(
group_id = group_id_str,
"Sent history sync request in sync group {group_id_str}"
);
}
Commands::ReplyToHistorySyncRequest {} => {
let provider = client.mls_provider()?;
let group = client.get_sync_group()?;
let group_id_str = hex::encode(group.group_id);
let reply = client
.reply_to_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await?;

info!(
group_id = group_id_str,
"Sent history sync reply in sync group {group_id_str}"
);
info!("Reply: {:?}", reply);
}
Commands::ProcessHistorySyncReply {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
let conn = client.store().conn().unwrap();
let provider = client.mls_provider().unwrap();
client.sync_welcomes(&conn).await.unwrap();
client.start_sync_worker(&provider).await.unwrap();
client
.process_sync_reply(&provider, DeviceSyncKind::MessageHistory)
.await?;

info!("History bundle downloaded and inserted into user DB")
}
Commands::ProcessConsentSyncReply {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
client
.process_sync_reply(&provider, DeviceSyncKind::Consent)
.await?;

info!("Consent bundle downloaded and inserted into user DB")
.send_sync_request(&provider, DeviceSyncKind::MessageHistory)
.await
.unwrap();
info!("Sent history sync request in sync group.")
}
Commands::ListHistorySyncMessages {} => {
let conn = client.store().conn()?;
let provider = client.mls_provider()?;
client.sync_welcomes(&conn).await?;
client.enable_sync(&provider).await?;
let group = client.get_sync_group()?;
let group_id_str = hex::encode(group.group_id.clone());
group.sync().await?;
Expand Down Expand Up @@ -504,7 +460,7 @@ async fn main() -> color_eyre::eyre::Result<()> {
Ok(())
}

async fn create_client<C: XmtpApi>(
async fn create_client<C: XmtpApi + 'static>(
cli: &Cli,
account: IdentityStrategy,
grpc: C,
Expand All @@ -531,7 +487,7 @@ async fn register<C>(
client: C,
) -> Result<(), CliError>
where
C: XmtpApi,
C: XmtpApi + 'static,
{
let w: Wallet = if let Some(seed_phrase) = maybe_seed_phrase {
Wallet::LocalWallet(
Expand Down
20 changes: 10 additions & 10 deletions xmtp_mls/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum ClientBuilderError {
GroupError(#[from] crate::groups::GroupError),
#[error(transparent)]
ApiError(#[from] xmtp_proto::api_client::Error),
#[error(transparent)]
DeviceSync(#[from] crate::groups::device_sync::DeviceSyncError),
}

pub struct ClientBuilder<ApiClient, V = RemoteSignatureVerifier<ApiClient>> {
Expand Down Expand Up @@ -108,8 +110,8 @@ impl<ApiClient, V> ClientBuilder<ApiClient, V> {

impl<ApiClient, V> ClientBuilder<ApiClient, V>
where
ApiClient: XmtpApi,
V: SmartContractSignatureVerifier,
ApiClient: XmtpApi + 'static,
V: SmartContractSignatureVerifier + 'static,
{
/// Build with a custom smart contract wallet verifier
pub async fn build_with_verifier(self) -> Result<Client<ApiClient, V>, ClientBuilderError> {
Expand All @@ -120,7 +122,7 @@ where

impl<ApiClient> ClientBuilder<ApiClient, RemoteSignatureVerifier<ApiClient>>
where
ApiClient: XmtpApi,
ApiClient: XmtpApi + 'static,
{
/// Build with the default [`RemoteSignatureVerifier`]
pub async fn build(self) -> Result<Client<ApiClient>, ClientBuilderError> {
Expand Down Expand Up @@ -161,8 +163,8 @@ async fn inner_build<C, V>(
api_client: Arc<C>,
) -> Result<Client<C, V>, ClientBuilderError>
where
C: XmtpApi,
V: SmartContractSignatureVerifier,
C: XmtpApi + 'static,
V: SmartContractSignatureVerifier + 'static,
{
let ClientBuilder {
mut store,
Expand Down Expand Up @@ -198,15 +200,13 @@ where
)
.await?;

let client = Client::new(
Ok(Client::new(
api_client_wrapper,
identity,
store,
scw_verifier,
history_sync_url,
);

Ok(client)
history_sync_url.clone(),
))
}

#[cfg(test)]
Expand Down
10 changes: 7 additions & 3 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ use crate::{
mutex_registry::MutexRegistry,
retry::Retry,
retry_async, retryable,
storage::group::GroupQueryArgs,
storage::{
consent_record::{ConsentState, ConsentType, StoredConsentRecord},
db_connection::DbConnection,
group::{GroupMembershipState, StoredGroup},
group::{GroupMembershipState, GroupQueryArgs, StoredGroup},
group_message::StoredGroupMessage,
refresh_state::EntityKind,
sql_key_store, EncryptedMessageStore, StorageError,
Expand Down Expand Up @@ -311,7 +310,8 @@ where
let intents = Arc::new(Intents {
context: context.clone(),
});
let (tx, _) = broadcast::channel(10);
let (tx, _) = broadcast::channel(32);

Self {
api_client: api_client.into(),
context,
Expand Down Expand Up @@ -350,6 +350,10 @@ where
self.context.mls_provider()
}

pub fn history_sync_url(&self) -> Option<&String> {
self.history_sync_url.as_ref()
}

/// Calls the server to look up the `inbox_id` associated with a given address
pub async fn find_inbox_id_from_address(
&self,
Expand Down
Loading
Loading