Skip to content

Commit

Permalink
Merge pull request #3278 from golemfactory/mw/configurable-sync-notif…
Browse files Browse the repository at this point in the history
…s-backoff

feat: Configurable sync_notifs backoff
  • Loading branch information
nieznanysprawiciel authored Jul 29, 2024
2 parents 09ee56f + a084d67 commit 83d4600
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 72 deletions.
12 changes: 9 additions & 3 deletions core/payment/examples/payment_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use ya_core_model::driver::{driver_bus_id, AccountMode, Fund, Init};
use ya_core_model::identity;
use ya_dummy_driver as dummy;
use ya_erc20_driver as erc20;
use ya_net::Config;
use ya_net::Config as NetConfig;
use ya_payment::processor::PaymentProcessor;
use ya_payment::Config as PaymentConfig;
use ya_payment::{migrations, utils, PaymentService};
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::auth::dummy::DummyAuth;
Expand Down Expand Up @@ -230,7 +231,12 @@ async fn main() -> anyhow::Result<()> {
log::debug!("bind_gsb_router()");

let processor = Arc::new(PaymentProcessor::new(db.clone()));
ya_payment::service::bind_service(&db, processor, BindOptions::default().run_sync_job(false));
ya_payment::service::bind_service(
&db,
processor,
BindOptions::default().run_sync_job(false),
Arc::new(PaymentConfig::from_env()?),
);
log::debug!("bind_service()");

bus::bind(identity::BUS_ID, {
Expand Down Expand Up @@ -342,7 +348,7 @@ async fn main() -> anyhow::Result<()> {
log::info!("bind remote...");

ya_net::hybrid::start_network(
Arc::new(Config::from_env()?),
Arc::new(NetConfig::from_env()?),
provider_id,
vec![provider_id, requestor_id],
)
Expand Down
2 changes: 1 addition & 1 deletion core/payment/src/api/debit_notes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ async fn accept_debit_note(
response?;
} else {
log::debug!("AcceptDebitNote not delivered");
sync_dao.upsert(node_id).await?;
sync_dao.upsert(issuer_id).await?;
SYNC_NOTIFS_NOTIFY.notify_one();
}

Expand Down
41 changes: 41 additions & 0 deletions core/payment/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use structopt::*;

#[derive(StructOpt, Clone)]
pub struct Config {
#[structopt(flatten)]
pub sync_notif_backoff: SyncNotifBackoffConfig,
}

#[derive(StructOpt, Clone)]
pub struct SyncNotifBackoffConfig {
#[structopt(long, env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_INITIAL_DELAY", parse(try_from_str = humantime::parse_duration), default_value = "30s")]
pub initial_delay: std::time::Duration,

#[structopt(
long,
env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_EXPONENT",
default_value = "6"
)]
pub exponent: f64,

#[structopt(
long,
env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_MAX_RETRIES",
default_value = "7"
)]
pub max_retries: u32,

#[structopt(long, env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_CAP", parse(try_from_str = humantime::parse_duration))]
pub cap: Option<std::time::Duration>,

#[structopt(long, env = "YA_PAYMENT_SYNC_NOTIF_BACKOFF_ERROR_DELAY", parse(try_from_str = humantime::parse_duration), default_value = "10m")]
pub error_delay: std::time::Duration,
}

impl Config {
pub fn from_env() -> Result<Config, structopt::clap::Error> {
// Empty command line arguments, because we want to use ENV fallback
// or default values if ENV variables are not set.
Config::from_iter_safe(&[""])
}
}
7 changes: 6 additions & 1 deletion core/payment/src/dao/debit_note.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,17 @@ impl<'c> DebitNoteDao<'c> {
}

/// Lists debit notes with send_accept
pub async fn unsent_accepted(&self, owner_id: NodeId) -> DbResult<Vec<DebitNote>> {
pub async fn unsent_accepted(
&self,
owner_id: NodeId,
peer_id: NodeId,
) -> DbResult<Vec<DebitNote>> {
readonly_transaction(self.pool, "debit_note_unsent_accepted", move |conn| {
let read: Vec<ReadObj> = query!()
.filter(dsl::owner_id.eq(owner_id))
.filter(dsl::send_accept.eq(true))
.filter(dsl::status.eq(DocumentStatus::Accepted.to_string()))
.filter(agreement_dsl::peer_id.eq(peer_id))
.order_by(dsl::timestamp.desc())
.load(conn)?;
let mut debit_notes = Vec::new();
Expand Down
14 changes: 12 additions & 2 deletions core/payment/src/dao/invoice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,17 @@ impl<'c> InvoiceDao<'c> {
}

/// Lists invoices with send_accept
pub async fn unsent_accepted(&self, owner_id: NodeId) -> DbResult<Vec<Invoice>> {
pub async fn unsent_accepted(
&self,
owner_id: NodeId,
peer_id: NodeId,
) -> DbResult<Vec<Invoice>> {
readonly_transaction(self.pool, "invoice_dao_unsent_accepted", move |conn| {
let invoices: Vec<ReadObj> = query!()
.filter(dsl::owner_id.eq(owner_id))
.filter(dsl::send_accept.eq(true))
.filter(dsl::status.eq(DocumentStatus::Accepted.to_string()))
.filter(agreement_dsl::peer_id.eq(peer_id))
.load(conn)?;

let activities = activity_dsl::pay_invoice_x_activity
Expand Down Expand Up @@ -459,12 +464,17 @@ impl<'c> InvoiceDao<'c> {
.await
}

pub async fn unsent_rejected(&self, owner_id: NodeId) -> DbResult<Vec<Invoice>> {
pub async fn unsent_rejected(
&self,
owner_id: NodeId,
peer_id: NodeId,
) -> DbResult<Vec<Invoice>> {
readonly_transaction(self.pool, "unsent_rejected", move |conn| {
let invoices: Vec<ReadObj> = query!()
.filter(dsl::owner_id.eq(owner_id))
.filter(dsl::send_reject.eq(true))
.filter(dsl::status.eq(DocumentStatus::Rejected.to_string()))
.filter(agreement_dsl::peer_id.eq(peer_id))
.load(conn)?;

let activities = activity_dsl::pay_invoice_x_activity
Expand Down
6 changes: 5 additions & 1 deletion core/payment/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code)] // Crate under development
#![allow(unused_variables)] // Crate under development
pub use crate::config::Config;
use crate::processor::PaymentProcessor;
use futures::FutureExt;
use service::BindOptions;
Expand All @@ -15,6 +16,7 @@ extern crate diesel;
pub mod accounts;
pub mod api;
mod cli;
pub mod config;
pub mod dao;
pub mod error;
pub mod models;
Expand Down Expand Up @@ -53,8 +55,10 @@ impl PaymentService {
let db = context.component();
db.apply_migration(migrations::run_with_output)?;

let config = Arc::new(Config::from_env()?);

let processor = Arc::new(PaymentProcessor::new(db.clone()));
self::service::bind_service(&db, processor.clone(), BindOptions::default());
self::service::bind_service(&db, processor.clone(), BindOptions::default(), config);

tokio::task::spawn(async move {
processor.release_allocations(false).await;
Expand Down
Loading

0 comments on commit 83d4600

Please sign in to comment.