Skip to content

Commit

Permalink
Merge pull request #2364 from mickvandijke/feat-autonomi-prepaid-uploads
Browse files Browse the repository at this point in the history
feat(autonomi): support prepaid put operations
  • Loading branch information
grumbach authored Oct 31, 2024
2 parents a2cc3e1 + 685c881 commit 374e84b
Show file tree
Hide file tree
Showing 23 changed files with 506 additions and 186 deletions.
4 changes: 2 additions & 2 deletions autonomi-cli/src/commands/vault.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn create(peers: Vec<Multiaddr>) -> Result<()> {

println!("Pushing to network vault...");
let total_cost = client
.put_user_data_to_vault(&vault_sk, &wallet, local_user_data)
.put_user_data_to_vault(&vault_sk, wallet.into(), local_user_data)
.await?;

if total_cost.is_zero() {
Expand Down Expand Up @@ -82,7 +82,7 @@ pub async fn sync(peers: Vec<Multiaddr>, force: bool) -> Result<()> {
let private_file_archives_len = local_user_data.private_file_archives.len();
let registers_len = local_user_data.registers.len();
client
.put_user_data_to_vault(&vault_sk, &wallet, local_user_data)
.put_user_data_to_vault(&vault_sk, wallet.into(), local_user_data)
.await?;

println!("✅ Successfully synced vault");
Expand Down
2 changes: 1 addition & 1 deletion autonomi/src/client/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Client {
let bytes = archive
.into_bytes()
.map_err(|e| PutError::Serialization(format!("Failed to serialize archive: {e:?}")))?;
self.data_put(bytes, wallet).await
self.data_put(bytes, wallet.into()).await
}

/// Get the cost to upload an archive
Expand Down
6 changes: 3 additions & 3 deletions autonomi/src/client/archive_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use super::{
data_private::PrivateDataAccess,
Client,
};
use crate::client::payment::PaymentOption;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use sn_evm::EvmWallet;

/// The address of a private archive
/// Contains the [`PrivateDataAccess`] leading to the [`PrivateArchive`] data
Expand Down Expand Up @@ -130,11 +130,11 @@ impl Client {
pub async fn private_archive_put(
&self,
archive: PrivateArchive,
wallet: &EvmWallet,
payment_option: PaymentOption,
) -> Result<PrivateArchiveAccess, PutError> {
let bytes = archive
.into_bytes()
.map_err(|e| PutError::Serialization(format!("Failed to serialize archive: {e:?}")))?;
self.private_data_put(bytes, wallet).await
self.private_data_put(bytes, payment_option).await
}
}
17 changes: 11 additions & 6 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use std::collections::HashSet;
use std::sync::LazyLock;
use xor_name::XorName;

use crate::client::payment::PaymentOption;
use crate::client::utils::process_tasks_with_max_concurrency;
use crate::client::{ClientEvent, UploadSummary};
use crate::{self_encryption::encrypt, Client};
use sn_evm::EvmWalletError;
use sn_evm::{Amount, AttoTokens};
use sn_evm::{EvmWallet, EvmWalletError};
use sn_networking::{GetRecordCfg, NetworkError};
use sn_protocol::{
storage::{try_deserialize_record, Chunk, ChunkAddress, RecordHeader, RecordKind},
Expand Down Expand Up @@ -136,7 +137,11 @@ impl Client {
/// Upload a piece of data to the network.
/// Returns the Data Address at which the data was stored.
/// This data is publicly accessible.
pub async fn data_put(&self, data: Bytes, wallet: &EvmWallet) -> Result<DataAddr, PutError> {
pub async fn data_put(
&self,
data: Bytes,
payment_option: PaymentOption,
) -> Result<DataAddr, PutError> {
let now = sn_networking::target_arch::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;
let data_map_addr = data_map_chunk.address();
Expand All @@ -152,8 +157,8 @@ impl Client {

// Pay for all chunks + data map chunk
info!("Paying for {} addresses", xor_names.len());
let (payment_proofs, _free_chunks) = self
.pay(xor_names.into_iter(), wallet)
let receipt = self
.pay_for_content_addrs(xor_names.into_iter(), payment_option)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

Expand All @@ -163,7 +168,7 @@ impl Client {
for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
if let Some(proof) = receipt.get(chunk.name()) {
let proof_clone = proof.clone();
upload_tasks.push(async move {
self_clone
Expand Down Expand Up @@ -191,7 +196,7 @@ impl Client {

// Reporting
if let Some(channel) = self.client_event_sender.as_ref() {
let tokens_spent = payment_proofs
let tokens_spent = receipt
.values()
.map(|proof| proof.quote.cost.as_atto())
.sum::<Amount>();
Expand Down
13 changes: 7 additions & 6 deletions autonomi/src/client/data_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ use std::hash::{DefaultHasher, Hash, Hasher};

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use sn_evm::{Amount, EvmWallet};
use sn_evm::Amount;
use sn_protocol::storage::Chunk;

use super::data::CHUNK_UPLOAD_BATCH_SIZE;
use super::data::{GetError, PutError};
use crate::client::payment::PaymentOption;
use crate::client::utils::process_tasks_with_max_concurrency;
use crate::client::{ClientEvent, UploadSummary};
use crate::{self_encryption::encrypt, Client};
Expand Down Expand Up @@ -64,7 +65,7 @@ impl Client {
pub async fn private_data_put(
&self,
data: Bytes,
wallet: &EvmWallet,
payment_option: PaymentOption,
) -> Result<PrivateDataAccess, PutError> {
let now = sn_networking::target_arch::Instant::now();
let (data_map_chunk, chunks) = encrypt(data)?;
Expand All @@ -73,8 +74,8 @@ impl Client {
// Pay for all chunks
let xor_names: Vec<_> = chunks.iter().map(|chunk| *chunk.name()).collect();
info!("Paying for {} addresses", xor_names.len());
let (payment_proofs, _free_chunks) = self
.pay(xor_names.into_iter(), wallet)
let receipt = self
.pay_for_content_addrs(xor_names.into_iter(), payment_option)
.await
.inspect_err(|err| error!("Error paying for data: {err:?}"))?;

Expand All @@ -84,7 +85,7 @@ impl Client {
for chunk in chunks {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
if let Some(proof) = receipt.get(chunk.name()) {
let proof_clone = proof.clone();
upload_tasks.push(async move {
self_clone
Expand Down Expand Up @@ -112,7 +113,7 @@ impl Client {

// Reporting
if let Some(channel) = self.client_event_sender.as_ref() {
let tokens_spent = payment_proofs
let tokens_spent = receipt
.values()
.map(|proof| proof.quote.cost.as_atto())
.sum::<Amount>();
Expand Down
72 changes: 9 additions & 63 deletions autonomi/src/client/external_signer.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,23 @@
use crate::client::data::{DataAddr, PutError};
use crate::client::data::PutError;
use crate::client::utils::extract_quote_payments;
use crate::self_encryption::encrypt;
use crate::Client;
use bytes::Bytes;
use sn_evm::{PaymentQuote, ProofOfPayment, QuotePayment};
use sn_evm::{PaymentQuote, QuotePayment};
use sn_protocol::storage::Chunk;
use std::collections::HashMap;
use xor_name::XorName;

use crate::utils::cost_map_to_quotes;
#[allow(unused_imports)]
pub use sn_evm::external_signer::*;

impl Client {
/// Upload a piece of data to the network. This data will be self-encrypted.
/// Payment will not be done automatically as opposed to the regular `data_put`, so the proof of payment has to be provided.
/// Returns the Data Address at which the data was stored.
pub async fn data_put_with_proof_of_payment(
&self,
data: Bytes,
proof: HashMap<XorName, ProofOfPayment>,
) -> Result<DataAddr, PutError> {
let (data_map_chunk, chunks, _) = encrypt_data(data)?;
self.upload_data_map(&proof, &data_map_chunk).await?;
self.upload_chunks(&chunks, &proof).await?;
Ok(*data_map_chunk.address().xorname())
}

/// Get quotes for data.
/// Returns a cost map, data payments to be executed and a list of free (already paid for) chunks.
pub async fn get_quotes_for_data(
pub async fn get_quotes_for_content_addresses(
&self,
data: Bytes,
content_addrs: impl Iterator<Item = XorName>,
) -> Result<
(
HashMap<XorName, PaymentQuote>,
Expand All @@ -39,59 +26,18 @@ impl Client {
),
PutError,
> {
// Encrypt the data as chunks
let (_data_map_chunk, _chunks, xor_names) = encrypt_data(data)?;

let cost_map: HashMap<XorName, PaymentQuote> = self
.get_store_quotes(xor_names.into_iter())
.await?
.into_iter()
.map(|(name, (_, _, q))| (name, q))
.collect();

let cost_map = self.get_store_quotes(content_addrs).await?;
let (quote_payments, free_chunks) = extract_quote_payments(&cost_map);
Ok((cost_map, quote_payments, free_chunks))
}
let quotes = cost_map_to_quotes(cost_map);

async fn upload_data_map(
&self,
payment_proofs: &HashMap<XorName, ProofOfPayment>,
data_map_chunk: &Chunk,
) -> Result<(), PutError> {
let map_xor_name = data_map_chunk.name();

if let Some(proof) = payment_proofs.get(map_xor_name) {
debug!("Uploading data map chunk: {map_xor_name:?}");
self.chunk_upload_with_payment(data_map_chunk.clone(), proof.clone())
.await
.inspect_err(|err| error!("Error uploading data map chunk: {err:?}"))
} else {
Ok(())
}
}

async fn upload_chunks(
&self,
chunks: &[Chunk],
payment_proofs: &HashMap<XorName, ProofOfPayment>,
) -> Result<(), PutError> {
debug!("Uploading {} chunks", chunks.len());
for chunk in chunks {
if let Some(proof) = payment_proofs.get(chunk.name()) {
let address = *chunk.address();
self.chunk_upload_with_payment(chunk.clone(), proof.clone())
.await
.inspect_err(|err| error!("Error uploading chunk {address:?} :{err:?}"))?;
}
}
Ok(())
Ok((quotes, quote_payments, free_chunks))
}
}

/// Encrypts data as chunks.
///
/// Returns the data map chunk, file chunks and a list of all content addresses including the data map.
fn encrypt_data(data: Bytes) -> Result<(Chunk, Vec<Chunk>, Vec<XorName>), PutError> {
pub fn encrypt_data(data: Bytes) -> Result<(Chunk, Vec<Chunk>, Vec<XorName>), PutError> {
let now = sn_networking::target_arch::Instant::now();
let result = encrypt(data)?;

Expand Down
4 changes: 2 additions & 2 deletions autonomi/src/client/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Client {

// upload archive
let archive_serialized = archive.into_bytes()?;
let arch_addr = self.data_put(archive_serialized, wallet).await?;
let arch_addr = self.data_put(archive_serialized, wallet.into()).await?;

info!("Complete archive upload completed in {:?}", start.elapsed());
#[cfg(feature = "loud")]
Expand All @@ -175,7 +175,7 @@ impl Client {

let data = tokio::fs::read(path).await?;
let data = Bytes::from(data);
let addr = self.data_put(data, wallet).await?;
let addr = self.data_put(data, wallet.into()).await?;
Ok(addr)
}

Expand Down
6 changes: 4 additions & 2 deletions autonomi/src/client/fs_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ impl Client {

// upload archive
let archive_serialized = archive.into_bytes()?;
let arch_addr = self.private_data_put(archive_serialized, wallet).await?;
let arch_addr = self
.private_data_put(archive_serialized, wallet.into())
.await?;

info!(
"Complete private archive upload completed in {:?}",
Expand All @@ -126,7 +128,7 @@ impl Client {

let data = tokio::fs::read(path).await?;
let data = Bytes::from(data);
let addr = self.private_data_put(data, wallet).await?;
let addr = self.private_data_put(data, wallet.into()).await?;
Ok(addr)
}
}
1 change: 1 addition & 0 deletions autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

pub mod address;
pub mod payment;

#[cfg(feature = "data")]
pub mod archive;
Expand Down
49 changes: 49 additions & 0 deletions autonomi/src/client/payment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::client::data::PayError;
use crate::Client;
use sn_evm::{EvmWallet, ProofOfPayment};
use std::collections::HashMap;
use xor_name::XorName;

/// Contains the proof of payment for XOR addresses.
pub type Receipt = HashMap<XorName, ProofOfPayment>;

/// Payment options for data payments.
#[derive(Clone)]
pub enum PaymentOption {
Wallet(EvmWallet),
Receipt(Receipt),
}

impl From<EvmWallet> for PaymentOption {
fn from(value: EvmWallet) -> Self {
PaymentOption::Wallet(value)
}
}

impl From<&EvmWallet> for PaymentOption {
fn from(value: &EvmWallet) -> Self {
PaymentOption::Wallet(value.clone())
}
}

impl From<Receipt> for PaymentOption {
fn from(value: Receipt) -> Self {
PaymentOption::Receipt(value)
}
}

impl Client {
pub(crate) async fn pay_for_content_addrs(
&self,
content_addrs: impl Iterator<Item = XorName>,
payment_option: PaymentOption,
) -> Result<Receipt, PayError> {
match payment_option {
PaymentOption::Wallet(wallet) => {
let (receipt, _) = self.pay(content_addrs, &wallet).await?;
Ok(receipt)
}
PaymentOption::Receipt(receipt) => Ok(receipt),
}
}
}
Loading

0 comments on commit 374e84b

Please sign in to comment.