From 7be7da4388b2dc457962f4c5ac4db9ef749c1dfe Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Fri, 28 Jul 2023 15:55:46 -0700 Subject: [PATCH] refactor!(core): make it all async BREAKING CHANGE: all the traits are now async Signed-off-by: Alexis Asseman --- tap_core/Cargo.toml | 3 +- .../timeline_aggretion_protocol_benchmark.rs | 20 ++-- tap_core/src/adapters/collateral_adapter.rs | 11 +- tap_core/src/adapters/rav_storage_adapter.rs | 7 +- .../src/adapters/receipt_checks_adapter.rs | 10 +- .../src/adapters/receipt_storage_adapter.rs | 15 +-- .../adapters/test/collateral_adapter_mock.rs | 37 ++++--- .../adapters/test/collateral_adapter_test.rs | 20 ++-- .../adapters/test/rav_storage_adapter_mock.rs | 16 +-- .../adapters/test/rav_storage_adapter_test.rs | 15 +-- .../test/receipt_checks_adapter_mock.rs | 21 ++-- .../test/receipt_checks_adapter_test.rs | 31 ++++-- .../test/receipt_storage_adapter_mock.rs | 102 +++++++----------- .../test/receipt_storage_adapter_test.rs | 43 ++++++-- tap_core/src/lib.rs | 3 + tap_core/src/tap_manager/manager.rs | 52 +++++---- tap_core/src/tap_manager/test/manager_test.rs | 59 ++++++---- tap_core/src/tap_receipt/receipt_auditor.rs | 59 ++++++---- tap_core/src/tap_receipt/received_receipt.rs | 25 +++-- .../tests/received_receipt_tests.rs | 24 +++-- .../tests/indexer_mock/mod.rs | 84 +++++++-------- tap_integration_tests/tests/showcase.rs | 7 +- 22 files changed, 384 insertions(+), 280 deletions(-) diff --git a/tap_core/Cargo.toml b/tap_core/Cargo.toml index 627642c2..dffd1140 100644 --- a/tap_core/Cargo.toml +++ b/tap_core/Cargo.toml @@ -12,7 +12,6 @@ rand="0.8.5" thiserror="1.0.38" ethereum-types={version="0.14.1"} rstest = "0.17.0" -async-std = { version = "1.5", features = ["attributes"] } ethers = { version = "2.0.0", default-features = false } ethers-core = "2.0.0" ethers-contract = "2.0.0" @@ -21,6 +20,8 @@ anyhow = "1" strum = "0.24.1" strum_macros = "0.24.3" +async-trait = "0.1.72" +tokio = { version = "1.29.1", features = ["macros"] } [dev-dependencies] criterion = { version = "0.5", features = ["async_std"] } diff --git a/tap_core/benches/timeline_aggretion_protocol_benchmark.rs b/tap_core/benches/timeline_aggretion_protocol_benchmark.rs index cae9e88c..47c31af7 100644 --- a/tap_core/benches/timeline_aggretion_protocol_benchmark.rs +++ b/tap_core/benches/timeline_aggretion_protocol_benchmark.rs @@ -10,7 +10,6 @@ use std::str::FromStr; -use async_std::task::block_on; use criterion::async_executor::AsyncStdExecutor; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use ethereum_types::Address; @@ -21,6 +20,7 @@ use tap_core::{ eip_712_signed_message::EIP712SignedMessage, receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::Receipt, }; +use tokio::runtime::Runtime; pub async fn create_and_sign_receipt( allocation_id: Address, @@ -33,6 +33,8 @@ pub async fn create_and_sign_receipt( } pub fn criterion_benchmark(c: &mut Criterion) { + let async_runtime = Runtime::new().unwrap(); + let wallet = LocalWallet::new(&mut OsRng); // Arbitrary values wrapped in black box to avoid compiler optimizing them out @@ -49,7 +51,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { }) }); - let receipt = block_on(create_and_sign_receipt(allocation_id, value, &wallet)); + let receipt = async_runtime.block_on(create_and_sign_receipt(allocation_id, value, &wallet)); c.bench_function("Validate Receipt", |b| { b.iter(|| { @@ -63,7 +65,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { for log_number_of_receipts in 10..30 { let receipts = (0..2 ^ log_number_of_receipts) - .map(|_| block_on(create_and_sign_receipt(allocation_id, value, &wallet))) + .map(|_| async_runtime.block_on(create_and_sign_receipt(allocation_id, value, &wallet))) .collect::>(); rav_group.bench_function( @@ -79,11 +81,13 @@ pub fn criterion_benchmark(c: &mut Criterion) { }, ); - let signed_rav = block_on(EIP712SignedMessage::new( - ReceiptAggregateVoucher::aggregate_receipts(allocation_id, &receipts, None).unwrap(), - &wallet, - )) - .unwrap(); + let signed_rav = async_runtime + .block_on(EIP712SignedMessage::new( + ReceiptAggregateVoucher::aggregate_receipts(allocation_id, &receipts, None) + .unwrap(), + &wallet, + )) + .unwrap(); rav_group.bench_function( &format!("Validate RAV w/ 2^{} receipt's", log_number_of_receipts), diff --git a/tap_core/src/adapters/collateral_adapter.rs b/tap_core/src/adapters/collateral_adapter.rs index b8362e45..fc9f32f9 100644 --- a/tap_core/src/adapters/collateral_adapter.rs +++ b/tap_core/src/adapters/collateral_adapter.rs @@ -1,6 +1,7 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +use async_trait::async_trait; use ethereum_types::Address; /// `CollateralAdapter` defines a trait for adapters to handle collateral related operations. @@ -27,6 +28,7 @@ use ethereum_types::Address; /// /// For example code see [crate::adapters::collateral_adapter_mock] +#[async_trait] pub trait CollateralAdapter { /// Defines the user-specified error type. /// @@ -39,15 +41,18 @@ pub trait CollateralAdapter { /// This method should be implemented to fetch the local accounting amount of available collateral for a /// specified gateway from your system. Any errors that occur during this process should /// be captured and returned as an `AdapterError`. - fn get_available_collateral(&self, gateway_id: Address) -> Result; + async fn get_available_collateral( + &self, + gateway_id: Address, + ) -> Result; /// Deducts a specified value from the local accounting of available collateral for a specified gateway. /// /// This method should be implemented to deduct a specified value from the local accounting of /// available collateral of a specified gateway in your system. Any errors that occur during this /// process should be captured and returned as an `AdapterError`. - fn subtract_collateral( - &mut self, + async fn subtract_collateral( + &self, gateway_id: Address, value: u128, ) -> Result<(), Self::AdapterError>; diff --git a/tap_core/src/adapters/rav_storage_adapter.rs b/tap_core/src/adapters/rav_storage_adapter.rs index 55e46340..9f917fcb 100644 --- a/tap_core/src/adapters/rav_storage_adapter.rs +++ b/tap_core/src/adapters/rav_storage_adapter.rs @@ -1,6 +1,8 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +use async_trait::async_trait; + use crate::tap_manager::SignedRAV; /// `RAVStorageAdapter` defines a trait for storage adapters to handle `SignedRAV` data. @@ -27,6 +29,7 @@ use crate::tap_manager::SignedRAV; /// /// For example code see [crate::adapters::rav_storage_adapter_mock] +#[async_trait] pub trait RAVStorageAdapter { /// Defines the user-specified error type. /// @@ -38,12 +41,12 @@ pub trait RAVStorageAdapter { /// /// This method should be implemented to store the most recent validated `SignedRAV` into your chosen storage system. /// Any errors that occur during this process should be captured and returned as an `AdapterError`. - fn update_last_rav(&mut self, rav: SignedRAV) -> Result<(), Self::AdapterError>; + async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError>; /// Retrieves the latest `SignedRAV` from the storage. /// /// This method should be implemented to fetch the latest `SignedRAV` from your storage system. /// If no `SignedRAV` is available, this method should return `None`. /// Any errors that occur during this process should be captured and returned as an `AdapterError`. - fn last_rav(&self) -> Result, Self::AdapterError>; + async fn last_rav(&self) -> Result, Self::AdapterError>; } diff --git a/tap_core/src/adapters/receipt_checks_adapter.rs b/tap_core/src/adapters/receipt_checks_adapter.rs index 56fd2abe..52f69b9b 100644 --- a/tap_core/src/adapters/receipt_checks_adapter.rs +++ b/tap_core/src/adapters/receipt_checks_adapter.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt}; +use async_trait::async_trait; use ethereum_types::Address; /// `ReceiptChecksAdapter` defines a trait for adapters to handle checks related to TAP receipts. @@ -27,28 +28,29 @@ use ethereum_types::Address; /// /// For example code see [crate::adapters::receipt_checks_adapter_mock] +#[async_trait] pub trait ReceiptChecksAdapter { /// Checks if the given receipt is unique in the system. /// /// This method should be implemented to verify the uniqueness of a given receipt in your system. Keep in mind that /// the receipt likely will be in storage when this check is performed so the receipt id should be used to check /// for uniqueness. - fn is_unique(&self, receipt: &EIP712SignedMessage, receipt_id: u64) -> bool; + async fn is_unique(&self, receipt: &EIP712SignedMessage, receipt_id: u64) -> bool; /// Verifies if the allocation ID is valid. /// /// This method should be implemented to validate the given allocation ID is a valid allocation for the indexer. Valid is defined as /// an allocation ID that is owned by the indexer and still available for redeeming. - fn is_valid_allocation_id(&self, allocation_id: Address) -> bool; + async fn is_valid_allocation_id(&self, allocation_id: Address) -> bool; /// Confirms the value of the receipt is valid for the given query ID. /// /// This method should be implemented to confirm the validity of the given value for a specific query ID. - fn is_valid_value(&self, value: u128, query_id: u64) -> bool; + async fn is_valid_value(&self, value: u128, query_id: u64) -> bool; /// Confirms the gateway ID is valid. /// /// This method should be implemented to validate the given gateway ID is one associated with a gateway the indexer considers valid. /// The provided gateway ID is the address of the gateway that is recovered from the signature of the receipt. - fn is_valid_gateway_id(&self, gateway_id: Address) -> bool; + async fn is_valid_gateway_id(&self, gateway_id: Address) -> bool; } diff --git a/tap_core/src/adapters/receipt_storage_adapter.rs b/tap_core/src/adapters/receipt_storage_adapter.rs index 6993e840..1e848434 100644 --- a/tap_core/src/adapters/receipt_storage_adapter.rs +++ b/tap_core/src/adapters/receipt_storage_adapter.rs @@ -3,6 +3,8 @@ use std::ops::RangeBounds; +use async_trait::async_trait; + use crate::tap_receipt::ReceivedReceipt; /// `ReceiptStorageAdapter` defines a trait for storage adapters to manage `ReceivedReceipt` data. @@ -35,6 +37,7 @@ use crate::tap_receipt::ReceivedReceipt; /// /// For example code see [crate::adapters::receipt_storage_adapter_mock] +#[async_trait] pub trait ReceiptStorageAdapter { /// Defines the user-specified error type. /// @@ -47,7 +50,7 @@ pub trait ReceiptStorageAdapter { /// This method should be implemented to store a new `ReceivedReceipt` into your chosen storage system. /// It returns a unique receipt_id associated with the stored receipt. Any errors that occur during /// this process should be captured and returned as an `AdapterError`. - fn store_receipt(&mut self, receipt: ReceivedReceipt) -> Result; + async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result; /// Retrieves all `ReceivedReceipts` within a specific timestamp range. /// @@ -55,7 +58,7 @@ pub trait ReceiptStorageAdapter { /// from your storage system. The returned receipts should be in the form of a vector of tuples where /// each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`. /// Any errors that occur during this process should be captured and returned as an `AdapterError`. - fn retrieve_receipts_in_timestamp_range>( + async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( &self, timestamp_range_ns: R, ) -> Result, Self::AdapterError>; @@ -65,8 +68,8 @@ pub trait ReceiptStorageAdapter { /// This method should be implemented to update a specific `ReceivedReceipt` identified by a unique /// receipt_id in your storage system. Any errors that occur during this process should be captured /// and returned as an `AdapterError`. - fn update_receipt_by_id( - &mut self, + async fn update_receipt_by_id( + &self, receipt_id: u64, receipt: ReceivedReceipt, ) -> Result<(), Self::AdapterError>; @@ -76,8 +79,8 @@ pub trait ReceiptStorageAdapter { /// This method should be implemented to remove all `ReceivedReceipts` within a specific timestamp /// range from your storage system. Any errors that occur during this process should be captured and /// returned as an `AdapterError`. - fn remove_receipts_in_timestamp_range>( - &mut self, + async fn remove_receipts_in_timestamp_range + std::marker::Send>( + &self, timestamp_ns: R, ) -> Result<(), Self::AdapterError>; } diff --git a/tap_core/src/adapters/test/collateral_adapter_mock.rs b/tap_core/src/adapters/test/collateral_adapter_mock.rs index 49f6a582..306efd03 100644 --- a/tap_core/src/adapters/test/collateral_adapter_mock.rs +++ b/tap_core/src/adapters/test/collateral_adapter_mock.rs @@ -1,12 +1,11 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; +use async_trait::async_trait; use ethereum_types::Address; +use tokio::sync::RwLock; use crate::adapters::collateral_adapter::CollateralAdapter; @@ -27,8 +26,8 @@ impl CollateralAdapterMock { gateway_collateral_storage, } } - pub fn collateral(&self, gateway_id: Address) -> Result { - let gateway_collateral_storage = self.gateway_collateral_storage.read().unwrap(); + pub async fn collateral(&self, gateway_id: Address) -> Result { + let gateway_collateral_storage = self.gateway_collateral_storage.read().await; if let Some(collateral) = gateway_collateral_storage.get(&gateway_id) { return Ok(*collateral); } @@ -37,23 +36,23 @@ impl CollateralAdapterMock { }) } - pub fn increase_collateral(&mut self, gateway_id: Address, value: u128) { - let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap(); + pub async fn increase_collateral(&mut self, gateway_id: Address, value: u128) { + let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await; if let Some(current_value) = gateway_collateral_storage.get(&gateway_id) { - let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap(); + let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await; gateway_collateral_storage.insert(gateway_id, current_value + value); } else { gateway_collateral_storage.insert(gateway_id, value); } } - pub fn reduce_collateral( - &mut self, + pub async fn reduce_collateral( + &self, gateway_id: Address, value: u128, ) -> Result<(), AdpaterErrorMock> { - let mut gateway_collateral_storage = self.gateway_collateral_storage.write().unwrap(); + let mut gateway_collateral_storage = self.gateway_collateral_storage.write().await; if let Some(current_value) = gateway_collateral_storage.get(&gateway_id) { let checked_new_value = current_value.checked_sub(value); @@ -68,16 +67,20 @@ impl CollateralAdapterMock { } } +#[async_trait] impl CollateralAdapter for CollateralAdapterMock { type AdapterError = AdpaterErrorMock; - fn get_available_collateral(&self, gateway_id: Address) -> Result { - self.collateral(gateway_id) + async fn get_available_collateral( + &self, + gateway_id: Address, + ) -> Result { + self.collateral(gateway_id).await } - fn subtract_collateral( - &mut self, + async fn subtract_collateral( + &self, gateway_id: Address, value: u128, ) -> Result<(), Self::AdapterError> { - self.reduce_collateral(gateway_id, value) + self.reduce_collateral(gateway_id, value).await } } diff --git a/tap_core/src/adapters/test/collateral_adapter_test.rs b/tap_core/src/adapters/test/collateral_adapter_test.rs index 0fc0937f..a1330387 100644 --- a/tap_core/src/adapters/test/collateral_adapter_test.rs +++ b/tap_core/src/adapters/test/collateral_adapter_test.rs @@ -3,20 +3,19 @@ #[cfg(test)] mod collateral_adapter_unit_test { - use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - }; + use std::{collections::HashMap, sync::Arc}; use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; use rstest::*; + use tokio::sync::RwLock; use crate::adapters::{ collateral_adapter::CollateralAdapter, collateral_adapter_mock::CollateralAdapterMock, }; #[rstest] - fn collateral_adapter_test() { + #[tokio::test] + async fn collateral_adapter_test() { let collateral_storage = Arc::new(RwLock::new(HashMap::new())); let mut collateral_adapter = CollateralAdapterMock::new(collateral_storage); @@ -36,15 +35,19 @@ mod collateral_adapter_unit_test { let initial_value = 500u128; - collateral_adapter.increase_collateral(gateway_id, initial_value); + collateral_adapter + .increase_collateral(gateway_id, initial_value) + .await; // Check that gateway exists and has valid value through adapter assert!(collateral_adapter .get_available_collateral(gateway_id) + .await .is_ok()); assert_eq!( collateral_adapter .get_available_collateral(gateway_id) + .await .unwrap(), initial_value ); @@ -52,13 +55,16 @@ mod collateral_adapter_unit_test { // Check that subtracting is valid for valid gateway, and results in expected value assert!(collateral_adapter .subtract_collateral(gateway_id, initial_value) + .await .is_ok()); assert!(collateral_adapter .get_available_collateral(gateway_id) + .await .is_ok()); assert_eq!( collateral_adapter .get_available_collateral(gateway_id) + .await .unwrap(), 0 ); @@ -66,11 +72,13 @@ mod collateral_adapter_unit_test { // Check that subtracting to negative collateral results in err assert!(collateral_adapter .subtract_collateral(gateway_id, initial_value) + .await .is_err()); // Check that accessing non initialized gateway results in err assert!(collateral_adapter .get_available_collateral(invalid_gateway_id) + .await .is_err()); } } diff --git a/tap_core/src/adapters/test/rav_storage_adapter_mock.rs b/tap_core/src/adapters/test/rav_storage_adapter_mock.rs index e64cca42..5c471b1d 100644 --- a/tap_core/src/adapters/test/rav_storage_adapter_mock.rs +++ b/tap_core/src/adapters/test/rav_storage_adapter_mock.rs @@ -1,9 +1,11 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +use async_trait::async_trait; use thiserror::Error; +use tokio::sync::RwLock; use crate::{adapters::rav_storage_adapter::RAVStorageAdapter, tap_manager::SignedRAV}; @@ -28,7 +30,8 @@ use crate::{adapters::rav_storage_adapter::RAVStorageAdapter, tap_manager::Signe /// `RAVStorageAdapterMock::new()`. Now, it can be used anywhere a `RAVStorageAdapter` is required. /// /// ```rust -/// use std::sync::{Arc, RwLock}; +/// use std::sync::{Arc}; +/// use tokio::sync::RwLock; /// use tap_core::{tap_manager::SignedRAV, adapters::rav_storage_adapter_mock::RAVStorageAdapterMock}; /// /// let rav_storage: Arc>> = Arc::new(RwLock::new(None)); @@ -51,15 +54,16 @@ pub enum AdpaterErrorMock { AdapterError { error: String }, } +#[async_trait] impl RAVStorageAdapter for RAVStorageAdapterMock { type AdapterError = AdpaterErrorMock; - fn update_last_rav(&mut self, rav: SignedRAV) -> Result<(), Self::AdapterError> { - let mut rav_storage = self.rav_storage.write().unwrap(); + async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError> { + let mut rav_storage = self.rav_storage.write().await; *rav_storage = Some(rav); Ok(()) } - fn last_rav(&self) -> Result, Self::AdapterError> { - Ok(self.rav_storage.read().unwrap().clone()) + async fn last_rav(&self) -> Result, Self::AdapterError> { + Ok(self.rav_storage.read().await.clone()) } } diff --git a/tap_core/src/adapters/test/rav_storage_adapter_test.rs b/tap_core/src/adapters/test/rav_storage_adapter_test.rs index b840a5d6..d6e3b007 100644 --- a/tap_core/src/adapters/test/rav_storage_adapter_test.rs +++ b/tap_core/src/adapters/test/rav_storage_adapter_test.rs @@ -3,15 +3,13 @@ #[cfg(test)] mod rav_storage_adapter_unit_test { - use std::{ - str::FromStr, - sync::{Arc, RwLock}, - }; + use std::{str::FromStr, sync::Arc}; use ethereum_types::Address; use ethers::signers::coins_bip39::English; use ethers::signers::{LocalWallet, MnemonicBuilder}; use rstest::*; + use tokio::sync::RwLock; use crate::adapters::{ rav_storage_adapter::RAVStorageAdapter, rav_storage_adapter_mock::RAVStorageAdapterMock, @@ -22,9 +20,10 @@ mod rav_storage_adapter_unit_test { }; #[rstest] + #[tokio::test] async fn rav_storage_adapter_test() { let rav_storage = Arc::new(RwLock::new(None)); - let mut rav_storage_adapter = RAVStorageAdapterMock::new(rav_storage); + let rav_storage_adapter = RAVStorageAdapterMock::new(rav_storage); let wallet: LocalWallet = MnemonicBuilder::::default() .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") @@ -53,10 +52,11 @@ mod rav_storage_adapter_unit_test { rav_storage_adapter .update_last_rav(signed_rav.clone()) + .await .unwrap(); // Retreive rav - let retrieved_rav = rav_storage_adapter.last_rav(); + let retrieved_rav = rav_storage_adapter.last_rav().await; assert!(retrieved_rav.unwrap().unwrap() == signed_rav); // Testing the last rav update... @@ -81,10 +81,11 @@ mod rav_storage_adapter_unit_test { // Update the last rav rav_storage_adapter .update_last_rav(signed_rav.clone()) + .await .unwrap(); // Retreive rav - let retrieved_rav = rav_storage_adapter.last_rav(); + let retrieved_rav = rav_storage_adapter.last_rav().await; assert!(retrieved_rav.unwrap().unwrap() == signed_rav); } } diff --git a/tap_core/src/adapters/test/receipt_checks_adapter_mock.rs b/tap_core/src/adapters/test/receipt_checks_adapter_mock.rs index 9236934a..00affcaf 100644 --- a/tap_core/src/adapters/test/receipt_checks_adapter_mock.rs +++ b/tap_core/src/adapters/test/receipt_checks_adapter_mock.rs @@ -3,10 +3,12 @@ use std::{ collections::{HashMap, HashSet}, - sync::{Arc, RwLock}, + sync::Arc, }; +use async_trait::async_trait; use ethereum_types::Address; +use tokio::sync::RwLock; use crate::{ adapters::receipt_checks_adapter::ReceiptChecksAdapter, @@ -37,9 +39,10 @@ impl ReceiptChecksAdapterMock { } } +#[async_trait] impl ReceiptChecksAdapter for ReceiptChecksAdapterMock { - fn is_unique(&self, receipt: &EIP712SignedMessage, receipt_id: u64) -> bool { - let receipt_storage = self.receipt_storage.read().unwrap(); + async fn is_unique(&self, receipt: &EIP712SignedMessage, receipt_id: u64) -> bool { + let receipt_storage = self.receipt_storage.read().await; receipt_storage .iter() .all(|(stored_receipt_id, stored_receipt)| { @@ -48,13 +51,13 @@ impl ReceiptChecksAdapter for ReceiptChecksAdapterMock { }) } - fn is_valid_allocation_id(&self, allocation_id: Address) -> bool { - let allocation_ids = self.allocation_ids.read().unwrap(); + async fn is_valid_allocation_id(&self, allocation_id: Address) -> bool { + let allocation_ids = self.allocation_ids.read().await; allocation_ids.contains(&allocation_id) } - fn is_valid_value(&self, value: u128, query_id: u64) -> bool { - let query_appraisals = self.query_appraisals.read().unwrap(); + async fn is_valid_value(&self, value: u128, query_id: u64) -> bool { + let query_appraisals = self.query_appraisals.read().await; let appraised_value = query_appraisals.get(&query_id).unwrap(); if value != *appraised_value { @@ -63,8 +66,8 @@ impl ReceiptChecksAdapter for ReceiptChecksAdapterMock { true } - fn is_valid_gateway_id(&self, gateway_id: Address) -> bool { - let gateway_ids = self.gateway_ids.read().unwrap(); + async fn is_valid_gateway_id(&self, gateway_id: Address) -> bool { + let gateway_ids = self.gateway_ids.read().await; gateway_ids.contains(&gateway_id) } } diff --git a/tap_core/src/adapters/test/receipt_checks_adapter_test.rs b/tap_core/src/adapters/test/receipt_checks_adapter_test.rs index f4a56895..aab6203d 100644 --- a/tap_core/src/adapters/test/receipt_checks_adapter_test.rs +++ b/tap_core/src/adapters/test/receipt_checks_adapter_test.rs @@ -6,13 +6,14 @@ mod receipt_checks_adapter_unit_test { use std::{ collections::{HashMap, HashSet}, str::FromStr, - sync::{Arc, RwLock}, + sync::Arc, }; use ethereum_types::Address; use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder}; use futures::{stream, StreamExt}; use rstest::*; + use tokio::sync::RwLock; use crate::{ adapters::{ @@ -24,6 +25,7 @@ mod receipt_checks_adapter_unit_test { }; #[rstest] + #[tokio::test] async fn receipt_checks_adapter_test() { let gateway_ids = [ Address::from_str("0xfbfbfbfbfbfbfbfbfbfbfbfbfbfbfbfbfbfbfbfb").unwrap(), @@ -93,17 +95,28 @@ mod receipt_checks_adapter_unit_test { let unique_receipt_id = 0u64; receipt_storage .write() - .unwrap() + .await .insert(unique_receipt_id, new_receipt.1.clone()); - assert!(receipt_checks_adapter.is_unique(&new_receipt.1.signed_receipt, unique_receipt_id)); - assert!(receipt_checks_adapter - .is_valid_allocation_id(new_receipt.1.signed_receipt.message.allocation_id)); + assert!( + receipt_checks_adapter + .is_unique(&new_receipt.1.signed_receipt, unique_receipt_id) + .await + ); + assert!( + receipt_checks_adapter + .is_valid_allocation_id(new_receipt.1.signed_receipt.message.allocation_id) + .await + ); // TODO: Add check when gateway_id is available from received receipt (issue: #56) // assert!(receipt_checks_adapter.is_valid_gateway_id(gateway_id)); - assert!(receipt_checks_adapter.is_valid_value( - new_receipt.1.signed_receipt.message.value, - new_receipt.1.query_id - )); + assert!( + receipt_checks_adapter + .is_valid_value( + new_receipt.1.signed_receipt.message.value, + new_receipt.1.query_id + ) + .await + ); } } diff --git a/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs b/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs index 943e34b5..d4fc033a 100644 --- a/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs +++ b/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs @@ -1,11 +1,10 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{ - collections::HashMap, - ops::RangeBounds, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, ops::RangeBounds, sync::Arc}; + +use async_trait::async_trait; +use tokio::sync::RwLock; use crate::{ adapters::receipt_storage_adapter::ReceiptStorageAdapter, tap_receipt::ReceivedReceipt, @@ -13,26 +12,21 @@ use crate::{ pub struct ReceiptStorageAdapterMock { receipt_storage: Arc>>, - unique_id: u64, + unique_id: RwLock, } impl ReceiptStorageAdapterMock { pub fn new(receipt_storage: Arc>>) -> Self { Self { receipt_storage, - unique_id: 0u64, + unique_id: RwLock::new(0u64), } } - pub fn retrieve_receipt_by_id( + pub async fn retrieve_receipt_by_id( &self, receipt_id: u64, ) -> Result { - let receipt_storage = - self.receipt_storage - .read() - .map_err(|e| AdapterErrorMock::AdapterError { - error: e.to_string(), - })?; + let receipt_storage = self.receipt_storage.read().await; receipt_storage .get(&receipt_id) @@ -41,16 +35,11 @@ impl ReceiptStorageAdapterMock { error: "No receipt found with ID".to_owned(), }) } - pub fn retrieve_receipts_by_timestamp( + pub async fn retrieve_receipts_by_timestamp( &self, timestamp_ns: u64, ) -> Result, AdapterErrorMock> { - let receipt_storage = - self.receipt_storage - .read() - .map_err(|e| AdapterErrorMock::AdapterError { - error: e.to_string(), - })?; + let receipt_storage = self.receipt_storage.read().await; Ok(receipt_storage .iter() .filter(|(_, rx_receipt)| { @@ -59,19 +48,15 @@ impl ReceiptStorageAdapterMock { .map(|(&id, rx_receipt)| (id, rx_receipt.clone())) .collect()) } - pub fn retrieve_receipts_upto_timestamp( + pub async fn retrieve_receipts_upto_timestamp( &self, timestamp_ns: u64, ) -> Result, AdapterErrorMock> { self.retrieve_receipts_in_timestamp_range(..=timestamp_ns) + .await } - pub fn remove_receipt_by_id(&mut self, receipt_id: u64) -> Result<(), AdapterErrorMock> { - let mut receipt_storage = - self.receipt_storage - .write() - .map_err(|e| AdapterErrorMock::AdapterError { - error: e.to_string(), - })?; + pub async fn remove_receipt_by_id(&mut self, receipt_id: u64) -> Result<(), AdapterErrorMock> { + let mut receipt_storage = self.receipt_storage.write().await; receipt_storage .remove(&receipt_id) .map(|_| ()) @@ -79,9 +64,12 @@ impl ReceiptStorageAdapterMock { error: "No receipt found with ID".to_owned(), }) } - pub fn remove_receipts_by_ids(&mut self, receipt_ids: &[u64]) -> Result<(), AdapterErrorMock> { + pub async fn remove_receipts_by_ids( + &mut self, + receipt_ids: &[u64], + ) -> Result<(), AdapterErrorMock> { for receipt_id in receipt_ids { - self.remove_receipt_by_id(*receipt_id)?; + self.remove_receipt_by_id(*receipt_id).await?; } Ok(()) } @@ -94,30 +82,22 @@ pub enum AdapterErrorMock { AdapterError { error: String }, } +#[async_trait] impl ReceiptStorageAdapter for ReceiptStorageAdapterMock { type AdapterError = AdapterErrorMock; - fn store_receipt(&mut self, receipt: ReceivedReceipt) -> Result { - let id = self.unique_id; - let mut receipt_storage = - self.receipt_storage - .write() - .map_err(|e| Self::AdapterError::AdapterError { - error: e.to_string(), - })?; - receipt_storage.insert(id, receipt); - self.unique_id += 1; - Ok(id) + async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result { + let mut id_pointer = self.unique_id.write().await; + let id_previous = *id_pointer; + let mut receipt_storage = self.receipt_storage.write().await; + receipt_storage.insert(*id_pointer, receipt); + *id_pointer += 1; + Ok(id_previous) } - fn retrieve_receipts_in_timestamp_range>( + async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( &self, timestamp_range_ns: R, ) -> Result, Self::AdapterError> { - let receipt_storage = - self.receipt_storage - .read() - .map_err(|e| Self::AdapterError::AdapterError { - error: e.to_string(), - })?; + let receipt_storage = self.receipt_storage.read().await; Ok(receipt_storage .iter() .filter(|(_, rx_receipt)| { @@ -126,17 +106,12 @@ impl ReceiptStorageAdapter for ReceiptStorageAdapterMock { .map(|(&id, rx_receipt)| (id, rx_receipt.clone())) .collect()) } - fn update_receipt_by_id( - &mut self, + async fn update_receipt_by_id( + &self, receipt_id: u64, receipt: ReceivedReceipt, ) -> Result<(), Self::AdapterError> { - let mut receipt_storage = - self.receipt_storage - .write() - .map_err(|e| Self::AdapterError::AdapterError { - error: e.to_string(), - })?; + let mut receipt_storage = self.receipt_storage.write().await; if !receipt_storage.contains_key(&receipt_id) { return Err(AdapterErrorMock::AdapterError { @@ -145,19 +120,14 @@ impl ReceiptStorageAdapter for ReceiptStorageAdapterMock { }; receipt_storage.insert(receipt_id, receipt); - self.unique_id += 1; + *self.unique_id.write().await += 1; Ok(()) } - fn remove_receipts_in_timestamp_range>( - &mut self, + async fn remove_receipts_in_timestamp_range + std::marker::Send>( + &self, timestamp_ns: R, ) -> Result<(), Self::AdapterError> { - let mut receipt_storage = - self.receipt_storage - .write() - .map_err(|e| Self::AdapterError::AdapterError { - error: e.to_string(), - })?; + let mut receipt_storage = self.receipt_storage.write().await; receipt_storage.retain(|_, rx_receipt| { !timestamp_ns.contains(&rx_receipt.signed_receipt.message.timestamp_ns) }); diff --git a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs index 6406b545..78c61f82 100644 --- a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs +++ b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs @@ -5,12 +5,13 @@ mod receipt_storage_adapter_unit_test { use std::collections::HashMap; use std::str::FromStr; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; use ethereum_types::Address; use ethers::signers::coins_bip39::English; use ethers::signers::{LocalWallet, MnemonicBuilder}; use rstest::*; + use tokio::sync::RwLock; use crate::adapters::{ receipt_storage_adapter::ReceiptStorageAdapter, @@ -22,6 +23,7 @@ mod receipt_storage_adapter_unit_test { }; #[rstest] + #[tokio::test] async fn receipt_adapter_test() { let receipt_storage = Arc::new(RwLock::new(HashMap::new())); let mut receipt_adapter = ReceiptStorageAdapterMock::new(receipt_storage); @@ -45,28 +47,41 @@ mod receipt_storage_adapter_unit_test { &get_full_list_of_checks(), ); - let receipt_store_result = receipt_adapter.store_receipt(received_receipt); + let receipt_store_result = receipt_adapter.store_receipt(received_receipt).await; assert!(receipt_store_result.is_ok()); let receipt_id = receipt_store_result.unwrap(); // Retreive receipt with id expected to be valid - assert!(receipt_adapter.retrieve_receipt_by_id(receipt_id).is_ok()); + assert!(receipt_adapter + .retrieve_receipt_by_id(receipt_id) + .await + .is_ok()); // Retreive receipt with arbitrary id expected to be invalid - assert!(receipt_adapter.retrieve_receipt_by_id(999).is_err()); + assert!(receipt_adapter.retrieve_receipt_by_id(999).await.is_err()); // Remove receipt with id expected to be valid - assert!(receipt_adapter.remove_receipt_by_id(receipt_id).is_ok()); + assert!(receipt_adapter + .remove_receipt_by_id(receipt_id) + .await + .is_ok()); // Remove receipt with arbitrary id expected to be invalid - assert!(receipt_adapter.remove_receipt_by_id(999).is_err()); + assert!(receipt_adapter.remove_receipt_by_id(999).await.is_err()); // Retreive receipt that was removed previously - assert!(receipt_adapter.retrieve_receipt_by_id(receipt_id).is_err()); + assert!(receipt_adapter + .retrieve_receipt_by_id(receipt_id) + .await + .is_err()); // Remove receipt that was removed previously - assert!(receipt_adapter.remove_receipt_by_id(receipt_id).is_err()); + assert!(receipt_adapter + .remove_receipt_by_id(receipt_id) + .await + .is_err()); } #[rstest] + #[tokio::test] async fn multi_receipt_adapter_test() { let receipt_storage = Arc::new(RwLock::new(HashMap::new())); let mut receipt_adapter = ReceiptStorageAdapterMock::new(receipt_storage); @@ -96,6 +111,7 @@ mod receipt_storage_adapter_unit_test { receipt_ids.push( receipt_adapter .store_receipt(received_receipt.clone()) + .await .unwrap(), ); receipt_timestamps.push(received_receipt.signed_receipt.message.timestamp_ns) @@ -104,19 +120,23 @@ mod receipt_storage_adapter_unit_test { // Retreive receipts with timestamp assert!(receipt_adapter .retrieve_receipts_by_timestamp(receipt_timestamps[0]) + .await .is_ok()); assert!(!receipt_adapter .retrieve_receipts_by_timestamp(receipt_timestamps[0]) + .await .unwrap() .is_empty()); // Retreive receipts before timestamp assert!(receipt_adapter .retrieve_receipts_upto_timestamp(receipt_timestamps[3]) + .await .is_ok()); assert!( receipt_adapter .retrieve_receipts_upto_timestamp(receipt_timestamps[3]) + .await .unwrap() .len() >= 4 @@ -125,14 +145,19 @@ mod receipt_storage_adapter_unit_test { // Remove all receipts with one call assert!(receipt_adapter .remove_receipts_by_ids(receipt_ids.as_slice()) + .await .is_ok()); // Removal should no longer be valid assert!(receipt_adapter .remove_receipts_by_ids(receipt_ids.as_slice()) + .await .is_err()); // Retrieval should be invalid for receipt_id in receipt_ids { - assert!(receipt_adapter.retrieve_receipt_by_id(receipt_id).is_err()); + assert!(receipt_adapter + .retrieve_receipt_by_id(receipt_id) + .await + .is_err()); } } } diff --git a/tap_core/src/lib.rs b/tap_core/src/lib.rs index 139f2f50..211d936d 100644 --- a/tap_core/src/lib.rs +++ b/tap_core/src/lib.rs @@ -64,6 +64,7 @@ mod tap_tests { #[rstest] #[case::basic_rav_test (vec![45,56,34,23])] #[case::rav_from_zero_valued_receipts (vec![0,0,0,0])] + #[tokio::test] async fn signed_rav_is_valid_with_no_previous_rav( keys: (LocalWallet, Address), allocation_ids: Vec
, @@ -90,6 +91,7 @@ mod tap_tests { #[rstest] #[case::basic_rav_test(vec![45,56,34,23])] #[case::rav_from_zero_valued_receipts(vec![0,0,0,0])] + #[tokio::test] async fn signed_rav_is_valid_with_previous_rav( keys: (LocalWallet, Address), allocation_ids: Vec
, @@ -127,6 +129,7 @@ mod tap_tests { } #[rstest] + #[tokio::test] async fn verify_signature(keys: (LocalWallet, Address), allocation_ids: Vec
) { let signed_message = EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 42).unwrap(), &keys.0) diff --git a/tap_core/src/tap_manager/manager.rs b/tap_core/src/tap_manager/manager.rs index aff047e8..15702805 100644 --- a/tap_core/src/tap_manager/manager.rs +++ b/tap_core/src/tap_manager/manager.rs @@ -73,8 +73,8 @@ impl< /// /// Returns [`Error::InvalidCheckError`] if check in `initial_checks` is not in `required_checks` provided when manager was created /// - pub fn verify_and_store_receipt( - &mut self, + pub async fn verify_and_store_receipt( + &self, signed_receipt: SignedReceipt, query_id: u64, initial_checks: Vec, @@ -88,18 +88,18 @@ impl< let receipt_id = self .receipt_storage_adapter .store_receipt(received_receipt.clone()) + .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), })?; - received_receipt.perform_checks( - initial_checks.as_slice(), - receipt_id, - &mut self.receipt_auditor, - )?; + received_receipt + .perform_checks(initial_checks.as_slice(), receipt_id, &self.receipt_auditor) + .await?; self.receipt_storage_adapter .update_receipt_by_id(receipt_id, received_receipt) + .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), })?; @@ -112,12 +112,14 @@ impl< /// /// Returns [`Error::AdapterError`] if there are any errors while storing RAV /// - pub fn verify_and_store_rav( - &mut self, + pub async fn verify_and_store_rav( + &self, expected_rav: ReceiptAggregateVoucher, signed_rav: SignedRAV, ) -> std::result::Result<(), Error> { - self.receipt_auditor.check_rav_signature(&signed_rav)?; + self.receipt_auditor + .check_rav_signature(&signed_rav) + .await?; if signed_rav.message != expected_rav { return Err(Error::InvalidReceivedRAV { @@ -128,6 +130,7 @@ impl< self.rav_storage_adapter .update_last_rav(signed_rav) + .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), })?; @@ -144,11 +147,12 @@ impl< /// /// Returns [`Error::AdapterError`] if there are any errors while retrieving last RAV or removing receipts /// - pub fn remove_obsolete_receipts(&mut self) -> Result<(), Error> { - match self.get_previous_rav()? { + pub async fn remove_obsolete_receipts(&mut self) -> Result<(), Error> { + match self.get_previous_rav().await? { Some(last_rav) => { self.receipt_storage_adapter .remove_receipts_in_timestamp_range(..=last_rav.message.timestamp_ns) + .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), })?; @@ -168,20 +172,22 @@ impl< /// /// Returns [`Error::TimestampRangeError`] if the max timestamp of the previous RAV is greater than the min timestamp. Caused by timestamp buffer being too large, or requests coming too soon. /// - pub fn create_rav_request(&mut self, timestamp_buffer_ns: u64) -> Result { - let previous_rav = self.get_previous_rav()?; + pub async fn create_rav_request(&self, timestamp_buffer_ns: u64) -> Result { + let previous_rav = self.get_previous_rav().await?; let min_timestamp_ns = previous_rav .as_ref() .map(|rav| rav.message.timestamp_ns + 1) .unwrap_or(0); - let (valid_receipts, invalid_receipts) = - self.collect_receipts(timestamp_buffer_ns, min_timestamp_ns)?; + let (valid_receipts, invalid_receipts) = self + .collect_receipts(timestamp_buffer_ns, min_timestamp_ns) + .await?; let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone())?; self.receipt_auditor - .update_min_timestamp_ns(expected_rav.timestamp_ns); + .update_min_timestamp_ns(expected_rav.timestamp_ns) + .await; Ok(RAVRequest { valid_receipts, @@ -191,18 +197,19 @@ impl< }) } - fn get_previous_rav(&self) -> Result, Error> { + async fn get_previous_rav(&self) -> Result, Error> { let previous_rav = self.rav_storage_adapter .last_rav() + .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), })?; Ok(previous_rav) } - fn collect_receipts( - &mut self, + async fn collect_receipts( + &self, timestamp_buffer_ns: u64, min_timestamp_ns: u64, ) -> Result<(Vec, Vec), Error> { @@ -217,6 +224,7 @@ impl< let received_receipts = self .receipt_storage_adapter .retrieve_receipts_in_timestamp_range(min_timestamp_ns..max_timestamp_ns) + .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), })?; @@ -225,7 +233,9 @@ impl< let mut failed_signed_receipts = Vec::::new(); for (receipt_id, mut received_receipt) in received_receipts { - received_receipt.finalize_receipt_checks(receipt_id, &mut self.receipt_auditor)?; + received_receipt + .finalize_receipt_checks(receipt_id, &self.receipt_auditor) + .await?; if received_receipt.is_accepted() { accepted_signed_receipts.push(received_receipt.signed_receipt); } else { diff --git a/tap_core/src/tap_manager/test/manager_test.rs b/tap_core/src/tap_manager/test/manager_test.rs index 9b0027cc..6ae0712d 100644 --- a/tap_core/src/tap_manager/test/manager_test.rs +++ b/tap_core/src/tap_manager/test/manager_test.rs @@ -6,12 +6,13 @@ mod manager_unit_test { use std::{ collections::{HashMap, HashSet}, str::FromStr, - sync::{Arc, RwLock}, + sync::Arc, }; use ethereum_types::Address; use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; use rstest::*; + use tokio::sync::RwLock; use super::super::Manager; use crate::{ @@ -103,6 +104,7 @@ mod manager_unit_test { #[case::full_checks(get_full_list_of_checks())] #[case::partial_checks(vec![ReceiptCheck::CheckSignature])] #[case::no_checks(Vec::::new())] + #[tokio::test] async fn manager_verify_and_store_varying_initial_checks( rav_storage_adapter: RAVStorageAdapterMock, collateral_adapters: (CollateralAdapterMock, Arc>>), @@ -121,7 +123,7 @@ mod manager_unit_test { // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; - let mut manager = Manager::new( + let manager = Manager::new( collateral_adapter, receipt_checks_adapter, rav_storage_adapter, @@ -138,12 +140,13 @@ mod manager_unit_test { .unwrap(); query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, value); - collateral_storage.write().unwrap().insert(keys.1, 999999); + collateral_storage.write().await.insert(keys.1, 999999); assert!(manager .verify_and_store_receipt(signed_receipt, query_id, initial_checks) + .await .is_ok()); } @@ -151,6 +154,7 @@ mod manager_unit_test { #[case::full_checks(get_full_list_of_checks())] #[case::partial_checks(vec![ReceiptCheck::CheckSignature])] #[case::no_checks(Vec::::new())] + #[tokio::test] async fn manager_create_rav_request_all_valid_receipts( rav_storage_adapter: RAVStorageAdapterMock, collateral_adapters: (CollateralAdapterMock, Arc>>), @@ -169,7 +173,7 @@ mod manager_unit_test { // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; - let mut manager = Manager::new( + let manager = Manager::new( collateral_adapter, receipt_checks_adapter, rav_storage_adapter, @@ -177,7 +181,7 @@ mod manager_unit_test { get_full_list_of_checks(), starting_min_timestamp, ); - collateral_storage.write().unwrap().insert(keys.1, 999999); + collateral_storage.write().await.insert(keys.1, 999999); let mut stored_signed_receipts = Vec::new(); for query_id in 0..10 { @@ -189,13 +193,14 @@ mod manager_unit_test { stored_signed_receipts.push(signed_receipt.clone()); query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, value); assert!(manager .verify_and_store_receipt(signed_receipt, query_id, initial_checks.clone()) + .await .is_ok()); } - let rav_request_result = manager.create_rav_request(0); + let rav_request_result = manager.create_rav_request(0).await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -212,6 +217,7 @@ mod manager_unit_test { .unwrap(); assert!(manager .verify_and_store_rav(rav_request.expected_rav, signed_rav) + .await .is_ok()); } @@ -219,6 +225,7 @@ mod manager_unit_test { #[case::full_checks(get_full_list_of_checks())] #[case::partial_checks(vec![ReceiptCheck::CheckSignature])] #[case::no_checks(Vec::::new())] + #[tokio::test] async fn manager_create_multiple_rav_requests_all_valid_receipts( rav_storage_adapter: RAVStorageAdapterMock, collateral_adapters: (CollateralAdapterMock, Arc>>), @@ -237,7 +244,7 @@ mod manager_unit_test { // give receipt 5 second variance for min start time let starting_min_timestamp = get_current_timestamp_u64_ns().unwrap() - 500000000; - let mut manager = Manager::new( + let manager = Manager::new( collateral_adapter, receipt_checks_adapter, rav_storage_adapter, @@ -246,7 +253,7 @@ mod manager_unit_test { starting_min_timestamp, ); - collateral_storage.write().unwrap().insert(keys.1, 999999); + collateral_storage.write().await.insert(keys.1, 999999); let mut stored_signed_receipts = Vec::new(); let mut expected_accumulated_value = 0; @@ -259,14 +266,15 @@ mod manager_unit_test { stored_signed_receipts.push(signed_receipt.clone()); query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, value); assert!(manager .verify_and_store_receipt(signed_receipt, query_id, initial_checks.clone()) + .await .is_ok()); expected_accumulated_value += value; } - let rav_request_result = manager.create_rav_request(0); + let rav_request_result = manager.create_rav_request(0).await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -290,6 +298,7 @@ mod manager_unit_test { .unwrap(); assert!(manager .verify_and_store_rav(rav_request.expected_rav, signed_rav) + .await .is_ok()); stored_signed_receipts.clear(); @@ -302,14 +311,15 @@ mod manager_unit_test { stored_signed_receipts.push(signed_receipt.clone()); query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, value); assert!(manager .verify_and_store_receipt(signed_receipt, query_id, initial_checks.clone()) + .await .is_ok()); expected_accumulated_value += value; } - let rav_request_result = manager.create_rav_request(0); + let rav_request_result = manager.create_rav_request(0).await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -333,10 +343,12 @@ mod manager_unit_test { .unwrap(); assert!(manager .verify_and_store_rav(rav_request.expected_rav, signed_rav) + .await .is_ok()); } #[rstest] + #[tokio::test] async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_timestamps( rav_storage_adapter: RAVStorageAdapterMock, collateral_adapters: (CollateralAdapterMock, Arc>>), @@ -366,7 +378,7 @@ mod manager_unit_test { starting_min_timestamp, ); - collateral_storage.write().unwrap().insert(keys.1, 999999); + collateral_storage.write().await.insert(keys.1, 999999); let mut stored_signed_receipts = Vec::new(); let mut expected_accumulated_value = 0; @@ -378,10 +390,11 @@ mod manager_unit_test { stored_signed_receipts.push(signed_receipt.clone()); query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, value); assert!(manager .verify_and_store_receipt(signed_receipt, query_id, initial_checks.clone()) + .await .is_ok()); expected_accumulated_value += value; } @@ -389,10 +402,10 @@ mod manager_unit_test { // Remove old receipts if requested // This shouldn't do anything since there has been no rav created yet if remove_old_receipts { - manager.remove_obsolete_receipts().unwrap(); + manager.remove_obsolete_receipts().await.unwrap(); } - let rav_request_1_result = manager.create_rav_request(0); + let rav_request_1_result = manager.create_rav_request(0).await; assert!(rav_request_1_result.is_ok()); let rav_request_1 = rav_request_1_result.unwrap(); @@ -416,6 +429,7 @@ mod manager_unit_test { .unwrap(); assert!(manager .verify_and_store_rav(rav_request_1.expected_rav, signed_rav_1) + .await .is_ok()); stored_signed_receipts.clear(); @@ -427,29 +441,31 @@ mod manager_unit_test { stored_signed_receipts.push(signed_receipt.clone()); query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, value); assert!(manager .verify_and_store_receipt(signed_receipt, query_id, initial_checks.clone()) + .await .is_ok()); expected_accumulated_value += value; } // Remove old receipts if requested if remove_old_receipts { - manager.remove_obsolete_receipts().unwrap(); + manager.remove_obsolete_receipts().await.unwrap(); // We expect to have 10 receipts left in receipt storage assert_eq!( manager .receipt_storage_adapter .retrieve_receipts_in_timestamp_range(..) + .await .unwrap() .len(), 10 ); } - let rav_request_2_result = manager.create_rav_request(0); + let rav_request_2_result = manager.create_rav_request(0).await; assert!(rav_request_2_result.is_ok()); let rav_request_2 = rav_request_2_result.unwrap(); @@ -473,6 +489,7 @@ mod manager_unit_test { .unwrap(); assert!(manager .verify_and_store_rav(rav_request_2.expected_rav, signed_rav_2) + .await .is_ok()); } } diff --git a/tap_core/src/tap_receipt/receipt_auditor.rs b/tap_core/src/tap_receipt/receipt_auditor.rs index b5eb7f37..87c4ac33 100644 --- a/tap_core/src/tap_receipt/receipt_auditor.rs +++ b/tap_core/src/tap_receipt/receipt_auditor.rs @@ -1,6 +1,8 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +use tokio::sync::RwLock; + use crate::{ adapters::{ collateral_adapter::CollateralAdapter, receipt_checks_adapter::ReceiptChecksAdapter, @@ -14,7 +16,7 @@ use crate::{ pub struct ReceiptAuditor { collateral_adapter: CA, receipt_checks_adapter: RCA, - min_timestamp_ns: u64, + min_timestamp_ns: RwLock, } impl ReceiptAuditor { @@ -26,35 +28,35 @@ impl ReceiptAuditor { Self { collateral_adapter, receipt_checks_adapter, - min_timestamp_ns: starting_min_timestamp_ns, + min_timestamp_ns: RwLock::new(starting_min_timestamp_ns), } } /// Updates the minimum timestamp that will be accepted for a receipt (exclusive). - pub fn update_min_timestamp_ns(&mut self, min_timestamp_ns: u64) { - self.min_timestamp_ns = min_timestamp_ns; + pub async fn update_min_timestamp_ns(&self, min_timestamp_ns: u64) { + *self.min_timestamp_ns.write().await = min_timestamp_ns; } - pub fn check( - &mut self, + pub async fn check( + &self, receipt_check: &ReceiptCheck, signed_receipt: &EIP712SignedMessage, query_id: u64, receipt_id: u64, ) -> ReceiptResult<()> { match receipt_check { - ReceiptCheck::CheckUnique => self.check_uniqueness(signed_receipt, receipt_id), - ReceiptCheck::CheckAllocationId => self.check_allocation_id(signed_receipt), - ReceiptCheck::CheckSignature => self.check_signature(signed_receipt), - ReceiptCheck::CheckTimestamp => self.check_timestamp(signed_receipt), - ReceiptCheck::CheckValue => self.check_value(signed_receipt, query_id), + ReceiptCheck::CheckUnique => self.check_uniqueness(signed_receipt, receipt_id).await, + ReceiptCheck::CheckAllocationId => self.check_allocation_id(signed_receipt).await, + ReceiptCheck::CheckSignature => self.check_signature(signed_receipt).await, + ReceiptCheck::CheckTimestamp => self.check_timestamp(signed_receipt).await, + ReceiptCheck::CheckValue => self.check_value(signed_receipt, query_id).await, ReceiptCheck::CheckAndReserveCollateral => { - self.check_and_reserve_collateral(signed_receipt) + self.check_and_reserve_collateral(signed_receipt).await } } } - fn check_uniqueness( + async fn check_uniqueness( &self, signed_receipt: &EIP712SignedMessage, receipt_id: u64, @@ -62,19 +64,21 @@ impl ReceiptAuditor { if !self .receipt_checks_adapter .is_unique(signed_receipt, receipt_id) + .await { return Err(ReceiptError::NonUniqueReceipt); } Ok(()) } - fn check_allocation_id( + async fn check_allocation_id( &self, signed_receipt: &EIP712SignedMessage, ) -> ReceiptResult<()> { if !self .receipt_checks_adapter .is_valid_allocation_id(signed_receipt.message.allocation_id) + .await { return Err(ReceiptError::InvalidAllocationID { received_allocation_id: signed_receipt.message.allocation_id, @@ -83,16 +87,20 @@ impl ReceiptAuditor { Ok(()) } - fn check_timestamp(&self, signed_receipt: &EIP712SignedMessage) -> ReceiptResult<()> { - if signed_receipt.message.timestamp_ns <= self.min_timestamp_ns { + async fn check_timestamp( + &self, + signed_receipt: &EIP712SignedMessage, + ) -> ReceiptResult<()> { + let min_timestamp_ns = *self.min_timestamp_ns.read().await; + if signed_receipt.message.timestamp_ns <= min_timestamp_ns { return Err(ReceiptError::InvalidTimestamp { received_timestamp: signed_receipt.message.timestamp_ns, - timestamp_min: self.min_timestamp_ns, + timestamp_min: min_timestamp_ns, }); } Ok(()) } - fn check_value( + async fn check_value( &self, signed_receipt: &EIP712SignedMessage, query_id: u64, @@ -100,6 +108,7 @@ impl ReceiptAuditor { if !self .receipt_checks_adapter .is_valid_value(signed_receipt.message.value, query_id) + .await { return Err(ReceiptError::InvalidValue { received_value: signed_receipt.message.value, @@ -108,7 +117,10 @@ impl ReceiptAuditor { Ok(()) } - fn check_signature(&self, signed_receipt: &EIP712SignedMessage) -> ReceiptResult<()> { + async fn check_signature( + &self, + signed_receipt: &EIP712SignedMessage, + ) -> ReceiptResult<()> { let receipt_signer_address = signed_receipt .recover_signer() @@ -118,6 +130,7 @@ impl ReceiptAuditor { if !self .receipt_checks_adapter .is_valid_gateway_id(receipt_signer_address) + .await { return Err(ReceiptError::InvalidSignature { source_error_message: format!( @@ -129,8 +142,8 @@ impl ReceiptAuditor { Ok(()) } - fn check_and_reserve_collateral( - &mut self, + async fn check_and_reserve_collateral( + &self, signed_receipt: &EIP712SignedMessage, ) -> ReceiptResult<()> { let receipt_signer_address = @@ -142,6 +155,7 @@ impl ReceiptAuditor { if self .collateral_adapter .subtract_collateral(receipt_signer_address, signed_receipt.message.value) + .await .is_err() { return Err(ReceiptError::SubtractCollateralFailed); @@ -150,7 +164,7 @@ impl ReceiptAuditor { Ok(()) } - pub fn check_rav_signature( + pub async fn check_rav_signature( &self, signed_rav: &EIP712SignedMessage, ) -> Result<()> { @@ -158,6 +172,7 @@ impl ReceiptAuditor { if !self .receipt_checks_adapter .is_valid_gateway_id(rav_signer_address) + .await { return Err(Error::InvalidRecoveredSigner { address: rav_signer_address, diff --git a/tap_core/src/tap_receipt/received_receipt.rs b/tap_core/src/tap_receipt/received_receipt.rs index 4a3af399..10860c45 100644 --- a/tap_core/src/tap_receipt/received_receipt.rs +++ b/tap_core/src/tap_receipt/received_receipt.rs @@ -106,11 +106,11 @@ impl ReceivedReceipt { /// /// Returns [`Error::InvalidCheckError] if requested error in not a required check (list of required checks provided by user on construction) /// - pub fn perform_check( + pub async fn perform_check( &mut self, check: &ReceiptCheck, receipt_id: u64, - receipt_auditor: &mut ReceiptAuditor, + receipt_auditor: &ReceiptAuditor, ) -> crate::Result<()> { match self.state { ReceiptState::Checking | ReceiptState::Received => { @@ -137,7 +137,11 @@ impl ReceivedReceipt { if !self.check_is_complete(check) { result = self.update_check( check, - Some(receipt_auditor.check(check, &self.signed_receipt, self.query_id, receipt_id)), + Some( + receipt_auditor + .check(check, &self.signed_receipt, self.query_id, receipt_id) + .await, + ), ); } self.update_state(); @@ -154,11 +158,11 @@ impl ReceivedReceipt { /// /// Returns [`Error::InvalidCheckError] if requested error in not a required check (list of required checks provided by user on construction) /// - pub fn perform_checks( + pub async fn perform_checks( &mut self, checks: &[ReceiptCheck], receipt_id: u64, - receipt_auditor: &mut ReceiptAuditor, + receipt_auditor: &ReceiptAuditor, ) -> Result<()> { let mut check_and_reserve_collateral_included = false; for check in checks { @@ -167,7 +171,8 @@ impl ReceivedReceipt { check_and_reserve_collateral_included = true; continue; } - self.perform_check(check, receipt_id, receipt_auditor)?; + self.perform_check(check, receipt_id, receipt_auditor) + .await?; } if check_and_reserve_collateral_included && self.state != ReceiptState::Failed { // CheckAndReserveCollateral is only performed after all other checks have passed @@ -175,7 +180,8 @@ impl ReceivedReceipt { &ReceiptCheck::CheckAndReserveCollateral, receipt_id, receipt_auditor, - )?; + ) + .await?; } Ok(()) } @@ -184,16 +190,17 @@ impl ReceivedReceipt { /// /// Returns `Err` only if unable to complete a check, returns `Ok` if no check failed to complete (*Important:* this is not the result of the check, just the result of _completing_ the check) /// - pub fn finalize_receipt_checks( + pub async fn finalize_receipt_checks( &mut self, receipt_id: u64, - receipt_auditor: &mut ReceiptAuditor, + receipt_auditor: &ReceiptAuditor, ) -> Result<()> { self.perform_checks( self.incomplete_checks().as_slice(), receipt_id, receipt_auditor, ) + .await } /// Update RAV status, should be called when receipt is included in RAV request and when RAV request is received diff --git a/tap_core/src/tap_receipt/tests/received_receipt_tests.rs b/tap_core/src/tap_receipt/tests/received_receipt_tests.rs index 450e34bd..e192b4c0 100644 --- a/tap_core/src/tap_receipt/tests/received_receipt_tests.rs +++ b/tap_core/src/tap_receipt/tests/received_receipt_tests.rs @@ -6,12 +6,13 @@ mod received_receipt_unit_test { use std::{ collections::{HashMap, HashSet}, str::FromStr, - sync::{Arc, RwLock}, + sync::Arc, }; use ethereum_types::Address; use ethers::signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer}; use rstest::*; + use tokio::sync::RwLock; use crate::{ adapters::{ @@ -94,6 +95,7 @@ mod received_receipt_unit_test { } #[rstest] + #[tokio::test] async fn initialization_valid_receipt( keys: (LocalWallet, Address), allocation_ids: Vec
, @@ -114,6 +116,7 @@ mod received_receipt_unit_test { } #[rstest] + #[tokio::test] async fn partial_then_full_check_valid_receipt( keys: (LocalWallet, Address), allocation_ids: Vec
, @@ -149,12 +152,12 @@ mod received_receipt_unit_test { // add collateral for gateway collateral_storage .write() - .unwrap() + .await .insert(keys.1, query_value + 500); // appraise query query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, query_value); let checks = get_full_list_of_checks(); @@ -172,10 +175,12 @@ mod received_receipt_unit_test { receipt_id, &mut receipt_auditor ) + .await .is_ok()); assert!(received_receipt .perform_checks(&checks, receipt_id, &mut receipt_auditor) + .await .is_ok()); assert_eq!(received_receipt.state, ReceiptState::Accepted); @@ -183,6 +188,7 @@ mod received_receipt_unit_test { } #[rstest] + #[tokio::test] async fn partial_then_finalize_valid_receipt( keys: (LocalWallet, Address), allocation_ids: Vec
, @@ -218,12 +224,12 @@ mod received_receipt_unit_test { // add collateral for gateway collateral_storage .write() - .unwrap() + .await .insert(keys.1, query_value + 500); // appraise query query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, query_value); let checks = get_full_list_of_checks(); @@ -241,10 +247,12 @@ mod received_receipt_unit_test { receipt_id, &mut receipt_auditor ) + .await .is_ok()); assert!(received_receipt .finalize_receipt_checks(receipt_id, &mut receipt_auditor) + .await .is_ok()); assert_eq!(received_receipt.state, ReceiptState::Accepted); @@ -262,6 +270,7 @@ mod received_receipt_unit_test { } #[rstest] + #[tokio::test] async fn standard_lifetime_valid_receipt( keys: (LocalWallet, Address), allocation_ids: Vec
, @@ -297,12 +306,12 @@ mod received_receipt_unit_test { // add collateral for gateway collateral_storage .write() - .unwrap() + .await .insert(keys.1, query_value + 500); // appraise query query_appraisal_storage .write() - .unwrap() + .await .insert(query_id, query_value); let checks = get_full_list_of_checks(); @@ -314,6 +323,7 @@ mod received_receipt_unit_test { assert!(received_receipt .finalize_receipt_checks(receipt_id, &mut receipt_auditor) + .await .is_ok()); assert_eq!(received_receipt.state, ReceiptState::Accepted); diff --git a/tap_integration_tests/tests/indexer_mock/mod.rs b/tap_integration_tests/tests/indexer_mock/mod.rs index 1c6b467b..f77bbb85 100644 --- a/tap_integration_tests/tests/indexer_mock/mod.rs +++ b/tap_integration_tests/tests/indexer_mock/mod.rs @@ -16,7 +16,6 @@ use jsonrpsee::{ rpc_params, server::{ServerBuilder, ServerHandle}, }; -use tokio::sync::Mutex; use tap_aggregator::jsonrpsee_helpers; use tap_core::{ @@ -44,18 +43,18 @@ pub trait Rpc { /// RpcManager is a struct that implements the `Rpc` trait and it represents a JSON-RPC server manager. /// It includes a manager, initial_checks, receipt_count, threshold and aggregator_client. -/// Manager holds a Mutex-protected instance of a generic `Manager` object which is shared and can be accessed by multiple threads. +/// Manager holds an Arc to an instance of a generic `Manager` object which is shared and can be accessed by multiple threads. /// initial_checks is a list of checks that needs to be performed for every incoming request. /// receipt_count is a thread-safe counter that increments with each receipt verified and stored. /// threshold is a limit to which receipt_count can increment, after reaching which RAV request is triggered. /// aggregator_client is an HTTP client used for making JSON-RPC requests to another server. pub struct RpcManager< - CA: CollateralAdapter + Send + 'static, // An instance of CollateralAdapter, marked as thread-safe with Send and given 'static lifetime - RCA: ReceiptChecksAdapter + Send + 'static, // An instance of ReceiptChecksAdapter - RSA: ReceiptStorageAdapter + Send + 'static, // An instance of ReceiptStorageAdapter - RAVSA: RAVStorageAdapter + Send + 'static, // An instance of RAVStorageAdapter + CA: CollateralAdapter + Send + Sync + 'static, // An instance of CollateralAdapter, marked as thread-safe with Send and given 'static lifetime + RCA: ReceiptChecksAdapter + Send + Sync + 'static, // An instance of ReceiptChecksAdapter + RSA: ReceiptStorageAdapter + Send + Sync + 'static, // An instance of ReceiptStorageAdapter + RAVSA: RAVStorageAdapter + Send + Sync + 'static, // An instance of RAVStorageAdapter > { - manager: Arc>>, // Manager object in a mutex for thread safety, reference counted with an Arc + manager: Arc>, // Manager object reference counted with an Arc initial_checks: Vec, // Vector of initial checks to be performed on each request receipt_count: Arc, // Thread-safe atomic counter for receipts threshold: u64, // The count at which a RAV request will be triggered @@ -66,10 +65,10 @@ pub struct RpcManager< /// Constructor initializes a new instance of `RpcManager`. /// `request` method handles incoming JSON-RPC requests and it verifies and stores the receipt from the request. impl< - CA: CollateralAdapter + Send + 'static, - RCA: ReceiptChecksAdapter + Send + 'static, - RSA: ReceiptStorageAdapter + Send + 'static, - RAVSA: RAVStorageAdapter + Send + 'static, + CA: CollateralAdapter + Send + Sync + 'static, + RCA: ReceiptChecksAdapter + Send + Sync + 'static, + RSA: ReceiptStorageAdapter + Send + Sync + 'static, + RAVSA: RAVStorageAdapter + Send + Sync + 'static, > RpcManager { pub fn new( @@ -84,14 +83,14 @@ impl< aggregate_server_api_version: String, ) -> Result { Ok(Self { - manager: Arc::new(Mutex::new(Manager::::new( + manager: Arc::new(Manager::::new( collateral_adapter, receipt_checks_adapter, rav_storage_adapter, receipt_storage_adapter, required_checks, get_current_timestamp_u64_ns()?, - ))), + )), initial_checks, receipt_count: Arc::new(AtomicU64::new(0)), threshold, @@ -105,10 +104,10 @@ impl< #[async_trait] impl< - CA: CollateralAdapter + Send + 'static, - RCA: ReceiptChecksAdapter + Send + 'static, - RSA: ReceiptStorageAdapter + Send + 'static, - RAVSA: RAVStorageAdapter + Send + 'static, + CA: CollateralAdapter + Send + Sync + 'static, + RCA: ReceiptChecksAdapter + Send + Sync + 'static, + RSA: ReceiptStorageAdapter + Send + Sync + 'static, + RAVSA: RAVStorageAdapter + Send + Sync + 'static, > RpcServer for RpcManager { async fn request( @@ -116,21 +115,17 @@ impl< request_id: u64, receipt: SignedReceipt, ) -> Result<(), jsonrpsee::types::ErrorObjectOwned> { - let verify_result; + let verify_result = match self + .manager + .verify_and_store_receipt(receipt, request_id, self.initial_checks.clone()) + .await { - let mut manager_guard = self.manager.lock().await; - verify_result = match manager_guard.verify_and_store_receipt( - receipt, - request_id, - self.initial_checks.clone(), - ) { - Ok(_) => Ok(()), - Err(e) => Err(to_rpc_error( - Box::new(e), - "Failed to verify and store receipt", - )), - }; - } + Ok(_) => Ok(()), + Err(e) => Err(to_rpc_error( + Box::new(e), + "Failed to verify and store receipt", + )), + }; // Increment the receipt count self.receipt_count.fetch_add(1, Ordering::Relaxed); @@ -165,10 +160,10 @@ impl< /// run_server function initializes and starts a JSON-RPC server that handles incoming requests. pub async fn run_server< - CA: CollateralAdapter + Send + 'static, - RCA: ReceiptChecksAdapter + Send + 'static, - RSA: ReceiptStorageAdapter + Send + 'static, - RAVSA: RAVStorageAdapter + Send + 'static, + CA: CollateralAdapter + Send + Sync + 'static, + RCA: ReceiptChecksAdapter + Send + Sync + 'static, + RSA: ReceiptStorageAdapter + Send + Sync + 'static, + RAVSA: RAVStorageAdapter + Send + Sync + 'static, >( port: u16, // Port on which the server will listen collateral_adapter: CA, // CollateralAdapter instance @@ -207,18 +202,18 @@ pub async fn run_server< // request_rav function creates a request for aggregate receipts (RAV), sends it to another server and verifies the result. async fn request_rav< - CA: CollateralAdapter + Send + 'static, - RCA: ReceiptChecksAdapter + Send + 'static, - RSA: ReceiptStorageAdapter + Send + 'static, - RAVSA: RAVStorageAdapter + Send + 'static, + CA: CollateralAdapter + Send + Sync + 'static, + RCA: ReceiptChecksAdapter + Send + Sync + 'static, + RSA: ReceiptStorageAdapter + Send + Sync + 'static, + RAVSA: RAVStorageAdapter + Send + Sync + 'static, >( - manager: &Arc>>, // Mutex-protected manager object for thread safety + manager: &Arc>, time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server threshold: usize, ) -> Result<()> { // Create the aggregate_receipts request params - let rav_request = manager.lock().await.create_rav_request(time_stamp_buffer)?; + let rav_request = manager.create_rav_request(time_stamp_buffer).await?; // To-do: Need to add previous RAV, when tap_manager supports replacing receipts let params = rpc_params!( @@ -232,10 +227,9 @@ async fn request_rav< .0 .request("aggregate_receipts", params) .await?; - { - let mut manager_guard = manager.lock().await; - manager_guard.verify_and_store_rav(rav_request.expected_rav, remote_rav_result.data)?; - } + manager + .verify_and_store_rav(rav_request.expected_rav, remote_rav_result.data) + .await?; // For these tests, we expect every receipt to be valid, i.e. there should be no invalid receipts, nor any missing receipts (less than the expected threshold). // If there is throw an error. diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index 1b6c1ec2..c90fc8bc 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -10,7 +10,7 @@ use std::{ iter::FromIterator, net::{SocketAddr, TcpListener}, str::FromStr, - sync::{Arc, RwLock}, + sync::Arc, }; use anyhow::{Error, Result}; @@ -23,6 +23,7 @@ use jsonrpsee::{ }; use rand::{rngs::StdRng, Rng, SeedableRng}; use rstest::*; +use tokio::sync::RwLock; use tap_aggregator::{jsonrpsee_helpers, server as agg_server}; use tap_core::{ @@ -911,7 +912,9 @@ async fn start_indexer_server( listener.local_addr()?.port() }; - collateral_adapter.increase_collateral(gateway_id, available_collateral); + collateral_adapter + .increase_collateral(gateway_id, available_collateral) + .await; let aggregate_server_address = "http://".to_string() + &agg_server_addr.to_string(); let (server_handle, socket_addr) = indexer_mock::run_server(