From 6f70a63c70e4517d58fbdd4992db41174564a176 Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Tue, 23 Jul 2024 15:41:39 +0200 Subject: [PATCH 1/9] feat: Configurable sync_notifs backoff --- core/payment/examples/payment_api.rs | 12 ++++++-- core/payment/src/config.rs | 41 ++++++++++++++++++++++++++++ core/payment/src/lib.rs | 6 +++- core/payment/src/payment_sync.rs | 27 ++++++++++++------ core/payment/src/service.rs | 19 ++++++++++--- 5 files changed, 88 insertions(+), 17 deletions(-) create mode 100644 core/payment/src/config.rs diff --git a/core/payment/examples/payment_api.rs b/core/payment/examples/payment_api.rs index b60563326c..93dd3f764c 100644 --- a/core/payment/examples/payment_api.rs +++ b/core/payment/examples/payment_api.rs @@ -23,8 +23,9 @@ use ya_core_model::driver::{driver_bus_id, AccountMode, Fund, Init}; use ya_core_model::identity; use ya_dummy_driver as dummy; use ya_erc20_driver as erc20; -use ya_net::Config; +use ya_net::Config as NetConfig; use ya_payment::processor::PaymentProcessor; +use ya_payment::Config as PaymentConfig; use ya_payment::{migrations, utils, PaymentService}; use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::auth::dummy::DummyAuth; @@ -230,7 +231,12 @@ async fn main() -> anyhow::Result<()> { log::debug!("bind_gsb_router()"); let processor = Arc::new(PaymentProcessor::new(db.clone())); - ya_payment::service::bind_service(&db, processor, BindOptions::default().run_sync_job(false)); + ya_payment::service::bind_service( + &db, + processor, + BindOptions::default().run_sync_job(false), + Arc::new(PaymentConfig::from_env()?), + ); log::debug!("bind_service()"); bus::bind(identity::BUS_ID, { @@ -342,7 +348,7 @@ async fn main() -> anyhow::Result<()> { log::info!("bind remote..."); ya_net::hybrid::start_network( - Arc::new(Config::from_env()?), + Arc::new(NetConfig::from_env()?), provider_id, vec![provider_id, requestor_id], ) diff --git a/core/payment/src/config.rs b/core/payment/src/config.rs new file mode 100644 index 0000000000..9e8fab3574 --- /dev/null +++ b/core/payment/src/config.rs @@ -0,0 +1,41 @@ +use structopt::*; + +#[derive(StructOpt, Clone)] +pub struct Config { + #[structopt(flatten)] + pub sync_notif_backoff: SyncNotifBackoffConfig, +} + +#[derive(StructOpt, Clone)] +pub struct SyncNotifBackoffConfig { + #[structopt(long, env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_INITIAL_DELAY", parse(try_from_str = humantime::parse_duration), default_value = "30s")] + pub initial_delay: std::time::Duration, + + #[structopt( + long, + env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_EXPONENT", + default_value = "6" + )] + pub exponent: f64, + + #[structopt( + long, + env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_MAX_RETRIES", + default_value = "7" + )] + pub max_retries: u32, + + #[structopt(long, env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_CAP", parse(try_from_str = humantime::parse_duration))] + pub cap: Option, + + #[structopt(long, env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_ERROR_DELAY", parse(try_from_str = humantime::parse_duration), default_value = "10m")] + pub error_delay: std::time::Duration, +} + +impl Config { + pub fn from_env() -> Result { + // Empty command line arguments, because we want to use ENV fallback + // or default values if ENV variables are not set. + Config::from_iter_safe(&[""]) + } +} diff --git a/core/payment/src/lib.rs b/core/payment/src/lib.rs index 8a24ee5af1..41b3d1e39d 100644 --- a/core/payment/src/lib.rs +++ b/core/payment/src/lib.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] // Crate under development #![allow(unused_variables)] // Crate under development +pub use crate::config::Config; use crate::processor::PaymentProcessor; use futures::FutureExt; use service::BindOptions; @@ -15,6 +16,7 @@ extern crate diesel; pub mod accounts; pub mod api; mod cli; +pub mod config; pub mod dao; pub mod error; pub mod models; @@ -53,8 +55,10 @@ impl PaymentService { let db = context.component(); db.apply_migration(migrations::run_with_output)?; + let config = Arc::new(Config::from_env()?); + let processor = Arc::new(PaymentProcessor::new(db.clone())); - self::service::bind_service(&db, processor.clone(), BindOptions::default()); + self::service::bind_service(&db, processor.clone(), BindOptions::default(), config); tokio::task::spawn(async move { processor.release_allocations(false).await; diff --git a/core/payment/src/payment_sync.rs b/core/payment/src/payment_sync.rs index e1b97fdf36..f49eea4cb1 100644 --- a/core/payment/src/payment_sync.rs +++ b/core/payment/src/payment_sync.rs @@ -1,5 +1,8 @@ +use std::sync::Arc; use std::{collections::HashSet, time::Duration}; +use crate::Config; + use chrono::Utc; use tokio::sync::Notify; use ya_client_model::{ @@ -25,9 +28,6 @@ use ya_service_bus::{timeout::IntoTimeoutFuture, typed, Error, RpcEndpoint}; use crate::dao::{DebitNoteDao, InvoiceDao, InvoiceEventDao, PaymentDao, SyncNotifsDao}; -const SYNC_NOTIF_DELAY_0: Duration = Duration::from_secs(30); -const SYNC_NOTIF_RATIO: u32 = 6; -const SYNC_NOTIF_MAX_RETRIES: u32 = 7; const REMOTE_CALL_TIMEOUT: Duration = Duration::from_secs(30); async fn payment_sync( @@ -152,10 +152,19 @@ async fn mark_all_sent(db: &DbExecutor, msg: PaymentSync) -> anyhow::Result<()> Ok(()) } -async fn send_sync_notifs(db: &DbExecutor) -> anyhow::Result> { +async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result> { let dao: SyncNotifsDao = db.as_dao(); + let backoff_config = &config.sync_notif_backoff; - let exp_backoff = |n| SYNC_NOTIF_DELAY_0 * SYNC_NOTIF_RATIO.pow(n); + let exp_backoff = |n| { + let secs = backoff_config.initial_delay * backoff_config.exponent.powi(n) as u32; + let capped: Duration = if let Some(cap) = backoff_config.cap { + ::std::cmp::min(cap, secs) + } else { + secs + }; + capped + }; let cutoff = Utc::now(); let default_identity = typed::service(identity::BUS_ID) @@ -183,7 +192,7 @@ async fn send_sync_notifs(db: &DbExecutor) -> anyhow::Result> { .into_iter() .filter(|entry| { let next_deadline = entry.last_ping + exp_backoff(entry.retries as _); - next_deadline.and_utc() < cutoff && entry.retries <= SYNC_NOTIF_MAX_RETRIES as i32 + next_deadline.and_utc() <= cutoff && entry.retries <= backoff_config.max_retries as i32 }) .map(|entry| entry.id) .collect::>(); @@ -220,11 +229,11 @@ lazy_static::lazy_static! { pub static ref SYNC_NOTIFS_NOTIFY: Notify = Notify::new(); } -pub fn send_sync_notifs_job(db: DbExecutor) { - let sleep_on_error = Duration::from_secs(3600); +pub fn send_sync_notifs_job(db: DbExecutor, config: Arc) { + let sleep_on_error = config.sync_notif_backoff.error_delay; tokio::task::spawn_local(async move { loop { - let sleep_for = match send_sync_notifs(&db).await { + let sleep_for = match send_sync_notifs(&db, &config).await { Err(e) => { log::error!("PaymentSyncNeeded sendout job failed: {e}"); sleep_on_error diff --git a/core/payment/src/service.rs b/core/payment/src/service.rs index 675c9928ce..de917875c5 100644 --- a/core/payment/src/service.rs +++ b/core/payment/src/service.rs @@ -1,5 +1,6 @@ use crate::dao::{DebitNoteDao, InvoiceDao}; use crate::processor::PaymentProcessor; +use crate::Config; use futures::prelude::*; use metrics::counter; @@ -35,11 +36,16 @@ impl Default for BindOptions { } } -pub fn bind_service(db: &DbExecutor, processor: Arc, opts: BindOptions) { +pub fn bind_service( + db: &DbExecutor, + processor: Arc, + opts: BindOptions, + config: Arc, +) { log::debug!("Binding payment service to service bus"); local::bind_service(db, processor.clone()); - public::bind_service(db, processor, opts); + public::bind_service(db, processor, opts, config); log::debug!("Successfully bound payment service to service bus"); } @@ -825,7 +831,12 @@ mod public { use ya_core_model::payment::public::*; use ya_persistence::types::Role; - pub fn bind_service(db: &DbExecutor, processor: Arc, opts: BindOptions) { + pub fn bind_service( + db: &DbExecutor, + processor: Arc, + opts: BindOptions, + config: Arc, + ) { log::debug!("Binding payment public service to service bus"); ServiceBinder::new(BUS_ID, db, processor) @@ -843,7 +854,7 @@ mod public { .bind_with_processor(sync_payment); if opts.run_sync_job { - send_sync_notifs_job(db.clone()); + send_sync_notifs_job(db.clone(), config); send_sync_requests(db.clone()); } From 7d31c5874c735e24b8ed8a5318fdea9d881ece1c Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Thu, 25 Jul 2024 22:38:06 +0200 Subject: [PATCH 2/9] fix: correctly select and send payment and invoices syncs --- core/payment/src/dao/debit_note.rs | 7 +- core/payment/src/dao/invoice.rs | 14 +++- core/payment/src/payment_sync.rs | 120 ++++++++++++++++++++++------- 3 files changed, 112 insertions(+), 29 deletions(-) diff --git a/core/payment/src/dao/debit_note.rs b/core/payment/src/dao/debit_note.rs index 873c9e061d..a6352b2c93 100644 --- a/core/payment/src/dao/debit_note.rs +++ b/core/payment/src/dao/debit_note.rs @@ -344,12 +344,17 @@ impl<'c> DebitNoteDao<'c> { } /// Lists debit notes with send_accept - pub async fn unsent_accepted(&self, owner_id: NodeId) -> DbResult> { + pub async fn unsent_accepted( + &self, + owner_id: NodeId, + peer_id: NodeId, + ) -> DbResult> { readonly_transaction(self.pool, "debit_note_unsent_accepted", move |conn| { let read: Vec = query!() .filter(dsl::owner_id.eq(owner_id)) .filter(dsl::send_accept.eq(true)) .filter(dsl::status.eq(DocumentStatus::Accepted.to_string())) + .filter(agreement_dsl::peer_id.eq(peer_id)) .order_by(dsl::timestamp.desc()) .load(conn)?; let mut debit_notes = Vec::new(); diff --git a/core/payment/src/dao/invoice.rs b/core/payment/src/dao/invoice.rs index 429357bbd5..a02750ba7c 100644 --- a/core/payment/src/dao/invoice.rs +++ b/core/payment/src/dao/invoice.rs @@ -364,12 +364,17 @@ impl<'c> InvoiceDao<'c> { } /// Lists invoices with send_accept - pub async fn unsent_accepted(&self, owner_id: NodeId) -> DbResult> { + pub async fn unsent_accepted( + &self, + owner_id: NodeId, + peer_id: NodeId, + ) -> DbResult> { readonly_transaction(self.pool, "invoice_dao_unsent_accepted", move |conn| { let invoices: Vec = query!() .filter(dsl::owner_id.eq(owner_id)) .filter(dsl::send_accept.eq(true)) .filter(dsl::status.eq(DocumentStatus::Accepted.to_string())) + .filter(agreement_dsl::peer_id.eq(peer_id)) .load(conn)?; let activities = activity_dsl::pay_invoice_x_activity @@ -459,12 +464,17 @@ impl<'c> InvoiceDao<'c> { .await } - pub async fn unsent_rejected(&self, owner_id: NodeId) -> DbResult> { + pub async fn unsent_rejected( + &self, + owner_id: NodeId, + peer_id: NodeId, + ) -> DbResult> { readonly_transaction(self.pool, "unsent_rejected", move |conn| { let invoices: Vec = query!() .filter(dsl::owner_id.eq(owner_id)) .filter(dsl::send_reject.eq(true)) .filter(dsl::status.eq(DocumentStatus::Rejected.to_string())) + .filter(agreement_dsl::peer_id.eq(peer_id)) .load(conn)?; let activities = activity_dsl::pay_invoice_x_activity diff --git a/core/payment/src/payment_sync.rs b/core/payment/src/payment_sync.rs index f49eea4cb1..efdea12b74 100644 --- a/core/payment/src/payment_sync.rs +++ b/core/payment/src/payment_sync.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use std::{collections::HashSet, time::Duration}; +use ya_client_model::payment::Payment; use crate::Config; @@ -30,8 +31,24 @@ use crate::dao::{DebitNoteDao, InvoiceDao, InvoiceEventDao, PaymentDao, SyncNoti const REMOTE_CALL_TIMEOUT: Duration = Duration::from_secs(30); +fn remove_allocation_ids_from_payment(payment: &Payment) -> Payment { + // We remove allocation ID from syncs because allocations are not transferred to peers and + // their IDs would be unknown to the recipient. + let mut payment = payment.clone(); + for agreement_payment in &mut payment.agreement_payments.iter_mut() { + agreement_payment.allocation_id = None; + } + + for activity_payment in &mut payment.activity_payments.iter_mut() { + activity_payment.allocation_id = None; + } + + payment +} + async fn payment_sync( db: &DbExecutor, + current_node_id: NodeId, peer_id: NodeId, ) -> anyhow::Result<(PaymentSync, PaymentSyncWithBytes)> { let payment_dao: PaymentDao = db.as_dao(); @@ -45,6 +62,8 @@ async fn payment_sync( let platform_components = payment.payment_platform.split('-').collect::>(); let driver = &platform_components[0]; + let payment = remove_allocation_ids_from_payment(&payment); + let signature = typed::service(driver_bus_id(driver)) .send(SignPayment(payment.clone())) .await??; @@ -57,7 +76,10 @@ async fn payment_sync( } let mut invoice_accepts = Vec::default(); - for invoice in invoice_dao.unsent_accepted(peer_id).await? { + for invoice in invoice_dao + .unsent_accepted(current_node_id, peer_id) + .await? + { invoice_accepts.push(AcceptInvoice::new( invoice.invoice_id, Acceptance { @@ -69,7 +91,10 @@ async fn payment_sync( } let mut invoice_rejects = Vec::default(); - for invoice in invoice_dao.unsent_rejected(peer_id).await? { + for invoice in invoice_dao + .unsent_rejected(current_node_id, peer_id) + .await? + { let events = invoice_event_dao .get_for_invoice_id( invoice.invoice_id.clone(), @@ -93,7 +118,10 @@ async fn payment_sync( } let mut debit_note_accepts = Vec::default(); - for debit_note in debit_note_dao.unsent_accepted(peer_id).await? { + for debit_note in debit_note_dao + .unsent_accepted(current_node_id, peer_id) + .await? + { debit_note_accepts.push(AcceptDebitNote::new( debit_note.debit_note_id, Acceptance { @@ -120,7 +148,7 @@ async fn payment_sync( )) } -async fn mark_all_sent(db: &DbExecutor, msg: PaymentSync) -> anyhow::Result<()> { +async fn mark_all_sent(db: &DbExecutor, owner_id: NodeId, msg: PaymentSync) -> anyhow::Result<()> { let payment_dao: PaymentDao = db.as_dao(); let invoice_dao: InvoiceDao = db.as_dao(); let debit_note_dao: DebitNoteDao = db.as_dao(); @@ -133,19 +161,19 @@ async fn mark_all_sent(db: &DbExecutor, msg: PaymentSync) -> anyhow::Result<()> for invoice_accept in msg.invoice_accepts { invoice_dao - .mark_accept_sent(invoice_accept.invoice_id, invoice_accept.issuer_id) + .mark_accept_sent(invoice_accept.invoice_id, owner_id) .await?; } for invoice_reject in msg.invoice_rejects { invoice_dao - .mark_reject_sent(invoice_reject.invoice_id, invoice_reject.issuer_id) + .mark_reject_sent(invoice_reject.invoice_id, owner_id) .await?; } for debit_note_accept in msg.debit_note_accepts { debit_note_dao - .mark_accept_sent(debit_note_accept.debit_note_id, debit_note_accept.issuer_id) + .mark_accept_sent(debit_note_accept.debit_note_id, owner_id) .await?; } @@ -173,19 +201,6 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result &cutoff) - .min() - .map(|ts| ts - cutoff) - .and_then(|dur| dur.to_std().ok()); - let peers_to_notify = dao .list() .await? @@ -198,15 +213,42 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result>(); for peer in peers_to_notify { - let (msg, msg_with_bytes) = payment_sync(db, peer).await?; + // FIXME: We should iterate over all identities present in the current instance or make + // payment_sync return a mapping identity -> msg and use the returned identity as the + // sender, or store notifying identity in SyncNotifsDao. + // Currently we assume that everything is sent from the default identity. + let (msg, msg_with_bytes) = payment_sync(db, default_identity, peer).await?; + log::debug!("Sending PaymentSync as [{default_identity}] to [{peer}]."); let mut result = ya_net::from(default_identity) .to(peer) .service(ya_core_model::payment::public::BUS_ID) .call(msg_with_bytes.clone()) .await; - if matches!(&result, Err(Error::GsbBadRequest(_))) { + log::debug!("Sending PaymentSync as [{default_identity}] to [{peer}] result: {result:?}"); + + // PaymentSyncWithBytes is newer message that won't always be supported, but it contains + // signatures that are crutial for clients that do support this message and rely on them + // for payment verification. + // For this reason we will try to send PaymentSyncWithBytes first and send the older + // PaymentSync only if the new message is not supported. + // + // Manual tests on centralnet show that the following errors are returned: + // if the endpoint is not supported + // Err(RemoteError("/net//payment/PaymentSyncWithBytes", "GSB failure: Bad request: endpoint address not found")) + // if the peer is not available + // Err(RemoteError("/net//payment/PaymentSyncWithBytes", "Bad request: endpoint address not found")) + // We'll Use presence of "GSB failure" message to distinguish them. + // + // We cannot just use any RemoteError or 'Bad request' as an indicator that old message + // should be sent, becaues that could cause sending not signed messages to newer clients in + // case of transient errors. + // TODO: is there any better way to know if the peer is connected but the endpoint is not + // handled? + if matches!(&result, Err(Error::RemoteError(_, e)) if e.contains("GSB failure: Bad request: endpoint address not found")) + { + log::debug!("Sending PaymentSync as [{default_identity}] to [{peer}]: PaymentSyncWithBytes not supported, falling back to PaymentSync."); result = ya_net::from(default_identity) .to(peer) .service(ya_core_model::payment::public::BUS_ID) @@ -215,14 +257,36 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result x.to_string(), + Ok(Err(x)) => x.to_string(), + Ok(Ok(_)) => unreachable!(), + }; + log::debug!("Couldn't deliver PaymentSync to [{peer}] as [{default_identity}]: {err}"); dao.increment_retry(peer, cutoff.naive_utc()).await?; } } - Ok(next_wakeup) + // Next sleep duration is calculated after all events were updated to pick up entries + // that were not delivered in current run. + let next_sleep_duration = dao + .list() + .await? + .iter() + .map(|entry| { + let next_deadline = entry.last_ping + exp_backoff(entry.retries as _); + next_deadline.and_utc() + }) + .filter(|deadline| deadline > &cutoff) + .min() + .map(|ts| ts - cutoff) + .and_then(|dur| dur.to_std().ok()); + + Ok(next_sleep_duration) } lazy_static::lazy_static! { @@ -239,8 +303,12 @@ pub fn send_sync_notifs_job(db: DbExecutor, config: Arc) { sleep_on_error } Ok(duration) => { - log::debug!("PaymentSyncNeeded sendout job done"); - duration.unwrap_or(sleep_on_error) + let sleep_duration = duration.unwrap_or(sleep_on_error); + log::debug!( + "PaymentSyncNeeded sendout job done, sleeping for {:?}", + sleep_duration + ); + sleep_duration } }; From 0c02456e12c689f9218c38b20f64a5e8c9d1d955 Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Thu, 25 Jul 2024 23:11:57 +0200 Subject: [PATCH 3/9] Feat: implement PaymentSyncWithBytes handling --- core/payment/src/service.rs | 60 +++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 6 deletions(-) diff --git a/core/payment/src/service.rs b/core/payment/src/service.rs index de917875c5..f35966bbc3 100644 --- a/core/payment/src/service.rs +++ b/core/payment/src/service.rs @@ -851,7 +851,8 @@ mod public { .bind(sync_request) .bind_with_processor(send_payment) .bind_with_processor(send_payment_with_bytes) - .bind_with_processor(sync_payment); + .bind_with_processor(sync_payment) + .bind_with_processor(sync_payment_with_bytes); if opts.run_sync_job { send_sync_notifs_job(db.clone(), config); @@ -1370,10 +1371,57 @@ mod public { sender_id: String, msg: PaymentSync, ) -> Result { + sync_payment_impl( + db, + processor, + sender_id, + msg.payments, + send_payment, + msg.invoice_accepts, + msg.invoice_rejects, + msg.debit_note_accepts, + ) + .await + } + + async fn sync_payment_with_bytes( + db: DbExecutor, + processor: Arc, + sender_id: String, + msg: PaymentSyncWithBytes, + ) -> Result { + sync_payment_impl( + db, + processor, + sender_id, + msg.payments, + send_payment_with_bytes, + msg.invoice_accepts, + msg.invoice_rejects, + msg.debit_note_accepts, + ) + .await + } + + #[allow(clippy::too_many_arguments)] + async fn sync_payment_impl( + db: DbExecutor, + processor: Arc, + sender_id: String, + payments: Vec, + payment_processor_func: PaymentProcessorFunc, + invoice_accepts: Vec, + invoice_rejects: Vec, + debit_note_accepts: Vec, + ) -> Result + where + PaymentProcessorFunc: Fn(DbExecutor, Arc, String, PaymentType) -> Fut, + Fut: Future>, + { let mut errors = PaymentSyncError::default(); - for payment_send in msg.payments { - let result = send_payment( + for payment_send in payments { + let result = payment_processor_func( db.clone(), Arc::clone(&processor), sender_id.clone(), @@ -1386,21 +1434,21 @@ mod public { } } - for invoice_accept in msg.invoice_accepts { + for invoice_accept in invoice_accepts { let result = accept_invoice(db.clone(), sender_id.clone(), invoice_accept).await; if let Err(e) = result { errors.accept_errors.push(e); } } - for invoice_reject in msg.invoice_rejects { + for invoice_reject in invoice_rejects { let result = reject_invoice(db.clone(), sender_id.clone(), invoice_reject).await; if let Err(e) = result { errors.accept_errors.push(e); } } - for debit_note_accept in msg.debit_note_accepts { + for debit_note_accept in debit_note_accepts { let result = accept_debit_note(db.clone(), sender_id.clone(), debit_note_accept).await; if let Err(e) = result { errors.accept_errors.push(e); From 5a166bf979a7a190ae72dfa744da4a81fa17c615 Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Thu, 25 Jul 2024 23:29:26 +0200 Subject: [PATCH 4/9] fix: add sync entry for receiver instead of sender for delayed debit notes accepts --- core/payment/src/api/debit_notes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/payment/src/api/debit_notes.rs b/core/payment/src/api/debit_notes.rs index f5c1535224..bf60f12da1 100644 --- a/core/payment/src/api/debit_notes.rs +++ b/core/payment/src/api/debit_notes.rs @@ -461,7 +461,7 @@ async fn accept_debit_note( response?; } else { log::debug!("AcceptDebitNote not delivered"); - sync_dao.upsert(node_id).await?; + sync_dao.upsert(issuer_id).await?; SYNC_NOTIFS_NOTIFY.notify_one(); } From 962c28fdea309dc9e0ac53cb49172272a4d39a5e Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Thu, 25 Jul 2024 23:31:10 +0200 Subject: [PATCH 5/9] fix: Use full primary key when selecting payments for marking as sent --- core/payment/src/dao/payment.rs | 12 +++++++----- core/payment/src/payment_sync.rs | 2 +- core/payment/src/processor.rs | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/payment/src/dao/payment.rs b/core/payment/src/dao/payment.rs index 9c336d1d40..816355da91 100644 --- a/core/payment/src/dao/payment.rs +++ b/core/payment/src/dao/payment.rs @@ -159,12 +159,14 @@ impl<'c> PaymentDao<'c> { .await } - pub async fn mark_sent(&self, payment_id: String) -> DbResult<()> { + pub async fn mark_sent(&self, payment_id: String, owner_id: NodeId) -> DbResult<()> { do_with_transaction(self.pool, "payment_dao_mark_sent", move |conn| { - diesel::update(dsl::pay_payment.filter(dsl::id.eq(payment_id))) - .filter(dsl::role.eq(Role::Requestor)) - .set(dsl::send_payment.eq(false)) - .execute(conn)?; + diesel::update( + dsl::pay_payment.filter(dsl::id.eq(payment_id).and(dsl::owner_id.eq(owner_id))), + ) + .filter(dsl::role.eq(Role::Requestor)) + .set(dsl::send_payment.eq(false)) + .execute(conn)?; Ok(()) }) .await diff --git a/core/payment/src/payment_sync.rs b/core/payment/src/payment_sync.rs index efdea12b74..b18dc00de7 100644 --- a/core/payment/src/payment_sync.rs +++ b/core/payment/src/payment_sync.rs @@ -155,7 +155,7 @@ async fn mark_all_sent(db: &DbExecutor, owner_id: NodeId, msg: PaymentSync) -> a for payment_send in msg.payments { payment_dao - .mark_sent(payment_send.payment.payment_id) + .mark_sent(payment_send.payment.payment_id, owner_id) .await?; } diff --git a/core/payment/src/processor.rs b/core/payment/src/processor.rs index 75de2649ec..ac78f145b7 100644 --- a/core/payment/src/processor.rs +++ b/core/payment/src/processor.rs @@ -546,7 +546,7 @@ impl PaymentProcessor { }; if mark_sent { - payment_dao.mark_sent(payment_id).await.ok(); + payment_dao.mark_sent(payment_id, payer_id).await.ok(); } else { sync_dao.upsert(payee_id).await?; SYNC_NOTIFS_NOTIFY.notify_one(); From 7c87c6c7fa9b7a4d2c733d4ed0f705c277695597 Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Mon, 29 Jul 2024 11:32:53 +0200 Subject: [PATCH 6/9] PR fixes --- core/payment/src/dao/payment.rs | 12 ++- core/payment/src/payment_sync.rs | 145 ++++++++++++++----------------- core/payment/src/processor.rs | 10 +-- core/payment/src/utils.rs | 16 ++++ 4 files changed, 89 insertions(+), 94 deletions(-) diff --git a/core/payment/src/dao/payment.rs b/core/payment/src/dao/payment.rs index 816355da91..9c336d1d40 100644 --- a/core/payment/src/dao/payment.rs +++ b/core/payment/src/dao/payment.rs @@ -159,14 +159,12 @@ impl<'c> PaymentDao<'c> { .await } - pub async fn mark_sent(&self, payment_id: String, owner_id: NodeId) -> DbResult<()> { + pub async fn mark_sent(&self, payment_id: String) -> DbResult<()> { do_with_transaction(self.pool, "payment_dao_mark_sent", move |conn| { - diesel::update( - dsl::pay_payment.filter(dsl::id.eq(payment_id).and(dsl::owner_id.eq(owner_id))), - ) - .filter(dsl::role.eq(Role::Requestor)) - .set(dsl::send_payment.eq(false)) - .execute(conn)?; + diesel::update(dsl::pay_payment.filter(dsl::id.eq(payment_id))) + .filter(dsl::role.eq(Role::Requestor)) + .set(dsl::send_payment.eq(false)) + .execute(conn)?; Ok(()) }) .await diff --git a/core/payment/src/payment_sync.rs b/core/payment/src/payment_sync.rs index b18dc00de7..a06102f5cb 100644 --- a/core/payment/src/payment_sync.rs +++ b/core/payment/src/payment_sync.rs @@ -1,10 +1,10 @@ use std::sync::Arc; use std::{collections::HashSet, time::Duration}; -use ya_client_model::payment::Payment; +use crate::utils::remove_allocation_ids_from_payment; use crate::Config; -use chrono::Utc; +use chrono::{DateTime, Utc}; use tokio::sync::Notify; use ya_client_model::{ payment::{Acceptance, InvoiceEventType}, @@ -25,30 +25,15 @@ use ya_core_model::{ }; use ya_net::RemoteEndpoint; use ya_persistence::executor::DbExecutor; -use ya_service_bus::{timeout::IntoTimeoutFuture, typed, Error, RpcEndpoint}; +use ya_service_bus::{timeout::IntoTimeoutFuture, typed, RpcEndpoint}; use crate::dao::{DebitNoteDao, InvoiceDao, InvoiceEventDao, PaymentDao, SyncNotifsDao}; const REMOTE_CALL_TIMEOUT: Duration = Duration::from_secs(30); -fn remove_allocation_ids_from_payment(payment: &Payment) -> Payment { - // We remove allocation ID from syncs because allocations are not transferred to peers and - // their IDs would be unknown to the recipient. - let mut payment = payment.clone(); - for agreement_payment in &mut payment.agreement_payments.iter_mut() { - agreement_payment.allocation_id = None; - } - - for activity_payment in &mut payment.activity_payments.iter_mut() { - activity_payment.allocation_id = None; - } - - payment -} - async fn payment_sync( db: &DbExecutor, - current_node_id: NodeId, + current: NodeId, peer_id: NodeId, ) -> anyhow::Result<(PaymentSync, PaymentSyncWithBytes)> { let payment_dao: PaymentDao = db.as_dao(); @@ -61,25 +46,23 @@ async fn payment_sync( for payment in payment_dao.list_unsent(Some(peer_id)).await? { let platform_components = payment.payment_platform.split('-').collect::>(); let driver = &platform_components[0]; + let bus_id = driver_bus_id(driver); - let payment = remove_allocation_ids_from_payment(&payment); + let payment = remove_allocation_ids_from_payment(payment); - let signature = typed::service(driver_bus_id(driver)) + let signature = typed::service(bus_id.clone()) .send(SignPayment(payment.clone())) .await??; payments.push(SendPayment::new(payment.clone(), signature)); - let signature_canonicalized = typed::service(driver_bus_id(driver)) + let signature_canonicalized = typed::service(bus_id.clone()) .send(SignPaymentCanonicalized(payment.clone())) .await??; payments_canonicalized.push(SendSignedPayment::new(payment, signature_canonicalized)); } let mut invoice_accepts = Vec::default(); - for invoice in invoice_dao - .unsent_accepted(current_node_id, peer_id) - .await? - { + for invoice in invoice_dao.unsent_accepted(current, peer_id).await? { invoice_accepts.push(AcceptInvoice::new( invoice.invoice_id, Acceptance { @@ -91,10 +74,7 @@ async fn payment_sync( } let mut invoice_rejects = Vec::default(); - for invoice in invoice_dao - .unsent_rejected(current_node_id, peer_id) - .await? - { + for invoice in invoice_dao.unsent_rejected(current, peer_id).await? { let events = invoice_event_dao .get_for_invoice_id( invoice.invoice_id.clone(), @@ -118,10 +98,7 @@ async fn payment_sync( } let mut debit_note_accepts = Vec::default(); - for debit_note in debit_note_dao - .unsent_accepted(current_node_id, peer_id) - .await? - { + for debit_note in debit_note_dao.unsent_accepted(current, peer_id).await? { debit_note_accepts.push(AcceptDebitNote::new( debit_note.debit_note_id, Acceptance { @@ -155,7 +132,7 @@ async fn mark_all_sent(db: &DbExecutor, owner_id: NodeId, msg: PaymentSync) -> a for payment_send in msg.payments { payment_dao - .mark_sent(payment_send.payment.payment_id, owner_id) + .mark_sent(payment_send.payment.payment_id) .await?; } @@ -180,8 +157,14 @@ async fn mark_all_sent(db: &DbExecutor, owner_id: NodeId, msg: PaymentSync) -> a Ok(()) } -async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result> { +async fn send_sync_notifs_for_identity( + identity: NodeId, + db: &DbExecutor, + config: &Config, + cutoff: &DateTime, +) -> anyhow::Result> { let dao: SyncNotifsDao = db.as_dao(); + let backoff_config = &config.sync_notif_backoff; let exp_backoff = |n| { @@ -193,63 +176,36 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result>(); for peer in peers_to_notify { - // FIXME: We should iterate over all identities present in the current instance or make - // payment_sync return a mapping identity -> msg and use the returned identity as the - // sender, or store notifying identity in SyncNotifsDao. - // Currently we assume that everything is sent from the default identity. - let (msg, msg_with_bytes) = payment_sync(db, default_identity, peer).await?; - - log::debug!("Sending PaymentSync as [{default_identity}] to [{peer}]."); - let mut result = ya_net::from(default_identity) + let (msg, msg_with_bytes) = payment_sync(db, identity, peer).await?; + + log::debug!("Sending PaymentSync as [{identity}] to [{peer}]."); + let mut result = ya_net::from(identity) .to(peer) .service(ya_core_model::payment::public::BUS_ID) .call(msg_with_bytes.clone()) .await; - log::debug!("Sending PaymentSync as [{default_identity}] to [{peer}] result: {result:?}"); - - // PaymentSyncWithBytes is newer message that won't always be supported, but it contains - // signatures that are crutial for clients that do support this message and rely on them - // for payment verification. - // For this reason we will try to send PaymentSyncWithBytes first and send the older - // PaymentSync only if the new message is not supported. - // - // Manual tests on centralnet show that the following errors are returned: - // if the endpoint is not supported - // Err(RemoteError("/net//payment/PaymentSyncWithBytes", "GSB failure: Bad request: endpoint address not found")) - // if the peer is not available - // Err(RemoteError("/net//payment/PaymentSyncWithBytes", "Bad request: endpoint address not found")) - // We'll Use presence of "GSB failure" message to distinguish them. - // - // We cannot just use any RemoteError or 'Bad request' as an indicator that old message - // should be sent, becaues that could cause sending not signed messages to newer clients in - // case of transient errors. - // TODO: is there any better way to know if the peer is connected but the endpoint is not - // handled? - if matches!(&result, Err(Error::RemoteError(_, e)) if e.contains("GSB failure: Bad request: endpoint address not found")) - { - log::debug!("Sending PaymentSync as [{default_identity}] to [{peer}]: PaymentSyncWithBytes not supported, falling back to PaymentSync."); - result = ya_net::from(default_identity) + log::debug!("Sending PaymentSync as [{identity}] to [{peer}] result: {result:?}"); + + // Centralnet and hybridnet return different errors when the endpoint is not supported, so + // we have to resort to checking error message. + // This message will be sent even if the node can handle PaymentSyncWithBytes but is not + // connected at all, but there is no standard way to differenciate between these cases. + if matches!(&result, Err(e) if e.to_string().contains("endpoint address not found")) { + log::debug!("Sending PaymentSync as [{identity}] to [{peer}]: PaymentSyncWithBytes endpoint not found, falling back to PaymentSync."); + result = ya_net::from(identity) .to(peer) .service(ya_core_model::payment::public::BUS_ID) .call(msg.clone()) @@ -257,8 +213,8 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result anyhow::Result x.to_string(), Ok(Ok(_)) => unreachable!(), }; - log::debug!("Couldn't deliver PaymentSync to [{peer}] as [{default_identity}]: {err}"); + log::debug!("Couldn't deliver PaymentSync to [{peer}] as [{identity}]: {err}"); dao.increment_retry(peer, cutoff.naive_utc()).await?; } } @@ -289,6 +245,35 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result anyhow::Result> { + let cutoff = Utc::now(); + + let identities = typed::service(identity::BUS_ID) + .call(ya_core_model::identity::List {}) + .await??; + + let mut next_sleep_duration: Option = None; + for identity in identities { + if identity.is_locked || identity.deleted { + continue; + } + let sleep_duration = + send_sync_notifs_for_identity(identity.node_id, &db, &config, &cutoff).await?; + next_sleep_duration = match sleep_duration { + None => next_sleep_duration, + Some(duration) => { + let result_duration = match next_sleep_duration { + None => duration, + Some(last_duration) => ::std::cmp::min(duration, last_duration), + }; + Some(result_duration) + } + }; + } + + return Ok(next_sleep_duration); +} + lazy_static::lazy_static! { pub static ref SYNC_NOTIFS_NOTIFY: Notify = Notify::new(); } diff --git a/core/payment/src/processor.rs b/core/payment/src/processor.rs index ac78f145b7..933b0bcd79 100644 --- a/core/payment/src/processor.rs +++ b/core/payment/src/processor.rs @@ -9,6 +9,7 @@ use crate::error::processor::{ use crate::models::order::ReadObj as DbOrder; use crate::payment_sync::SYNC_NOTIFS_NOTIFY; use crate::timeout_lock::{MutexTimeoutExt, RwLockTimeoutExt}; +use crate::utils::remove_allocation_ids_from_payment; use actix_web::web::Data; use bigdecimal::{BigDecimal, Zero}; use chrono::{DateTime, Utc}; @@ -491,12 +492,7 @@ impl PaymentProcessor { } // Allocation IDs are requestor's private matter and should not be sent to provider - for agreement_payment in payment.agreement_payments.iter_mut() { - agreement_payment.allocation_id = None; - } - for activity_payment in payment.activity_payments.iter_mut() { - activity_payment.allocation_id = None; - } + payment = remove_allocation_ids_from_payment(payment); let signature_canonicalized = driver_endpoint(&driver) .send(driver::SignPaymentCanonicalized(payment.clone())) @@ -546,7 +542,7 @@ impl PaymentProcessor { }; if mark_sent { - payment_dao.mark_sent(payment_id, payer_id).await.ok(); + payment_dao.mark_sent(payment_id).await.ok(); } else { sync_dao.upsert(payee_id).await?; SYNC_NOTIFS_NOTIFY.notify_one(); diff --git a/core/payment/src/utils.rs b/core/payment/src/utils.rs index 9f6346c5fb..74da6641ff 100644 --- a/core/payment/src/utils.rs +++ b/core/payment/src/utils.rs @@ -4,6 +4,7 @@ use futures::Future; use serde::{Deserialize, Serialize}; use std::time::Duration; use ya_client_model::market::{Agreement, Role}; +use ya_client_model::payment::Payment; use ya_core_model::market; use ya_service_bus::{typed as bus, RpcEndpoint}; @@ -173,6 +174,21 @@ pub async fn listen_for_events( .unwrap_or(Ok(vec![])) } +pub fn remove_allocation_ids_from_payment(mut payment: Payment) -> Payment { + // We remove allocation ID from syncs because allocations are not transferred to peers and + // their IDs would be unknown to the recipient. + // let mut payment = payment.clone(); + for agreement_payment in &mut payment.agreement_payments.iter_mut() { + agreement_payment.allocation_id = None; + } + + for activity_payment in &mut payment.activity_payments.iter_mut() { + activity_payment.allocation_id = None; + } + + payment +} + pub mod response { use actix_web::HttpResponse; use serde::Serialize; From d30a5ded69092bd61dab10ac0545066290e71cbe Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Mon, 29 Jul 2024 11:36:41 +0200 Subject: [PATCH 7/9] PR fixes 2 --- core/payment/src/payment_sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/payment/src/payment_sync.rs b/core/payment/src/payment_sync.rs index a06102f5cb..af6dcefcd8 100644 --- a/core/payment/src/payment_sync.rs +++ b/core/payment/src/payment_sync.rs @@ -258,7 +258,7 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result next_sleep_duration, Some(duration) => { From 3d853e6ac0354c4c0aa3630a4774c22a31424ab5 Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Mon, 29 Jul 2024 11:45:33 +0200 Subject: [PATCH 8/9] pr fix 3 --- core/payment/src/payment_sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/payment/src/payment_sync.rs b/core/payment/src/payment_sync.rs index af6dcefcd8..29e82dc240 100644 --- a/core/payment/src/payment_sync.rs +++ b/core/payment/src/payment_sync.rs @@ -237,7 +237,7 @@ async fn send_sync_notifs_for_identity( let next_deadline = entry.last_ping + exp_backoff(entry.retries as _); next_deadline.and_utc() }) - .filter(|deadline| deadline > &cutoff) + .filter(|deadline| deadline > cutoff) .min() .map(|ts| ts - cutoff) .and_then(|dur| dur.to_std().ok()); @@ -271,7 +271,7 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result Date: Mon, 29 Jul 2024 12:09:10 +0200 Subject: [PATCH 9/9] pr fix 4 --- core/payment/src/payment_sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/payment/src/payment_sync.rs b/core/payment/src/payment_sync.rs index 29e82dc240..6e5a8f1357 100644 --- a/core/payment/src/payment_sync.rs +++ b/core/payment/src/payment_sync.rs @@ -254,7 +254,7 @@ async fn send_sync_notifs(db: &DbExecutor, config: &Config) -> anyhow::Result = None; for identity in identities { - if identity.is_locked || identity.deleted { + if identity.is_locked { continue; } let sleep_duration =