Skip to content

Commit

Permalink
Merge pull request #2522 from grumbach/feat-transaction-client-api-at…
Browse files Browse the repository at this point in the history
…op-new-quoting

Transaction fixes and Client API (atop new quoting)
  • Loading branch information
jacderida authored Dec 12, 2024
2 parents 7f28a5a + 343a22d commit 2ea9e13
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 98 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/benchmark-prs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ jobs:
###########################
### Client Mem Analysis ###
###########################
### The peak limit shall be restored back to 50MB,
### Once client side chunking/quoting flow got re-examined.

- name: Check client memory usage
shell: bash
run: |
client_peak_mem_limit_mb="1024" # mb
client_peak_mem_limit_mb="1500" # mb
client_avg_mem_limit_mb="512" # mb
peak_mem_usage=$(
Expand Down
46 changes: 1 addition & 45 deletions ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,50 +1063,6 @@ impl Network {
send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
}

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// If `client` is false, then include `self` among the `closest_peers`
pub async fn get_close_group_closest_peers(
&self,
key: &NetworkAddress,
client: bool,
) -> Result<Vec<PeerId>> {
debug!("Getting the closest peers to {key:?}");
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
key: key.clone(),
sender,
});
let k_bucket_peers = receiver.await?;

// Count self in if among the CLOSE_GROUP_SIZE closest and sort the result
let result_len = k_bucket_peers.len();
let mut closest_peers = k_bucket_peers;
// ensure we're not including self here
if client {
// remove our peer id from the calculations here:
closest_peers.retain(|&x| x != self.peer_id());
if result_len != closest_peers.len() {
info!("Remove self client from the closest_peers");
}
}
if tracing::level_enabled!(tracing::Level::DEBUG) {
let close_peers_pretty_print: Vec<_> = closest_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
)
})
.collect();

debug!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}");
}

let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?;
Ok(closest_peers.into_iter().cloned().collect())
}

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// If `client` is false, then include `self` among the `closest_peers`
///
Expand Down Expand Up @@ -1155,7 +1111,7 @@ impl Network {
);
}

let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?;
let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE + 1)?;
Ok(closest_peers.into_iter().cloned().collect())
}

Expand Down
2 changes: 1 addition & 1 deletion ant-node/src/log_markers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub enum Marker<'a> {
/// Valid paid to us and royalty paid register stored
ValidPaidRegisterPutFromClient(&'a PrettyPrintRecordKey<'a>),
/// Valid transaction stored
ValidSpendPutFromClient(&'a PrettyPrintRecordKey<'a>),
ValidTransactionPutFromClient(&'a PrettyPrintRecordKey<'a>),
/// Valid scratchpad stored
ValidScratchpadRecordPutFromClient(&'a PrettyPrintRecordKey<'a>),

Expand Down
49 changes: 20 additions & 29 deletions ant-node/src/put_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use std::collections::BTreeSet;

use crate::{node::Node, Error, Marker, Result};
use ant_evm::payment_vault::verify_data_payment;
use ant_evm::{AttoTokens, ProofOfPayment};
Expand Down Expand Up @@ -162,29 +164,11 @@ impl Node {
.await
}
RecordKind::Transaction => {
let record_key = record.key.clone();
let value_to_hash = record.value.clone();
let transactions = try_deserialize_record::<Vec<Transaction>>(&record)?;
let result = self
.validate_merge_and_store_transactions(transactions, &record_key)
.await;
if result.is_ok() {
Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log();
let content_hash = XorName::from_content(&value_to_hash);
self.replicate_valid_fresh_record(
record_key,
RecordType::NonChunk(content_hash),
);

// Notify replication_fetcher to mark the attempt as completed.
// Send the notification earlier to avoid it got skipped due to:
// the record becomes stored during the fetch because of other interleaved process.
self.network().notify_fetch_completed(
record.key.clone(),
RecordType::NonChunk(content_hash),
);
}
result
// Transactions should always be paid for
error!("Transaction should not be validated at this point");
Err(Error::InvalidPutWithoutPayment(
PrettyPrintRecordKey::from(&record.key).into_owned(),
))
}
RecordKind::TransactionWithPayment => {
let (payment, transaction) =
Expand Down Expand Up @@ -224,6 +208,12 @@ impl Node {
.await;
if res.is_ok() {
let content_hash = XorName::from_content(&record.value);
Marker::ValidTransactionPutFromClient(&PrettyPrintRecordKey::from(&record.key))
.log();
self.replicate_valid_fresh_record(
record.key.clone(),
RecordType::NonChunk(content_hash),
);

// Notify replication_fetcher to mark the attempt as completed.
// Send the notification earlier to avoid it got skipped due to:
Expand Down Expand Up @@ -601,23 +591,24 @@ impl Node {
}

// verify the transactions
let mut validated_transactions: Vec<Transaction> = transactions_for_key
let mut validated_transactions: BTreeSet<Transaction> = transactions_for_key
.into_iter()
.filter(|t| t.verify())
.collect();

// skip if none are valid
let addr = match validated_transactions.as_slice() {
[] => {
let addr = match validated_transactions.first() {
None => {
warn!("Found no validated transactions to store at {pretty_key:?}");
return Ok(());
}
[t, ..] => t.address(),
Some(t) => t.address(),
};

// add local transactions to the validated transactions
// add local transactions to the validated transactions, turn to Vec
let local_txs = self.get_local_transactions(addr).await?;
validated_transactions.extend(local_txs);
validated_transactions.extend(local_txs.into_iter());
let validated_transactions: Vec<Transaction> = validated_transactions.into_iter().collect();

// store the record into the local storage
let record = Record {
Expand Down
49 changes: 39 additions & 10 deletions ant-protocol/src/storage/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::address::TransactionAddress;
use bls::SecretKey;
use serde::{Deserialize, Serialize};

// re-exports
Expand All @@ -27,13 +28,15 @@ pub struct Transaction {
}

impl Transaction {
/// Create a new transaction, signing it with the provided secret key.
pub fn new(
owner: PublicKey,
parents: Vec<PublicKey>,
content: TransactionContent,
outputs: Vec<(PublicKey, TransactionContent)>,
signature: Signature,
signing_key: &SecretKey,
) -> Self {
let signature = signing_key.sign(Self::bytes_to_sign(&owner, &parents, &content, &outputs));
Self {
owner,
parents,
Expand All @@ -43,35 +46,61 @@ impl Transaction {
}
}

pub fn address(&self) -> TransactionAddress {
TransactionAddress::from_owner(self.owner)
/// Create a new transaction, with the signature already calculated.
pub fn new_with_signature(
owner: PublicKey,
parents: Vec<PublicKey>,
content: TransactionContent,
outputs: Vec<(PublicKey, TransactionContent)>,
signature: Signature,
) -> Self {
Self {
owner,
parents,
content,
outputs,
signature,
}
}

pub fn bytes_for_signature(&self) -> Vec<u8> {
/// Get the bytes that the signature is calculated from.
pub fn bytes_to_sign(
owner: &PublicKey,
parents: &[PublicKey],
content: &[u8],
outputs: &[(PublicKey, TransactionContent)],
) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(&self.owner.to_bytes());
bytes.extend_from_slice(&owner.to_bytes());
bytes.extend_from_slice("parent".as_bytes());
bytes.extend_from_slice(
&self
.parents
&parents
.iter()
.map(|p| p.to_bytes())
.collect::<Vec<_>>()
.concat(),
);
bytes.extend_from_slice("content".as_bytes());
bytes.extend_from_slice(&self.content);
bytes.extend_from_slice(content);
bytes.extend_from_slice("outputs".as_bytes());
bytes.extend_from_slice(
&self
.outputs
&outputs
.iter()
.flat_map(|(p, c)| [&p.to_bytes(), c.as_slice()].concat())
.collect::<Vec<_>>(),
);
bytes
}

pub fn address(&self) -> TransactionAddress {
TransactionAddress::from_owner(self.owner)
}

/// Get the bytes that the signature is calculated from.
pub fn bytes_for_signature(&self) -> Vec<u8> {
Self::bytes_to_sign(&self.owner, &self.parents, &self.content, &self.outputs)
}

pub fn verify(&self) -> bool {
self.owner
.verify(&self.signature, self.bytes_for_signature())
Expand Down
4 changes: 3 additions & 1 deletion autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ pub mod payment;
pub mod quote;

pub mod data;
pub mod files;
pub mod transactions;

#[cfg(feature = "external-signer")]
#[cfg_attr(docsrs, doc(cfg(feature = "external-signer")))]
pub mod external_signer;
pub mod files;
#[cfg(feature = "registers")]
#[cfg_attr(docsrs, doc(cfg(feature = "registers")))]
pub mod registers;
Expand Down
13 changes: 2 additions & 11 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl Client {
name: String,
owner: RegisterSecretKey,
) -> Result<AttoTokens, RegisterError> {
info!("Getting cost for register with name: {name}");
trace!("Getting cost for register with name: {name}");
// get register address
let pk = owner.public_key();
let name = XorName::from_content_parts(&[name.as_bytes()]);
Expand Down Expand Up @@ -321,15 +321,6 @@ impl Client {
};

let payees = proof.payees();

if payees.is_empty() {
error!(
"Failed to get payees from payment proof: {:?}",
RegisterError::PayeesMissing
);
return Err(RegisterError::PayeesMissing);
}

let signed_register = register.signed_reg.clone();

let record = Record {
Expand All @@ -356,7 +347,7 @@ impl Client {
put_quorum: Quorum::All,
retry_strategy: None,
use_put_record_to: Some(payees),
verification: Some((VerificationKind::Network, get_cfg)),
verification: Some((VerificationKind::Crdt, get_cfg)),
};

debug!("Storing register at address {address} to the network");
Expand Down
Loading

0 comments on commit 2ea9e13

Please sign in to comment.