Skip to content

Commit

Permalink
Merge pull request #1203 from MutinyWallet/batched-hermes
Browse files Browse the repository at this point in the history
Claim zaps in batches
  • Loading branch information
TonyGiorgio authored Jun 6, 2024
2 parents 12d3a05 + 94ef7e8 commit 542a212
Showing 1 changed file with 46 additions and 9 deletions.
55 changes: 46 additions & 9 deletions mutiny-core/src/hermes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use fedimint_core::api::InviteCode;
use fedimint_core::config::FederationId;
use futures::{pin_mut, select, FutureExt};
use lightning::util::logger::Logger;
use lightning::{log_error, log_info, log_warn};
use lightning::{log_debug, log_error, log_info, log_warn};
use lightning_invoice::Bolt11Invoice;
use nostr::secp256k1::SecretKey;
use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, Tag, ToBech32};
use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, RelayMessage, Tag, ToBech32};
use nostr::{prelude::decrypt_received_private_zap_message, EventBuilder};
use nostr::{Filter, Kind, Timestamp};
use nostr_sdk::{Client, RelayPoolNotification};
Expand Down Expand Up @@ -102,7 +102,12 @@ impl<S: MutinyStorage> HermesClient<S> {
let client = Client::new(&keys);

client
.add_relays(RELAYS)
.add_relays(
// we are listening only, so no need to connect to blastr
RELAYS
.into_iter()
.filter(|r| *r != "wss://nostr.mutinywallet.com"),
)
.await
.expect("Failed to add relays");

Expand Down Expand Up @@ -267,6 +272,9 @@ impl<S: MutinyStorage> HermesClient<S> {

client.subscribe(vec![received_dm_filter], None).await;

let mut has_received_eose = false;
let mut batch_notifications: Vec<(EcashNotification, Timestamp)> = Vec::new();

let mut notifications = client.notifications();

loop {
Expand All @@ -283,8 +291,17 @@ impl<S: MutinyStorage> HermesClient<S> {
Kind::EncryptedDirectMessage => {
match decrypt_ecash_notification(&dm_key, event.pubkey, &event.content) {
Ok(notification) => {
if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
// if we have received an EOSE, we should immediately handle the notification
// otherwise we should wait until we have received an EOSE so we can do the initial batch
if has_received_eose {
if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
} else if let Err(e) = storage.set_dm_sync_time(event.created_at.as_u64(), true) { // save the last sync time after processing the notification
log_error!(logger, "Error saving last sync time: {e}");
}
} else {
log_debug!(logger, "Received ecash notification, adding to batch");
batch_notifications.push((notification, event.created_at));
}
},
Err(e) => {
Expand All @@ -296,7 +313,30 @@ impl<S: MutinyStorage> HermesClient<S> {
}
}
},
Ok(RelayPoolNotification::Message { .. }) => {}, // ignore messages
Ok(RelayPoolNotification::Message { message, .. }) => {
// if we receive an EOSE, we have received all the notifications from the relay
// and can now handle the batch
if let RelayMessage::EndOfStoredEvents(_) = message {
has_received_eose = true;
if !batch_notifications.is_empty() {
let mut max_created_at: Option<Timestamp> = None;
for (notification, created_at) in batch_notifications.drain(..) {
if let Err(e) = handle_ecash_notification(notification, created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
} else if max_created_at.is_none() || max_created_at.is_some_and(|x| x < created_at) {
max_created_at = Some(created_at);
}
}

// save the last sync time after the batch
if let Some(max_created_at) = max_created_at {
if let Err(e) = storage.set_dm_sync_time(max_created_at.as_u64(), true) {
log_error!(logger, "Error saving last sync time: {e}");
}
}
}
}
},
Ok(RelayPoolNotification::Shutdown) => break, // if we disconnect, we restart to reconnect
Ok(RelayPoolNotification::Stop) => {}, // Currently unused
Ok(RelayPoolNotification::RelayStatus { .. }) => {}, // Currently unused
Expand Down Expand Up @@ -731,9 +771,6 @@ async fn handle_ecash_notification<S: MutinyStorage>(
);
}

// save the last sync time
storage.set_dm_sync_time(created_at.as_u64(), true)?;

Ok(())
}

Expand Down

0 comments on commit 542a212

Please sign in to comment.