Skip to content

Commit

Permalink
Adding group streaming logs (#1092)
Browse files Browse the repository at this point in the history
* add additional logging

* add more logs
  • Loading branch information
nplasterer authored Sep 26, 2024
1 parent cd0fc5c commit 885c7b7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 0 deletions.
5 changes: 5 additions & 0 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl MlsGroup {
added_by_inbox: String,
welcome_id: i64,
) -> Result<Self, GroupError> {
log::info!("Creating from welcome");
let mls_welcome =
StagedWelcome::new_from_welcome(provider, &build_group_join_config(), welcome, None)?;

Expand Down Expand Up @@ -386,6 +387,7 @@ impl MlsGroup {
encrypted_welcome_bytes: Vec<u8>,
welcome_id: i64,
) -> Result<Self, GroupError> {
log::info!("Trying to decrypt welcome");
let welcome_bytes = decrypt_welcome(provider, hpke_public_key, &encrypted_welcome_bytes)?;

let welcome = deserialize_welcome(&welcome_bytes)?;
Expand All @@ -396,6 +398,7 @@ impl MlsGroup {
ProcessedWelcome::new_from_welcome(provider, &join_config, welcome.clone())?;
let psks = processed_welcome.psks();
if !psks.is_empty() {
log::error!("No PSK support for welcome");
return Err(GroupError::NoPSKSupport);
}
let staged_welcome = processed_welcome.into_staged_welcome(provider, None)?;
Expand Down Expand Up @@ -1258,6 +1261,7 @@ async fn validate_initial_group_membership<ApiClient: XmtpApi>(
conn: &DbConnection,
mls_group: &OpenMlsGroup,
) -> Result<(), GroupError> {
log::info!("Validating initial group membership");
let membership = extract_group_membership(mls_group.extensions())?;
let needs_update = client.filter_inbox_ids_needing_updates(conn, membership.to_filters())?;
if !needs_update.is_empty() {
Expand Down Expand Up @@ -1289,6 +1293,7 @@ async fn validate_initial_group_membership<ApiClient: XmtpApi>(
return Err(GroupError::InvalidGroupMembership);
}

log::info!("Group membership validated");
Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions xmtp_mls/src/storage/encrypted_store/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl DbConnection {
}

pub fn insert_or_replace_group(&self, group: StoredGroup) -> Result<StoredGroup, StorageError> {
log::info!("Trying to insert group");
let stored_group = self.raw_query(|conn| {
let maybe_inserted_group: Option<StoredGroup> = diesel::insert_into(dsl::groups)
.values(&group)
Expand All @@ -222,14 +223,18 @@ impl DbConnection {
if maybe_inserted_group.is_none() {
let existing_group: StoredGroup = dsl::groups.find(group.id).first(conn)?;
if existing_group.welcome_id == group.welcome_id {
log::info!("Group welcome id already exists");
// Error so OpenMLS db transaction are rolled back on duplicate welcomes
return Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
Box::new("welcome id already exists".to_string()),
));
} else {
log::info!("Group already exists");
return Ok(existing_group);
}
} else {
log::info!("Group is inserted");
}

match maybe_inserted_group {
Expand Down
5 changes: 5 additions & 0 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ where
let creation_result = retry_async!(
Retry::default(),
(async {
log::info!("Trying to process streamed welcome");
let welcome_v1 = welcome_v1.clone();
self.context
.store
Expand Down Expand Up @@ -147,6 +148,7 @@ where
let installation_key = self.installation_public_key();
let id_cursor = 0;

log::info!("Setting up conversation stream");
let subscription = self
.api_client
.subscribe_welcome_messages(installation_key, Some(id_cursor))
Expand Down Expand Up @@ -242,6 +244,7 @@ where
futures::pin_mut!(stream);
let _ = tx.send(());
while let Some(convo) = stream.next().await {
log::info!("Trigger conversation callback");
convo_callback(convo)
}
log::debug!("`stream_conversations` stream ended, dropping stream");
Expand Down Expand Up @@ -298,6 +301,7 @@ where
.await?;
futures::pin_mut!(messages_stream);

log::info!("Setting up conversation stream in stream_all_messages");
let convo_stream = self.stream_conversations().await?;
futures::pin_mut!(convo_stream);

Expand All @@ -321,6 +325,7 @@ where
yield Ok(message);
}
Some(new_group) = convo_stream.next() => {
log::info!("Received new conversation inside streamAllMessages");
if group_id_to_info.contains_key(&new_group.group_id) {
continue;
}
Expand Down

0 comments on commit 885c7b7

Please sign in to comment.