Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide timestamps in identity action updates #1073

Merged
merged 18 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,15 +1047,16 @@ impl FfiGroup {
Ok(ffi_message)
}

pub fn list_members(&self) -> Result<Vec<FfiGroupMember>, GenericError> {
pub async fn list_members(&self) -> Result<Vec<FfiGroupMember>, GenericError> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

let members: Vec<FfiGroupMember> = group
.members()?
.members(&self.inner_client)
.await?
.into_iter()
.map(|member| FfiGroupMember {
inbox_id: member.inbox_id,
Expand Down Expand Up @@ -2275,7 +2276,7 @@ mod tests {
.await
.unwrap();

let members = group.list_members().unwrap();
let members = group.list_members().await.unwrap();
assert_eq!(members.len(), 2);
}

Expand All @@ -2300,7 +2301,7 @@ mod tests {
.await
.unwrap();

let members = group.list_members().unwrap();
let members = group.list_members().await.unwrap();
assert_eq!(members.len(), 2);
assert_eq!(group.group_name().unwrap(), "Group Name");
assert_eq!(group.group_image_url_square().unwrap(), "url");
Expand Down Expand Up @@ -2564,10 +2565,10 @@ mod tests {
client2_group.sync().await.unwrap();

// Assert both clients see 2 members
let client1_members = client1_group.list_members().unwrap();
let client1_members = client1_group.list_members().await.unwrap();
assert_eq!(client1_members.len(), 2);

let client2_members = client2_group.list_members().unwrap();
let client2_members = client2_group.list_members().await.unwrap();
assert_eq!(client2_members.len(), 2);

// Drop and delete local database for client2
Expand All @@ -2585,12 +2586,12 @@ mod tests {
.unwrap();

// Assert client1 still sees 2 members
let client1_members = client1_group.list_members().unwrap();
let client1_members = client1_group.list_members().await.unwrap();
assert_eq!(client1_members.len(), 2);

client2.conversations().sync().await.unwrap();
let client2_group = client2.group(group.id()).unwrap();
let client2_members = client2_group.list_members().unwrap();
let client2_members = client2_group.list_members().await.unwrap();
assert_eq!(client2_members.len(), 2);
}

Expand Down Expand Up @@ -2838,11 +2839,11 @@ mod tests {
.unwrap();

bo_group.sync().await.unwrap();
let bo_members = bo_group.list_members().unwrap();
let bo_members = bo_group.list_members().await.unwrap();
assert_eq!(bo_members.len(), 4);

alix_group.sync().await.unwrap();
let alix_members = alix_group.list_members().unwrap();
let alix_members = alix_group.list_members().await.unwrap();
assert_eq!(alix_members.len(), 4);
}

Expand All @@ -2864,11 +2865,11 @@ mod tests {
let bo_group = bo.group(alix_group.id()).unwrap();

alix_group.sync().await.unwrap();
let alix_members = alix_group.list_members().unwrap();
let alix_members = alix_group.list_members().await.unwrap();
assert_eq!(alix_members.len(), 2);

bo_group.sync().await.unwrap();
let bo_members = bo_group.list_members().unwrap();
let bo_members = bo_group.list_members().await.unwrap();
assert_eq!(bo_members.len(), 2);

let bo_messages = bo_group
Expand All @@ -2892,11 +2893,11 @@ mod tests {
assert!(bo_messages.first().unwrap().kind == FfiGroupMessageKind::MembershipChange);
assert_eq!(bo_messages.len(), 1);

let bo_members = bo_group.list_members().unwrap();
let bo_members = bo_group.list_members().await.unwrap();
assert_eq!(bo_members.len(), 1);

alix_group.sync().await.unwrap();
let alix_members = alix_group.list_members().unwrap();
let alix_members = alix_group.list_members().await.unwrap();
assert_eq!(alix_members.len(), 1);
}

Expand Down Expand Up @@ -3736,6 +3737,7 @@ mod tests {

if let Some(member) = alix_group
.list_members()
.await
.unwrap()
.iter()
.find(|&m| m.inbox_id == bo.inbox_id())
Expand Down
5 changes: 3 additions & 2 deletions bindings_node/src/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,16 @@ impl NapiGroup {
}

#[napi]
pub fn list_members(&self) -> Result<Vec<NapiGroupMember>> {
pub async fn list_members(&self) -> Result<Vec<NapiGroupMember>> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

let members: Vec<NapiGroupMember> = group
.members()
.members(&self.inner_client)
.await
.map_err(ErrorWrapper::from)?
.into_iter()
.map(|member| NapiGroupMember {
Expand Down
29 changes: 23 additions & 6 deletions xmtp_id/src/associations/association_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub trait IdentityAction: Send + 'static {
fn update_state(
&self,
existing_state: Option<AssociationState>,
client_timestamp_ns: u64,
) -> Result<AssociationState, AssociationError>;
fn signatures(&self) -> Vec<Vec<u8>>;
fn replay_check(&self, state: &AssociationState) -> Result<(), AssociationError> {
Expand All @@ -66,6 +67,7 @@ impl IdentityAction for CreateInbox {
fn update_state(
&self,
existing_state: Option<AssociationState>,
_client_timestamp_ns: u64,
) -> Result<AssociationState, AssociationError> {
if existing_state.is_some() {
return Err(AssociationError::MultipleCreate);
Expand Down Expand Up @@ -106,6 +108,7 @@ impl IdentityAction for AddAssociation {
fn update_state(
&self,
maybe_existing_state: Option<AssociationState>,
client_timestamp_ns: u64,
) -> Result<AssociationState, AssociationError> {
let existing_state = maybe_existing_state.ok_or(AssociationError::NotCreated)?;
self.replay_check(&existing_state)?;
Expand Down Expand Up @@ -176,7 +179,11 @@ impl IdentityAction for AddAssociation {
self.new_member_identifier.kind(),
)?;

let new_member = Member::new(new_member_address.clone(), Some(existing_entity_id));
let new_member = Member::new(
new_member_address.clone(),
Some(existing_entity_id),
Some(client_timestamp_ns),
);

Ok(existing_state.add(new_member))
}
Expand All @@ -200,6 +207,7 @@ impl IdentityAction for RevokeAssociation {
fn update_state(
&self,
maybe_existing_state: Option<AssociationState>,
_client_timestamp_ns: u64,
) -> Result<AssociationState, AssociationError> {
let existing_state = maybe_existing_state.ok_or(AssociationError::NotCreated)?;
self.replay_check(&existing_state)?;
Expand Down Expand Up @@ -255,6 +263,7 @@ impl IdentityAction for ChangeRecoveryAddress {
fn update_state(
&self,
existing_state: Option<AssociationState>,
_client_timestamp_ns: u64,
) -> Result<AssociationState, AssociationError> {
let existing_state = existing_state.ok_or(AssociationError::NotCreated)?;
self.replay_check(&existing_state)?;
Expand Down Expand Up @@ -292,12 +301,19 @@ impl IdentityAction for Action {
fn update_state(
&self,
existing_state: Option<AssociationState>,
client_timestamp_ns: u64,
) -> Result<AssociationState, AssociationError> {
match self {
Action::CreateInbox(event) => event.update_state(existing_state),
Action::AddAssociation(event) => event.update_state(existing_state),
Action::RevokeAssociation(event) => event.update_state(existing_state),
Action::ChangeRecoveryAddress(event) => event.update_state(existing_state),
Action::CreateInbox(event) => event.update_state(existing_state, client_timestamp_ns),
Action::AddAssociation(event) => {
event.update_state(existing_state, client_timestamp_ns)
}
Action::RevokeAssociation(event) => {
event.update_state(existing_state, client_timestamp_ns)
}
Action::ChangeRecoveryAddress(event) => {
event.update_state(existing_state, client_timestamp_ns)
}
}
}

Expand Down Expand Up @@ -333,10 +349,11 @@ impl IdentityAction for IdentityUpdate {
fn update_state(
&self,
existing_state: Option<AssociationState>,
_client_timestamp_ns: u64,
) -> Result<AssociationState, AssociationError> {
let mut state = existing_state.clone();
for action in &self.actions {
state = Some(action.update_state(state)?);
state = Some(action.update_state(state, self.client_timestamp_ns)?);
}

let new_state = state.ok_or(AssociationError::NotCreated)?;
Expand Down
9 changes: 8 additions & 1 deletion xmtp_id/src/associations/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,19 @@ impl From<Vec<u8>> for MemberIdentifier {
pub struct Member {
pub identifier: MemberIdentifier,
pub added_by_entity: Option<MemberIdentifier>,
pub client_timestamp_ns: Option<u64>,
}

impl Member {
pub fn new(identifier: MemberIdentifier, added_by_entity: Option<MemberIdentifier>) -> Self {
pub fn new(
identifier: MemberIdentifier,
added_by_entity: Option<MemberIdentifier>,
client_timestamp_ns: Option<u64>,
) -> Self {
Self {
identifier,
added_by_entity,
client_timestamp_ns,
}
}

Expand Down Expand Up @@ -118,6 +124,7 @@ mod tests {
Self {
identifier: MemberIdentifier::default(),
added_by_entity: None,
client_timestamp_ns: None,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions xmtp_id/src/associations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn apply_update(
initial_state: AssociationState,
update: IdentityUpdate,
) -> Result<AssociationState, AssociationError> {
update.update_state(Some(initial_state))
update.update_state(Some(initial_state), update.client_timestamp_ns)
}

// Get the current state from an array of `IdentityUpdate`s. Entire operation fails if any operation fails
Expand All @@ -32,7 +32,7 @@ pub fn get_state<Updates: AsRef<[IdentityUpdate]>>(
) -> Result<AssociationState, AssociationError> {
let mut state = None;
for update in updates.as_ref().iter() {
let res = update.update_state(state);
let res = update.update_state(state, update.client_timestamp_ns);
state = Some(res?);
}

Expand Down
2 changes: 2 additions & 0 deletions xmtp_id/src/associations/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl From<Member> for MemberProto {
MemberProto {
identifier: Some(member.identifier.into()),
added_by_entity: member.added_by_entity.map(Into::into),
client_timestamp_ns: member.client_timestamp_ns,
}
}
}
Expand All @@ -332,6 +333,7 @@ impl TryFrom<MemberProto> for Member {
.ok_or(DeserializationError::MissingMemberIdentifier)?
.try_into()?,
added_by_entity: proto.added_by_entity.map(TryInto::try_into).transpose()?,
client_timestamp_ns: proto.client_timestamp_ns,
})
}
}
Expand Down
8 changes: 2 additions & 6 deletions xmtp_id/src/associations/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,9 @@ impl AssociationState {
pub fn new(account_address: String, nonce: u64) -> Self {
let inbox_id = generate_inbox_id(&account_address, &nonce);
let identifier = MemberIdentifier::Address(account_address.clone());
let new_member = Member::new(identifier.clone(), None);
let new_member = Member::new(identifier.clone(), None, None);
Self {
members: {
let mut members = HashMap::new();
members.insert(identifier, new_member);
members
},
members: HashMap::from_iter([(identifier, new_member)]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice cleanup

seen_signatures: HashSet::new(),
recovery_address: account_address.to_lowercase(),
inbox_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DELETE FROM association_state;
4 changes: 2 additions & 2 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,14 +1045,14 @@ mod tests {
.add_members_by_inbox_id(&amal, vec![bola.inbox_id()])
.await
.unwrap();
assert_eq!(amal_group.members().unwrap().len(), 2);
assert_eq!(amal_group.members(&amal).await.unwrap().len(), 2);

// Now remove bola
amal_group
.remove_members_by_inbox_id(&amal, vec![bola.inbox_id()])
.await
.unwrap();
assert_eq!(amal_group.members().unwrap().len(), 1);
assert_eq!(amal_group.members(&amal).await.unwrap().len(), 1);
log::info!("Syncing bolas welcomes");
// See if Bola can see that they were added to the group
bola.sync_welcomes().await.unwrap();
Expand Down
40 changes: 27 additions & 13 deletions xmtp_mls/src/groups/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
consent_record::{ConsentState, ConsentType},
},
xmtp_openmls_provider::XmtpOpenMlsProvider,
Client, XmtpApi,
};

#[derive(Debug, Clone)]
Expand All @@ -28,13 +29,17 @@ pub enum PermissionLevel {

impl MlsGroup {
// Load the member list for the group from the DB, merging together multiple installations into a single entry
pub fn members(&self) -> Result<Vec<GroupMember>, GroupError> {
pub async fn members<ApiClient: XmtpApi>(
&self,
client: &Client<ApiClient>,
) -> Result<Vec<GroupMember>, GroupError> {
let provider = self.mls_provider()?;
self.members_with_provider(&provider)
self.members_with_provider(client, &provider).await
}

pub fn members_with_provider(
pub async fn members_with_provider<ApiClient: XmtpApi>(
&self,
client: &Client<ApiClient>,
provider: &XmtpOpenMlsProvider,
) -> Result<Vec<GroupMember>, GroupError> {
let openmls_group = self.load_mls_group(provider)?;
Expand All @@ -48,19 +53,28 @@ impl MlsGroup {
.collect::<Vec<_>>();

let conn = provider.conn_ref();
let association_states =
let mut association_states =
StoredAssociationState::batch_read_from_cache(conn, requests.clone())?;
let mutable_metadata = self.mutable_metadata(provider)?;
if association_states.len() != requests.len() {
// Cache miss - not expected to happen because:
// 1. We don't allow updates to the group metadata unless we have already validated the association state
// 2. When validating the association state, we must have written it to the cache
log::error!(
"Failed to load all members for group - metadata: {:?}, computed members: {:?}",
requests,
association_states
);
return Err(GroupError::InvalidGroupMembership);
// Attempt to rebuild the cache before erroring out due to a cache miss.
let requests: Vec<_> = requests
.into_iter()
.map(|(id, sequence)| (id, Some(sequence)))
codabrink marked this conversation as resolved.
Show resolved Hide resolved
.collect();
association_states = client.batch_get_association_state(conn, &requests).await?;

if association_states.len() != requests.len() {
// Cache miss - not expected to happen because:
// 1. We don't allow updates to the group metadata unless we have already validated the association state
// 2. When validating the association state, we must have written it to the cache
log::error!(
"Failed to load all members for group - metadata: {:?}, computed members: {:?}",
requests,
association_states
);
return Err(GroupError::InvalidGroupMembership);
}
}
let members = association_states
.into_iter()
Expand Down
Loading
Loading