Skip to content

Commit

Permalink
refactor!(core): make it all async
Browse files Browse the repository at this point in the history
BREAKING CHANGE: all the traits are now async

Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
  • Loading branch information
aasseman committed Jul 28, 2023
1 parent 7e25081 commit 7be7da4
Show file tree
Hide file tree
Showing 22 changed files with 384 additions and 280 deletions.
3 changes: 2 additions & 1 deletion tap_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ rand="0.8.5"
thiserror="1.0.38"
ethereum-types={version="0.14.1"}
rstest = "0.17.0"
async-std = { version = "1.5", features = ["attributes"] }
ethers = { version = "2.0.0", default-features = false }
ethers-core = "2.0.0"
ethers-contract = "2.0.0"
Expand All @@ -21,6 +20,8 @@ anyhow = "1"

strum = "0.24.1"
strum_macros = "0.24.3"
async-trait = "0.1.72"
tokio = { version = "1.29.1", features = ["macros"] }

[dev-dependencies]
criterion = { version = "0.5", features = ["async_std"] }
Expand Down
20 changes: 12 additions & 8 deletions tap_core/benches/timeline_aggretion_protocol_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use std::str::FromStr;

use async_std::task::block_on;
use criterion::async_executor::AsyncStdExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use ethereum_types::Address;
Expand All @@ -21,6 +20,7 @@ use tap_core::{
eip_712_signed_message::EIP712SignedMessage,
receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::Receipt,
};
use tokio::runtime::Runtime;

pub async fn create_and_sign_receipt(
allocation_id: Address,
Expand All @@ -33,6 +33,8 @@ pub async fn create_and_sign_receipt(
}

pub fn criterion_benchmark(c: &mut Criterion) {
let async_runtime = Runtime::new().unwrap();

let wallet = LocalWallet::new(&mut OsRng);

// Arbitrary values wrapped in black box to avoid compiler optimizing them out
Expand All @@ -49,7 +51,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
})
});

let receipt = block_on(create_and_sign_receipt(allocation_id, value, &wallet));
let receipt = async_runtime.block_on(create_and_sign_receipt(allocation_id, value, &wallet));

c.bench_function("Validate Receipt", |b| {
b.iter(|| {
Expand All @@ -63,7 +65,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {

for log_number_of_receipts in 10..30 {
let receipts = (0..2 ^ log_number_of_receipts)
.map(|_| block_on(create_and_sign_receipt(allocation_id, value, &wallet)))
.map(|_| async_runtime.block_on(create_and_sign_receipt(allocation_id, value, &wallet)))
.collect::<Vec<_>>();

rav_group.bench_function(
Expand All @@ -79,11 +81,13 @@ pub fn criterion_benchmark(c: &mut Criterion) {
},
);

let signed_rav = block_on(EIP712SignedMessage::new(
ReceiptAggregateVoucher::aggregate_receipts(allocation_id, &receipts, None).unwrap(),
&wallet,
))
.unwrap();
let signed_rav = async_runtime
.block_on(EIP712SignedMessage::new(
ReceiptAggregateVoucher::aggregate_receipts(allocation_id, &receipts, None)
.unwrap(),
&wallet,
))
.unwrap();

rav_group.bench_function(
&format!("Validate RAV w/ 2^{} receipt's", log_number_of_receipts),
Expand Down
11 changes: 8 additions & 3 deletions tap_core/src/adapters/collateral_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use ethereum_types::Address;

/// `CollateralAdapter` defines a trait for adapters to handle collateral related operations.
Expand All @@ -27,6 +28,7 @@ use ethereum_types::Address;
///
/// For example code see [crate::adapters::collateral_adapter_mock]

#[async_trait]
pub trait CollateralAdapter {
/// Defines the user-specified error type.
///
Expand All @@ -39,15 +41,18 @@ pub trait CollateralAdapter {
/// This method should be implemented to fetch the local accounting amount of available collateral for a
/// specified gateway from your system. Any errors that occur during this process should
/// be captured and returned as an `AdapterError`.
fn get_available_collateral(&self, gateway_id: Address) -> Result<u128, Self::AdapterError>;
async fn get_available_collateral(
&self,
gateway_id: Address,
) -> Result<u128, Self::AdapterError>;

/// Deducts a specified value from the local accounting of available collateral for a specified gateway.
///
/// This method should be implemented to deduct a specified value from the local accounting of
/// available collateral of a specified gateway in your system. Any errors that occur during this
/// process should be captured and returned as an `AdapterError`.
fn subtract_collateral(
&mut self,
async fn subtract_collateral(
&self,
gateway_id: Address,
value: u128,
) -> Result<(), Self::AdapterError>;
Expand Down
7 changes: 5 additions & 2 deletions tap_core/src/adapters/rav_storage_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;

use crate::tap_manager::SignedRAV;

/// `RAVStorageAdapter` defines a trait for storage adapters to handle `SignedRAV` data.
Expand All @@ -27,6 +29,7 @@ use crate::tap_manager::SignedRAV;
///
/// For example code see [crate::adapters::rav_storage_adapter_mock]

#[async_trait]
pub trait RAVStorageAdapter {
/// Defines the user-specified error type.
///
Expand All @@ -38,12 +41,12 @@ pub trait RAVStorageAdapter {
///
/// This method should be implemented to store the most recent validated `SignedRAV` into your chosen storage system.
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
fn update_last_rav(&mut self, rav: SignedRAV) -> Result<(), Self::AdapterError>;
async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError>;

/// Retrieves the latest `SignedRAV` from the storage.
///
/// This method should be implemented to fetch the latest `SignedRAV` from your storage system.
/// If no `SignedRAV` is available, this method should return `None`.
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
fn last_rav(&self) -> Result<Option<SignedRAV>, Self::AdapterError>;
async fn last_rav(&self) -> Result<Option<SignedRAV>, Self::AdapterError>;
}
10 changes: 6 additions & 4 deletions tap_core/src/adapters/receipt_checks_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt};
use async_trait::async_trait;
use ethereum_types::Address;

/// `ReceiptChecksAdapter` defines a trait for adapters to handle checks related to TAP receipts.
Expand All @@ -27,28 +28,29 @@ use ethereum_types::Address;
///
/// For example code see [crate::adapters::receipt_checks_adapter_mock]

#[async_trait]
pub trait ReceiptChecksAdapter {
/// Checks if the given receipt is unique in the system.
///
/// This method should be implemented to verify the uniqueness of a given receipt in your system. Keep in mind that
/// the receipt likely will be in storage when this check is performed so the receipt id should be used to check
/// for uniqueness.
fn is_unique(&self, receipt: &EIP712SignedMessage<Receipt>, receipt_id: u64) -> bool;
async fn is_unique(&self, receipt: &EIP712SignedMessage<Receipt>, receipt_id: u64) -> bool;

/// Verifies if the allocation ID is valid.
///
/// This method should be implemented to validate the given allocation ID is a valid allocation for the indexer. Valid is defined as
/// an allocation ID that is owned by the indexer and still available for redeeming.
fn is_valid_allocation_id(&self, allocation_id: Address) -> bool;
async fn is_valid_allocation_id(&self, allocation_id: Address) -> bool;

/// Confirms the value of the receipt is valid for the given query ID.
///
/// This method should be implemented to confirm the validity of the given value for a specific query ID.
fn is_valid_value(&self, value: u128, query_id: u64) -> bool;
async fn is_valid_value(&self, value: u128, query_id: u64) -> bool;

/// Confirms the gateway ID is valid.
///
/// This method should be implemented to validate the given gateway ID is one associated with a gateway the indexer considers valid.
/// The provided gateway ID is the address of the gateway that is recovered from the signature of the receipt.
fn is_valid_gateway_id(&self, gateway_id: Address) -> bool;
async fn is_valid_gateway_id(&self, gateway_id: Address) -> bool;
}
15 changes: 9 additions & 6 deletions tap_core/src/adapters/receipt_storage_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use std::ops::RangeBounds;

use async_trait::async_trait;

use crate::tap_receipt::ReceivedReceipt;

/// `ReceiptStorageAdapter` defines a trait for storage adapters to manage `ReceivedReceipt` data.
Expand Down Expand Up @@ -35,6 +37,7 @@ use crate::tap_receipt::ReceivedReceipt;
///
/// For example code see [crate::adapters::receipt_storage_adapter_mock]

#[async_trait]
pub trait ReceiptStorageAdapter {
/// Defines the user-specified error type.
///
Expand All @@ -47,15 +50,15 @@ pub trait ReceiptStorageAdapter {
/// This method should be implemented to store a new `ReceivedReceipt` into your chosen storage system.
/// It returns a unique receipt_id associated with the stored receipt. Any errors that occur during
/// this process should be captured and returned as an `AdapterError`.
fn store_receipt(&mut self, receipt: ReceivedReceipt) -> Result<u64, Self::AdapterError>;
async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result<u64, Self::AdapterError>;

/// Retrieves all `ReceivedReceipts` within a specific timestamp range.
///
/// This method should be implemented to fetch all `ReceivedReceipts` within a specific timestamp range
/// from your storage system. The returned receipts should be in the form of a vector of tuples where
/// each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`.
/// Any errors that occur during this process should be captured and returned as an `AdapterError`.
fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64>>(
async fn retrieve_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
&self,
timestamp_range_ns: R,
) -> Result<Vec<(u64, ReceivedReceipt)>, Self::AdapterError>;
Expand All @@ -65,8 +68,8 @@ pub trait ReceiptStorageAdapter {
/// This method should be implemented to update a specific `ReceivedReceipt` identified by a unique
/// receipt_id in your storage system. Any errors that occur during this process should be captured
/// and returned as an `AdapterError`.
fn update_receipt_by_id(
&mut self,
async fn update_receipt_by_id(
&self,
receipt_id: u64,
receipt: ReceivedReceipt,
) -> Result<(), Self::AdapterError>;
Expand All @@ -76,8 +79,8 @@ pub trait ReceiptStorageAdapter {
/// This method should be implemented to remove all `ReceivedReceipts` within a specific timestamp
/// range from your storage system. Any errors that occur during this process should be captured and
/// returned as an `AdapterError`.
fn remove_receipts_in_timestamp_range<R: RangeBounds<u64>>(
&mut self,
async fn remove_receipts_in_timestamp_range<R: RangeBounds<u64> + std::marker::Send>(
&self,
timestamp_ns: R,
) -> Result<(), Self::AdapterError>;
}
37 changes: 20 additions & 17 deletions tap_core/src/adapters/test/collateral_adapter_mock.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use ethereum_types::Address;
use tokio::sync::RwLock;

use crate::adapters::collateral_adapter::CollateralAdapter;

Expand All @@ -27,8 +26,8 @@ impl CollateralAdapterMock {
gateway_collateral_storage,
}
}
pub fn collateral(&self, gateway_id: Address) -> Result<u128, AdpaterErrorMock> {
let gateway_collateral_storage = self.gateway_collateral_storage.read().unwrap();
pub async fn collateral(&self, gateway_id: Address) -> Result<u128, AdpaterErrorMock> {
let gateway_collateral_storage = self.gateway_collateral_storage.read().await;
if let Some(collateral) = gateway_collateral_storage.get(&gateway_id) {
return Ok(*collateral);
}
Expand All @@ -37,23 +36,23 @@ impl CollateralAdapterMock {
})
}

pub fn increase_collateral(&mut self, gateway_id: Address, value: u128) {
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap();
pub async fn increase_collateral(&mut self, gateway_id: Address, value: u128) {
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await;

if let Some(current_value) = gateway_collateral_storage.get(&gateway_id) {
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap();
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await;
gateway_collateral_storage.insert(gateway_id, current_value + value);
} else {
gateway_collateral_storage.insert(gateway_id, value);
}
}

pub fn reduce_collateral(
&mut self,
pub async fn reduce_collateral(
&self,
gateway_id: Address,
value: u128,
) -> Result<(), AdpaterErrorMock> {
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap();
let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await;

if let Some(current_value) = gateway_collateral_storage.get(&gateway_id) {
let checked_new_value = current_value.checked_sub(value);
Expand All @@ -68,16 +67,20 @@ impl CollateralAdapterMock {
}
}

#[async_trait]
impl CollateralAdapter for CollateralAdapterMock {
type AdapterError = AdpaterErrorMock;
fn get_available_collateral(&self, gateway_id: Address) -> Result<u128, Self::AdapterError> {
self.collateral(gateway_id)
async fn get_available_collateral(
&self,
gateway_id: Address,
) -> Result<u128, Self::AdapterError> {
self.collateral(gateway_id).await
}
fn subtract_collateral(
&mut self,
async fn subtract_collateral(
&self,
gateway_id: Address,
value: u128,
) -> Result<(), Self::AdapterError> {
self.reduce_collateral(gateway_id, value)
self.reduce_collateral(gateway_id, value).await
}
}
Loading

0 comments on commit 7be7da4

Please sign in to comment.