Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve synchronization infrastructure #64

Merged
merged 8 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions manta-accounting/src/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,22 +859,39 @@ macro_rules! impl_asset_map_for_maps_body {

#[inline]
fn select(&self, asset: Asset) -> Selection<Self> {
// TODO: Use a smarter coin-selection algorithm (max-heap?).
if asset.is_zero() {
return Selection::default();
}
let mut sum = Asset::zero(asset.id);
let mut values = Vec::new();
for (key, assets) in self {
for item in assets {
if item.value != AssetValue(0) && sum.try_add_assign(*item) {
values.push((key.clone(), item.value));
if sum.value >= asset.value {
break;
}
let mut min_max_asset: Option<($k, AssetValue)> = None;
let map = self
.iter()
.map(|(key, assets)| assets.iter().map(move |asset| (key, asset)))
.flatten()
.filter_map(|(key, item)| {
if !item.is_zero() && item.id == asset.id {
Some((key, item.value))
} else {
None
}
});
for (key, value) in map {
if value > asset.value {
min_max_asset = Some(match min_max_asset.take() {
Some(best) if value >= best.1 => best,
_ => (key.clone(), value),
});
} else if value == asset.value {
return Selection::new(Default::default(), vec![(key.clone(), value)]);
} else {
sum.add_assign(value);
values.push((key.clone(), value));
}
}
if let Some((best_key, best_value)) = min_max_asset {
return Selection::new(best_value - asset.value, vec![(best_key, best_value)]);
}
if sum.value < asset.value {
Selection::default()
} else {
Expand Down
168 changes: 71 additions & 97 deletions manta-accounting/src/wallet/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,136 +16,110 @@

//! Ledger Connection

use crate::transfer::{Configuration, EncryptedNote, TransferPost, Utxo, VoidNumber};
use alloc::vec::Vec;
use core::{fmt::Debug, hash::Hash};
use manta_util::future::LocalBoxFutureResult;

#[cfg(feature = "serde")]
use manta_util::serde::{Deserialize, Serialize};

/// Ledger Connection
///
/// This is the base `trait` for defining a connection with a ledger. To communicate with the
/// ledger, you can establish such a connection first and then interact via the [`Read`] and
/// [`Write`] traits which send messages along the connection.
pub trait Connection {
/// Error Type
///
/// This error type corresponds to the communication channel setup by the [`Connection`] rather
/// than any errors introduced by [`read`] or [`write`] methods. Instead those methods should
/// return errors in their `Response` types.
///
/// [`read`]: Read::read
/// [`write`]: Write::write
type Error;
}

/// Ledger Checkpoint
///
/// The checkpoint type is responsible for keeping the ledger, signer, and wallet in sync with each
/// other making sure that they all have the same view of the ledger state. Checkpoints should
/// be orderable with a bottom element returned by [`Default::default`].
pub trait Checkpoint: Default + PartialOrd {
/// Returns the index into the receiver set for the ledger.
fn receiver_index(&self) -> usize;
/// be orderable with a bottom element returned by [`Default::default`]. Types implementing this
/// `trait` must also implement [`Clone`] as it must be safe (but not necessarily efficient) to
/// copy a checkpoint value.
pub trait Checkpoint: Clone + Default + PartialOrd {}

/// Returns the index into the sender set for the ledger.
fn sender_index(&self) -> usize;
}

/// Ledger Pull Configuration
pub trait PullConfiguration<C>
/// Ledger Data Pruning
pub trait Prune<T>
where
C: Configuration,
T: Checkpoint,
{
/// Ledger State Checkpoint Type
type Checkpoint: Checkpoint;

/// Receiver Chunk Iterator Type
type ReceiverChunk: IntoIterator<Item = (Utxo<C>, EncryptedNote<C>)>;

/// Sender Chunk Iterator Type
type SenderChunk: IntoIterator<Item = VoidNumber<C>>;
/// Prunes the data in `self`, which was retrieved at `origin`, so that it meets the current
/// `checkpoint`, dropping data that is older than the given `checkpoint`. This method should
/// return `true` if it dropped data from `self`.
fn prune(&mut self, origin: &T, checkpoint: &T) -> bool;
}

/// Ledger Source Connection
pub trait Connection<C>: PullConfiguration<C>
where
C: Configuration,
{
/// Push Response Type
///
/// This is the return type of the [`push`](Self::push) method. Use this type to customize the
/// ledger's response to posting a set of transactions, valid or otherwise. In most cases `bool`
/// or some result type like `Result<(), Error>` is sufficient. In other cases where the ledger
/// cannot respond immediately to the [`push`](Self::push) command, a subscription token can be
/// returned instead which can be used to listen to the result later on.
type PushResponse;

/// Error Type
///
/// This error type corresponds to the communication channel itself setup by the [`Connection`]
/// rather than any errors introduced by the [`pull`](Self::pull) or [`push`](Self::push)
/// methods themselves which would correspond to an empty [`PullResponse`] or whatever error
/// variants are stored in [`PushResponse`](Self::PushResponse).
type Error;
/// Ledger Connection Reading
pub trait Read<D>: Connection {
/// Checkpoint Type
type Checkpoint: Checkpoint;

/// Pulls receiver data from the ledger starting from `checkpoint`, returning the current
/// [`Checkpoint`](PullConfiguration::Checkpoint).
fn pull<'s>(
/// Gets data from the ledger starting from `checkpoint`, returning the current
/// [`Checkpoint`](Self::Checkpoint).
fn read<'s>(
&'s mut self,
checkpoint: &'s Self::Checkpoint,
) -> LocalBoxFutureResult<'s, PullResponse<C, Self>, Self::Error>;

/// Sends `posts` to the ledger, returning `true` or `false` depending on whether the entire
/// batch succeeded or not.
fn push(
&mut self,
posts: Vec<TransferPost<C>>,
) -> LocalBoxFutureResult<Self::PushResponse, Self::Error>;
) -> LocalBoxFutureResult<'s, ReadResponse<Self::Checkpoint, D>, Self::Error>;
}

/// Ledger Source Pull Response
/// Ledger Connection Read Response
///
/// This `struct` is created by the [`pull`](Connection::pull) method on [`Connection`].
/// This `struct` is created by the [`read`](Read::read) method on [`Read`].
/// See its documentation for more.
#[cfg_attr(
feature = "serde",
derive(Deserialize, Serialize),
serde(
bound(
deserialize = r"
L::Checkpoint: Deserialize<'de>,
L::ReceiverChunk: Deserialize<'de>,
L::SenderChunk: Deserialize<'de>
",
serialize = r"
L::Checkpoint: Serialize,
L::ReceiverChunk: Serialize,
L::SenderChunk: Serialize
",
),
crate = "manta_util::serde",
deny_unknown_fields
)
)]
#[derive(derivative::Derivative)]
#[derivative(
Clone(bound = "L::Checkpoint: Clone, L::ReceiverChunk: Clone, L::SenderChunk: Clone"),
Copy(bound = "L::Checkpoint: Copy, L::ReceiverChunk: Copy, L::SenderChunk: Copy"),
Debug(bound = "L::Checkpoint: Debug, L::ReceiverChunk: Debug, L::SenderChunk: Debug"),
Default(bound = "L::Checkpoint: Default, L::ReceiverChunk: Default, L::SenderChunk: Default"),
Eq(bound = "L::Checkpoint: Eq, L::ReceiverChunk: Eq, L::SenderChunk: Eq"),
Hash(bound = "L::Checkpoint: Hash, L::ReceiverChunk: Hash, L::SenderChunk: Hash"),
PartialEq(
bound = "L::Checkpoint: PartialEq, L::ReceiverChunk: PartialEq, L::SenderChunk: PartialEq"
)
serde(crate = "manta_util::serde", deny_unknown_fields)
)]
pub struct PullResponse<C, L>
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct ReadResponse<T, D>
where
C: Configuration,
L: PullConfiguration<C> + ?Sized,
T: Checkpoint,
{
/// Pull Continuation Flag
/// Read Continuation Flag
///
/// The `should_continue` flag is set to `true` if the client should request more data from the
/// ledger to finish the pull.
/// ledger to finish the requested [`read`](Read::read).
pub should_continue: bool,

/// Ledger Checkpoint
/// Next Ledger Checkpoint
///
/// This checkpoint represents the new checkpoint that the client should be synchronized to when
/// incorporating the [`data`] returned by the [`read`] request. To continue the request the
/// client should send this checkpoint in their next call to [`read`].
///
/// If the `should_continue` flag is set to `true` then `checkpoint` is the next
/// [`Checkpoint`](PullConfiguration::Checkpoint) to request data from the ledger. Otherwise, it
/// represents the current ledger state.
pub checkpoint: L::Checkpoint,
/// [`data`]: Self::data
/// [`read`]: Read::read
pub next_checkpoint: T,

/// Ledger Receiver Chunk
pub receivers: L::ReceiverChunk,
/// Data Payload
///
/// This is the data payload that was returned by the ledger corresponding to the
/// [`read`](Read::read) request.
pub data: D,
}

/// Ledger Sender Chunk
pub senders: L::SenderChunk,
/// Ledger Connection Writing
pub trait Write<R>: Connection {
/// Ledger Response Type
///
/// This is the return type of the [`write`] method. Use this type to customize the ledger's
/// response to performing a [`write`] call, valid or otherwise. In most cases `bool` or some
/// result type like `Result<(), Error>` is sufficient. In other cases where the ledger cannot
/// respond immediately to the [`write`] command, a subscription token can be returned instead
/// which can be used to listen to the result later on.
type Response;

/// Sends the `request` to the ledger, returning its [`Response`](Self::Response).
fn write(&mut self, request: R) -> LocalBoxFutureResult<Self::Response, Self::Error>;
}
Loading