Skip to content

Commit

Permalink
Claim zaps in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
benthecarman committed Jun 6, 2024
1 parent 064f407 commit ed3731a
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions mutiny-core/src/hermes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use bitcoin::{bip32::ExtendedPrivKey, secp256k1::Secp256k1};
use fedimint_core::config::FederationId;
use futures::{pin_mut, select, FutureExt};
use lightning::util::logger::Logger;
use lightning::{log_error, log_info, log_warn};
use lightning::{log_debug, log_error, log_info, log_warn};
use lightning_invoice::Bolt11Invoice;
use nostr::secp256k1::SecretKey;
use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, Tag, ToBech32};
use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, RelayMessage, Tag, ToBech32};
use nostr::{prelude::decrypt_received_private_zap_message, EventBuilder};
use nostr::{Filter, Kind, Timestamp};
use nostr_sdk::{Client, RelayPoolNotification};
Expand Down Expand Up @@ -101,7 +101,12 @@ impl<S: MutinyStorage> HermesClient<S> {
let client = Client::new(&keys);

client
.add_relays(RELAYS)
.add_relays(
// we are listening only, so no need to connect to blastr
RELAYS
.into_iter()
.filter(|r| *r != "wss://nostr.mutinywallet.com"),
)
.await
.expect("Failed to add relays");

Expand Down Expand Up @@ -266,6 +271,9 @@ impl<S: MutinyStorage> HermesClient<S> {

client.subscribe(vec![received_dm_filter], None).await;

let mut has_received_eose = false;
let mut batch_notifications: Vec<(EcashNotification, Timestamp)> = Vec::new();

let mut notifications = client.notifications();

loop {
Expand All @@ -282,8 +290,15 @@ impl<S: MutinyStorage> HermesClient<S> {
Kind::EncryptedDirectMessage => {
match decrypt_ecash_notification(&dm_key, event.pubkey, &event.content) {
Ok(notification) => {
if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
// if we have received an EOSE, we should immediately handle the notification
// otherwise we should wait until we have received an EOSE so we can do the initial batch
if has_received_eose {
if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
}
} else {
log_debug!(logger, "Received ecash notification, adding to batch");
batch_notifications.push((notification, event.created_at));
}
},
Err(e) => {
Expand All @@ -295,7 +310,18 @@ impl<S: MutinyStorage> HermesClient<S> {
}
}
},
Ok(RelayPoolNotification::Message { .. }) => {}, // ignore messages
Ok(RelayPoolNotification::Message { message, .. }) => {
// if we receive an EOSE, we have received all the notifications from the relay
// and can now handle the batch
if let RelayMessage::EndOfStoredEvents(_) = message {
has_received_eose = true;
for (notification, created_at) in batch_notifications.drain(..) {
if let Err(e) = handle_ecash_notification(notification, created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
}
}
}
},
Ok(RelayPoolNotification::Shutdown) => break, // if we disconnect, we restart to reconnect
Ok(RelayPoolNotification::Stop) => {}, // Currently unused
Ok(RelayPoolNotification::RelayStatus { .. }) => {}, // Currently unused
Expand Down

0 comments on commit ed3731a

Please sign in to comment.