diff --git a/sui/src/bench.rs b/sui/src/bench.rs index 57a7c69e29e2b..23b6e6e516931 100644 --- a/sui/src/bench.rs +++ b/sui/src/bench.rs @@ -167,7 +167,7 @@ impl ClientServerBenchmark { let state = AuthorityState::new( committee.clone(), public_auth0, - Box::pin(secret_auth0), + Arc::pin(secret_auth0), store, genesis::clone_genesis_compiled_modules(), &mut genesis::get_genesis_context(), diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index 3fa1943ec925a..329453fdd14b0 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -263,7 +263,7 @@ async fn make_server_with_genesis_ctx( let state = AuthorityState::new( committee.clone(), name, - Box::pin(authority.key_pair.copy()), + Arc::pin(authority.key_pair.copy()), store, preload_modules, genesis_ctx, diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index 1c63f2c98a347..b7284dce430ed 100644 --- a/sui_core/src/authority.rs +++ b/sui_core/src/authority.rs @@ -27,9 +27,11 @@ use sui_types::{ MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS, }; +use crate::authority_batch::BatchSender; + #[cfg(test)] #[path = "unit_tests/authority_tests.rs"] -mod authority_tests; +pub mod authority_tests; mod temporary_store; use temporary_store::AuthorityTemporaryStore; @@ -46,7 +48,8 @@ const MAX_GAS_BUDGET: u64 = 18446744073709551615 / 1000 - 1; /// /// Typically instantiated with Box::pin(keypair) where keypair is a `KeyPair` /// -type StableSyncAuthoritySigner = Pin + Send + Sync>>; +pub type StableSyncAuthoritySigner = + Pin + Send + Sync>>; pub struct AuthorityState { // Fixed size, static, identity of the authority @@ -63,6 +66,11 @@ pub struct AuthorityState { /// The database _database: Arc, + + /// The sender to notify of new transactions + /// and create batches for this authority. + /// Keep as None if there is no need for this. + batch_sender: Option, } /// The authority state encapsulates all state, drives execution, and ensures safety. @@ -72,6 +80,16 @@ pub struct AuthorityState { /// /// Repeating valid commands should produce no changes and return no error. impl AuthorityState { + /// Set a listener for transaction certificate updates. Returns an + /// error if a listener is already registered. + pub fn set_batch_sender(&mut self, batch_sender: BatchSender) -> SuiResult { + if self.batch_sender.is_some() { + return Err(SuiError::AuthorityUpdateFailure); + } + self.batch_sender = Some(batch_sender); + Ok(()) + } + /// The logic to check one object against a reference, and return the object if all is well /// or an error if not. fn check_one_lock( @@ -381,8 +399,17 @@ impl AuthorityState { &gas_object_id, ); // Update the database in an atomic manner - self.update_state(temporary_store, certificate, to_signed_effects) - .await // Returns the TransactionInfoResponse + + let (seq, resp) = self + .update_state(temporary_store, certificate, to_signed_effects) + .await?; // Returns the OrderInfoResponse + + // If there is a notifier registered, notify: + if let Some(sender) = &self.batch_sender { + sender.send_item(seq, transaction_digest).await?; + } + + Ok(resp) } fn execute_transaction( @@ -616,6 +643,7 @@ impl AuthorityState { move_vm: adapter::new_move_vm(native_functions) .expect("We defined natives to not fail here"), _database: store, + batch_sender: None, }; for genesis_modules in genesis_packages { @@ -627,6 +655,11 @@ impl AuthorityState { state } + #[cfg(test)] + pub fn db(&self) -> Arc { + self._database.clone() + } + async fn get_object(&self, object_id: &ObjectID) -> Result, SuiError> { self._database.get_object(object_id) } @@ -708,9 +741,10 @@ impl AuthorityState { async fn update_state( &self, temporary_store: AuthorityTemporaryStore, + certificate: CertifiedTransaction, signed_effects: SignedTransactionEffects, - ) -> Result { + ) -> Result<(u64, TransactionInfoResponse), SuiError> { self._database .update_state(temporary_store, certificate, signed_effects) } diff --git a/sui_core/src/authority/authority_store.rs b/sui_core/src/authority/authority_store.rs index c136001b64095..ac16528a0151d 100644 --- a/sui_core/src/authority/authority_store.rs +++ b/sui_core/src/authority/authority_store.rs @@ -6,10 +6,17 @@ use rocksdb::Options; use std::collections::BTreeSet; use std::convert::TryInto; use std::path::Path; + +use std::sync::atomic::AtomicU64; use sui_types::base_types::SequenceNumber; use typed_store::rocks::{open_cf, DBBatch, DBMap}; + +use std::sync::atomic::Ordering; use typed_store::traits::Map; +pub use crate::authority_batch::AuthorityBatch; +use crate::authority_batch::{SignedBatch, TxSequenceNumber}; + pub struct AuthorityStore { /// This is a map between the object ID and the latest state of the object, namely the /// state that is needed to process new transactions. If an object is deleted its entry is @@ -61,6 +68,16 @@ pub struct AuthorityStore { /// Internal vector of locks to manage concurrent writes to the database lock_table: Vec>, + + // Tables used for authority batch structure + /// A sequence on all executed certificates and effects. + pub executed_sequence: DBMap, + + /// A sequence of batches indexing into the sequence of executed transactions. + pub batches: DBMap, + + /// The next available sequence number to use in the `executed sequence` table. + pub next_sequence_number: AtomicU64, } impl AuthorityStore { @@ -79,9 +96,27 @@ impl AuthorityStore { "signed_effects", "sequenced", "schedule", + "executed_sequence", + "batches", ], ) .expect("Cannot open DB."); + + let executed_sequence = + DBMap::reopen(&db, Some("executed_sequence")).expect("Cannot open CF."); + + // Read the index of the last entry in the sequence of commands + // to extract the next sequence number or it is zero. + let next_sequence_number = AtomicU64::new( + executed_sequence + .iter() + .skip_prior_to(&TxSequenceNumber::MAX) + .expect("Error reading table.") + .next() + .map(|(v, _)| v + 1u64) + .unwrap_or(0), + ); + AuthorityStore { objects: DBMap::reopen(&db, Some("objects")).expect("Cannot open CF."), owner_index: DBMap::reopen(&db, Some("owner_index")).expect("Cannot open CF."), @@ -98,6 +133,9 @@ impl AuthorityStore { .into_iter() .map(|_| parking_lot::Mutex::new(())) .collect(), + executed_sequence, + batches: DBMap::reopen(&db, Some("batches")).expect("Cannot open CF."), + next_sequence_number, } } @@ -330,7 +368,7 @@ impl AuthorityStore { temporary_store: AuthorityTemporaryStore, certificate: CertifiedTransaction, signed_effects: SignedTransactionEffects, - ) -> Result { + ) -> Result<(TxSequenceNumber, TransactionInfoResponse), SuiError> { // Extract the new state from the execution // TODO: events are already stored in the TxDigest -> TransactionEffects store. Is that enough? let mut write_batch = self.transaction_lock.batch(); @@ -348,13 +386,19 @@ impl AuthorityStore { std::iter::once((transaction_digest, &signed_effects)), )?; - self.batch_update_objects(write_batch, temporary_store, transaction_digest)?; - - Ok(TransactionInfoResponse { - signed_transaction: self.signed_transactions.get(&transaction_digest)?, - certified_transaction: Some(certificate), - signed_effects: Some(signed_effects), - }) + // Safe to unwrap since the "true" flag ensures we get a sequence value back. + let seq: TxSequenceNumber = self + .batch_update_objects(write_batch, temporary_store, transaction_digest, true)? + .unwrap(); + + Ok(( + seq, + TransactionInfoResponse { + signed_transaction: self.signed_transactions.get(&transaction_digest)?, + certified_transaction: Some(certificate), + signed_effects: Some(signed_effects), + }, + )) } /// Persist temporary storage to DB for genesis modules @@ -365,7 +409,8 @@ impl AuthorityStore { ) -> Result<(), SuiError> { debug_assert_eq!(transaction_digest, TransactionDigest::genesis()); let write_batch = self.transaction_lock.batch(); - self.batch_update_objects(write_batch, temporary_store, transaction_digest) + self.batch_update_objects(write_batch, temporary_store, transaction_digest, false) + .map(|_| ()) } /// Helper function for updating the objects in the state @@ -374,7 +419,8 @@ impl AuthorityStore { mut write_batch: DBBatch, temporary_store: AuthorityTemporaryStore, transaction_digest: TransactionDigest, - ) -> Result<(), SuiError> { + should_sequence: bool, + ) -> Result, SuiError> { let (objects, active_inputs, written, deleted, _events) = temporary_store.into_inner(); // Archive the old lock. @@ -454,6 +500,7 @@ impl AuthorityStore { // This is the critical region: testing the locks and writing the // new locks must be atomic, and no writes should happen in between. + let mut return_seq = None; { // Acquire the lock to ensure no one else writes when we are in here. let _mutexes = self.acquire_locks(&active_inputs[..]); @@ -465,13 +512,30 @@ impl AuthorityStore { object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?; } + if should_sequence { + // Now we are sure we are going to execute, add to the sequence + // number and insert into authority sequence. + // + // NOTE: it is possible that we commit to the database transactions + // out of order with respect to their sequence number. It is also + // possible for the authority to crash without committing the + // full sequence, and the batching logic needs to deal with this. + let next_seq = self.next_sequence_number.fetch_add(1, Ordering::SeqCst); + write_batch = write_batch.insert_batch( + &self.executed_sequence, + std::iter::once((next_seq, transaction_digest)), + )?; + + return_seq = Some(next_seq); + } + // Atomic write of all locks & other data write_batch.write()?; // implicit: drop(_mutexes); } // End of critical region - Ok(()) + Ok(return_seq) } /// Returns the last entry we have for this object in the parents_sync index used diff --git a/sui_core/src/authority_batch.rs b/sui_core/src/authority_batch.rs new file mode 100644 index 0000000000000..d65c70c390ff1 --- /dev/null +++ b/sui_core/src/authority_batch.rs @@ -0,0 +1,378 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::authority::{AuthorityStore, StableSyncAuthoritySigner}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use sui_types::base_types::*; +use sui_types::error::{SuiError, SuiResult}; + +use std::collections::BTreeMap; +use std::time::Duration; +use sui_types::crypto::{sha3_hash, AuthoritySignature, BcsSignable}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::time::interval; + +use typed_store::Map; + +#[cfg(test)] +#[path = "unit_tests/batch_tests.rs"] +mod batch_tests; + +/* + +An authority asynchronously creates batches from its sequence of +certificates / effects. Then both the sequence of certificates +/ effects are transmitted to listeners (as a transaction digest) +as well as batches. + +The architecture is as follows: +- The authority store notifies through the Sender that a new + certificate / effect has been sequenced, at a specific sequence + number. +- The sender sends this information through a channel to the Manager, + that decides whether a new batch should be made. This is based on + time elapsed as well as current size of batch. If so a new batch + is created. +- The authority manager also holds the sending ends of a number of + channels that eventually go to clients that registered interest + in receiving all updates from the authority. When a new item is + sequenced of a batch created this is sent out to them. + +*/ + +pub type TxSequenceNumber = u64; + +pub type BroadcastPair = ( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Receiver, +); + +/// Either a freshly sequenced transaction hash or a batch +#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum UpdateItem { + Transaction((TxSequenceNumber, TransactionDigest)), + Batch(SignedBatch), +} + +pub struct BatchSender { + /// Channel for sending updates. + tx_send: Sender<(TxSequenceNumber, TransactionDigest)>, +} + +pub struct BatchManager { + /// Channel for receiving updates + tx_recv: Receiver<(TxSequenceNumber, TransactionDigest)>, + /// The sender end of the broadcast channel used to send updates to listeners + tx_broadcast: tokio::sync::broadcast::Sender, + /// Copy of the database to write batches and read transactions. + db: Arc, +} + +impl BatchSender { + /// Send a new event to the batch manager + pub async fn send_item( + &self, + transaction_sequence: TxSequenceNumber, + transaction_digest: TransactionDigest, + ) -> Result<(), SuiError> { + self.tx_send + .send((transaction_sequence, transaction_digest)) + .await + .map_err(|_| SuiError::BatchErrorSender) + } +} + +impl BatchManager { + pub fn new( + db: Arc, + capacity: usize, + ) -> (BatchSender, BatchManager, BroadcastPair) { + let (tx_send, tx_recv) = channel(capacity); + let (tx_broadcast, rx_broadcast) = tokio::sync::broadcast::channel(capacity); + let sender = BatchSender { tx_send }; + let manager = BatchManager { + tx_recv, + tx_broadcast: tx_broadcast.clone(), + db, + }; + + (sender, manager, (tx_broadcast, rx_broadcast)) + } + + /// Starts the manager service / tokio task + pub async fn start_service( + mut self, + authority_name: AuthorityName, + secret: StableSyncAuthoritySigner, + min_batch_size: u64, + max_delay: Duration, + ) -> Result, SuiError> { + let last_batch = self + .init_from_database(authority_name, secret.clone()) + .await?; + + let join_handle = tokio::spawn(async move { + self.run_service( + authority_name, + secret, + last_batch, + min_batch_size, + max_delay, + ) + .await + .expect("Service returns with no errors"); + drop(self); + }); + + Ok(join_handle) + } + + async fn init_from_database( + &self, + authority_name: AuthorityName, + secret: StableSyncAuthoritySigner, + ) -> Result { + // First read the last batch in the db + let mut last_batch = match self + .db + .batches + .iter() + .skip_prior_to(&TxSequenceNumber::MAX)? + .next() + { + Some((_, last_batch)) => last_batch.batch, + None => { + // Make a batch at zero + let zero_batch = + SignedBatch::new(AuthorityBatch::initial(), &*secret, authority_name); + self.db.batches.insert(&0, &zero_batch)?; + zero_batch.batch + } + }; + + // See if there are any transactions in the database not in a batch + let transactions: Vec<_> = self + .db + .executed_sequence + .iter() + .skip_to(&last_batch.next_sequence_number)? + .collect(); + + if !transactions.is_empty() { + // Make a new batch, to put the old transactions not in a batch in. + let last_signed_batch = SignedBatch::new( + AuthorityBatch::make_next(&last_batch, &transactions[..]), + &*secret, + authority_name, + ); + self.db.batches.insert( + &last_signed_batch.batch.next_sequence_number, + &last_signed_batch, + )?; + last_batch = last_signed_batch.batch; + } + + Ok(last_batch) + } + + pub async fn run_service( + &mut self, + authority_name: AuthorityName, + secret: StableSyncAuthoritySigner, + prev_batch: AuthorityBatch, + min_batch_size: u64, + max_delay: Duration, + ) -> SuiResult { + // Then we operate in a loop, where for each new update we consider + // whether to create a new batch or not. + + let mut interval = interval(max_delay); + let mut exit = false; + let mut make_batch; + + let mut prev_batch = prev_batch; + + // The structures we use to build the next batch. The current_batch holds the sequence + // of transactions in order, following the last batch. The loose transactions holds + // transactions we may have received out of order. + let mut current_batch: Vec<(TxSequenceNumber, TransactionDigest)> = Vec::new(); + let mut loose_transactions: BTreeMap = BTreeMap::new(); + + let mut next_sequence_number = prev_batch.next_sequence_number; + + while !exit { + // Reset the flags. + make_batch = false; + + // check if we should make a new block + tokio::select! { + _ = interval.tick() => { + // Every so often we check if we should make a batch + // but it should never be empty. But never empty. + make_batch = true; + }, + item_option = self.tx_recv.recv() => { + + match item_option { + None => { + make_batch = true; + exit = true; + }, + Some((seq, tx_digest)) => { + loose_transactions.insert(seq, tx_digest); + while loose_transactions.contains_key(&next_sequence_number) { + let next_item = (next_sequence_number, loose_transactions.remove(&next_sequence_number).unwrap()); + // Send the update + let _ = self.tx_broadcast.send(UpdateItem::Transaction(next_item)); + current_batch.push(next_item); + next_sequence_number += 1; + } + + if current_batch.len() as TxSequenceNumber >= min_batch_size { + make_batch = true; + } + } + } + } + } + + // Logic to make a batch + if make_batch { + if current_batch.is_empty() { + continue; + } + + // Make and store a new batch. + let new_batch = SignedBatch::new( + AuthorityBatch::make_next(&prev_batch, ¤t_batch), + &*secret, + authority_name, + ); + self.db + .batches + .insert(&new_batch.batch.next_sequence_number, &new_batch)?; + + // Send the update + let _ = self.tx_broadcast.send(UpdateItem::Batch(new_batch.clone())); + + // A new batch is actually made, so we reset the conditions. + prev_batch = new_batch.batch; + current_batch.clear(); + + // We rest the interval here to ensure that blocks + // are made either when they are full or old enough. + interval.reset(); + } + } + + // When a new batch is created we send a notification to all who have + // registered an interest. + + Ok(()) + } + + /// Register a sending channel used to send streaming + /// updates to clients. + pub fn register_listener() {} +} + +pub type BatchDigest = [u8; 32]; + +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)] +pub struct TransactionBatch(Vec<(TxSequenceNumber, TransactionDigest)>); +impl BcsSignable for TransactionBatch {} + +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Hash, Default, Debug, Serialize, Deserialize)] +pub struct AuthorityBatch { + // TODO: Add epoch + /// The next sequence number after the end of this batch + next_sequence_number: u64, + + /// The first sequence number of this batch + initial_sequence_number: u64, + + // The number of items in the batch + size: u64, + + /// The digest of the previous block, if there is one + previous_digest: Option, + + // The digest of all transactions digests in this batch + transactions_digest: [u8; 32], +} + +impl BcsSignable for AuthorityBatch {} + +impl AuthorityBatch { + pub fn digest(&self) -> BatchDigest { + sha3_hash(self) + } + + /// The first batch for any authority indexes at zero + /// and has zero length. + pub fn initial() -> AuthorityBatch { + let to_hash = TransactionBatch(Vec::new()); + let transactions_digest = sha3_hash(&to_hash); + + AuthorityBatch { + next_sequence_number: 0, + initial_sequence_number: 0, + size: 0, + previous_digest: None, + transactions_digest, + } + } + + /// Make a batch, containing some transactions, and following the previous + /// batch. + pub fn make_next( + previous_batch: &AuthorityBatch, + transactions: &[(TxSequenceNumber, TransactionDigest)], + ) -> AuthorityBatch { + let transaction_vec = transactions.to_vec(); + debug_assert!(!transaction_vec.is_empty()); + + let initial_sequence_number = transaction_vec[0].0 as u64; + let next_sequence_number = (transaction_vec[transaction_vec.len() - 1].0 + 1) as u64; + + let to_hash = TransactionBatch(transaction_vec); + let transactions_digest = sha3_hash(&to_hash); + + AuthorityBatch { + next_sequence_number, + initial_sequence_number, + size: transactions.len() as u64, + previous_digest: Some(previous_batch.digest()), + transactions_digest, + } + } +} + +/// An transaction signed by a single authority +#[derive(Eq, Clone, Debug, Serialize, Deserialize)] +pub struct SignedBatch { + pub batch: AuthorityBatch, + pub authority: AuthorityName, + pub signature: AuthoritySignature, +} + +impl SignedBatch { + pub fn new( + batch: AuthorityBatch, + secret: &dyn signature::Signer, + authority: AuthorityName, + ) -> SignedBatch { + SignedBatch { + signature: AuthoritySignature::new(&batch, secret), + batch, + authority, + } + } +} + +impl PartialEq for SignedBatch { + fn eq(&self, other: &Self) -> bool { + self.batch == other.batch && self.authority == other.authority + } +} diff --git a/sui_core/src/client.rs b/sui_core/src/client.rs index e1406f509a710..4c7e22727f9c7 100644 --- a/sui_core/src/client.rs +++ b/sui_core/src/client.rs @@ -276,7 +276,7 @@ impl ClientState { /// Returns all object references that are in `object_refs` but not in the store. pub fn object_refs_not_in_store( &self, - object_refs: &Vec, + object_refs: &[ObjectRef], ) -> SuiResult> { let result = self .store diff --git a/sui_core/src/lib.rs b/sui_core/src/lib.rs index dfe3a7c7262a5..68081cae1e8f2 100644 --- a/sui_core/src/lib.rs +++ b/sui_core/src/lib.rs @@ -4,6 +4,7 @@ pub mod authority; pub mod authority_aggregator; +pub mod authority_batch; pub mod authority_client; pub mod authority_server; pub mod client; diff --git a/sui_core/src/unit_tests/authority_tests.rs b/sui_core/src/unit_tests/authority_tests.rs index b3277e676aa3c..c71425996a442 100644 --- a/sui_core/src/unit_tests/authority_tests.rs +++ b/sui_core/src/unit_tests/authority_tests.rs @@ -31,7 +31,7 @@ pub fn system_maxfiles() -> usize { fdlimit::raise_fd_limit().unwrap_or(256u64) as usize } -fn max_files_authority_tests() -> i32 { +pub fn max_files_authority_tests() -> i32 { (system_maxfiles() / 8).try_into().unwrap() } @@ -383,7 +383,7 @@ fn make_dependent_module(m: &CompiledModule) -> CompiledModule { } #[cfg(test)] -fn check_gas_object( +pub fn check_gas_object( gas_object: &Object, expected_balance: u64, expected_sequence_number: SequenceNumber, @@ -1336,7 +1336,7 @@ async fn test_authority_persist() { committee.clone(), *authority_key.public_key_bytes(), // we assume that the node runner is in charge for its key -> it's ok to reopen a copy below. - Box::pin(authority_key.copy()), + Arc::pin(authority_key.copy()), store, vec![], &mut genesis::get_genesis_context(), @@ -1364,7 +1364,7 @@ async fn test_authority_persist() { let authority2 = AuthorityState::new( committee, *authority_key.public_key_bytes(), - Box::pin(authority_key), + Arc::pin(authority_key), store, vec![], &mut genesis::get_genesis_context(), @@ -1553,7 +1553,7 @@ async fn init_state() -> AuthorityState { AuthorityState::new( committee, *authority_key.public_key_bytes(), - Box::pin(authority_key), + Arc::pin(authority_key), store, genesis::clone_genesis_compiled_modules(), &mut genesis::get_genesis_context(), @@ -1576,7 +1576,7 @@ async fn init_state_with_ids>( state } -async fn init_state_with_objects>(objects: I) -> AuthorityState { +pub async fn init_state_with_objects>(objects: I) -> AuthorityState { let state = init_state().await; for o in objects { @@ -1717,7 +1717,7 @@ async fn call_framework_code( .await } -async fn create_move_object( +pub async fn create_move_object( authority: &AuthorityState, gas_object_id: &ObjectID, sender: &SuiAddress, diff --git a/sui_core/src/unit_tests/batch_tests.rs b/sui_core/src/unit_tests/batch_tests.rs new file mode 100644 index 0000000000000..ccbf47197f186 --- /dev/null +++ b/sui_core/src/unit_tests/batch_tests.rs @@ -0,0 +1,279 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use crate::authority::authority_tests::*; + +use std::env; +use std::fs; + +#[tokio::test] +async fn test_open_manager() { + // let (_, authority_key) = get_key_pair(); + + // Create a random directory to store the DB + let dir = env::temp_dir(); + let path = dir.join(format!("DB_{:?}", ObjectID::random())); + fs::create_dir(&path).unwrap(); + + // Make a test key pair + let (_, key_pair) = get_key_pair(); + let key_pair = Arc::pin(key_pair); + let address = *key_pair.public_key_bytes(); + + { + // Create an authority + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(max_files_authority_tests()); + let store = Arc::new(AuthorityStore::open(&path, Some(opts))); + + // TEST 1: init from an empty database should return to a zero block + let (_send, manager, _pair) = BatchManager::new(store.clone(), 100); + let last_block = manager + .init_from_database(address, key_pair.clone()) + .await + .expect("No error expected."); + + assert_eq!(0, last_block.next_sequence_number); + + // TEST 2: init from a db with a transaction not in the sequence makes a new block + // when we re-open the database. + + store + .executed_sequence + .insert(&0, &TransactionDigest::new([0; 32])) + .expect("no error on write"); + } + // drop all + { + // Create an authority + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(max_files_authority_tests()); + let store = Arc::new(AuthorityStore::open(&path, Some(opts))); + + let (_send, manager, _pair) = BatchManager::new(store.clone(), 100); + let last_block = manager + .init_from_database(address, key_pair.clone()) + .await + .expect("No error expected."); + + assert_eq!(1, last_block.next_sequence_number); + + // TEST 3: If the database contains out of order transactions we just make a block with gaps + store + .executed_sequence + .insert(&2, &TransactionDigest::new([0; 32])) + .expect("no error on write"); + } + // drop all + { + // Create an authority + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(max_files_authority_tests()); + let store = Arc::new(AuthorityStore::open(&path, Some(opts))); + + let (_send, manager, _pair) = BatchManager::new(store.clone(), 100); + let last_block = manager + .init_from_database(address, key_pair.clone()) + .await + .unwrap(); + + assert_eq!(last_block.next_sequence_number, 3); + assert_eq!(last_block.initial_sequence_number, 2); + assert_eq!(last_block.size, 1); + } +} + +#[tokio::test] +async fn test_batch_manager_happy_path() { + // let (_, authority_key) = get_key_pair(); + + // Create a random directory to store the DB + let dir = env::temp_dir(); + let path = dir.join(format!("DB_{:?}", ObjectID::random())); + fs::create_dir(&path).unwrap(); + + // Create an authority + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(max_files_authority_tests()); + let store = Arc::new(AuthorityStore::open(&path, Some(opts))); + + // Make a test key pair + let (_, key_pair) = get_key_pair(); + let key_pair = Arc::pin(key_pair); + let address = *key_pair.public_key_bytes(); + + // TEST 1: init from an empty database should return to a zero block + let (_send, manager, _pair) = BatchManager::new(store.clone(), 100); + let _join = manager + .start_service(address, key_pair, 1000, Duration::from_millis(500)) + .await + .expect("No errors starting manager."); + + // Send a transaction. + let tx_zero = TransactionDigest::new([0; 32]); + _send + .send_item(0, tx_zero) + .await + .expect("Send to the channel."); + + // First we get a transaction update + let (_tx, mut rx) = _pair; + assert!(matches!( + rx.recv().await.unwrap(), + UpdateItem::Transaction((0, _)) + )); + + // Then we (eventually) get a batch + assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_))); + + _send + .send_item(1, tx_zero) + .await + .expect("Send to the channel."); + + // When we close the sending channel we also also end the service task + drop(_send); + drop(_tx); + + _join.await.expect("No errors in task"); + + // But the block is made, and sent as a notification. + assert!(matches!( + rx.recv().await.unwrap(), + UpdateItem::Transaction((1, _)) + )); + assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_))); + assert!(matches!(rx.recv().await, Err(_))); +} + +#[tokio::test] +async fn test_batch_manager_out_of_order() { + // Create a random directory to store the DB + let dir = env::temp_dir(); + let path = dir.join(format!("DB_{:?}", ObjectID::random())); + fs::create_dir(&path).unwrap(); + + // Create an authority + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(max_files_authority_tests()); + let store = Arc::new(AuthorityStore::open(&path, Some(opts))); + + // Make a test key pair + let (_, key_pair) = get_key_pair(); + let key_pair = Arc::pin(key_pair); + let address = *key_pair.public_key_bytes(); + + // TEST 1: init from an empty database should return to a zero block + let (_send, manager, _pair) = BatchManager::new(store.clone(), 100); + let _join = manager + .start_service(address, key_pair, 4, Duration::from_millis(5000)) + .await + .expect("Start service with no issues."); + + // Send transactions out of order + let tx_zero = TransactionDigest::new([0; 32]); + _send + .send_item(1, tx_zero) + .await + .expect("Send to the channel."); + + _send + .send_item(3, tx_zero) + .await + .expect("Send to the channel."); + + _send + .send_item(2, tx_zero) + .await + .expect("Send to the channel."); + + _send + .send_item(0, tx_zero) + .await + .expect("Send to the channel."); + + // Get transactions in order then batch. + let (_tx, mut rx) = _pair; + assert!(matches!( + rx.recv().await.unwrap(), + UpdateItem::Transaction((0, _)) + )); + + assert!(matches!( + rx.recv().await.unwrap(), + UpdateItem::Transaction((1, _)) + )); + assert!(matches!( + rx.recv().await.unwrap(), + UpdateItem::Transaction((2, _)) + )); + assert!(matches!( + rx.recv().await.unwrap(), + UpdateItem::Transaction((3, _)) + )); + + // Then we (eventually) get a batch + assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_))); + + // When we close the sending channel we also also end the service task + drop(_send); + drop(_tx); + + _join.await.expect("No errors in task"); + + assert!(matches!(rx.recv().await, Err(_))); +} + +use sui_types::{crypto::get_key_pair, object::Object}; + +#[tokio::test] +async fn test_handle_move_order_with_batch() { + let (sender, sender_key) = get_key_pair(); + let gas_payment_object_id = ObjectID::random(); + let gas_payment_object = Object::with_id_owner_for_testing(gas_payment_object_id, sender); + let mut authority_state = init_state_with_objects(vec![gas_payment_object]).await; + + // Create a listening infrastrucure. + let (_send, manager, _pair) = BatchManager::new(authority_state.db(), 100); + let _join = manager + .start_service( + authority_state.name, + authority_state.secret.clone(), + 4, + Duration::from_millis(500), + ) + .await + .expect("No issues starting service."); + + authority_state + .set_batch_sender(_send) + .expect("No problem registering"); + tokio::task::yield_now().await; + + let effects = create_move_object( + &authority_state, + &gas_payment_object_id, + &sender, + &sender_key, + ) + .await + .unwrap(); + + let (_tx, mut rx) = _pair; + + // Second and after is the one + let y = rx.recv().await.unwrap(); + println!("{:?}", y); + assert!(matches!( + y, + UpdateItem::Transaction((0, x)) if x == effects.transaction_digest + )); + + assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_))); + + drop(_tx); + drop(authority_state); + + _join.await.expect("No issues ending task."); +} diff --git a/sui_core/src/unit_tests/client_tests.rs b/sui_core/src/unit_tests/client_tests.rs index 6f63db241699e..965c2d705a2a0 100644 --- a/sui_core/src/unit_tests/client_tests.rs +++ b/sui_core/src/unit_tests/client_tests.rs @@ -301,7 +301,7 @@ async fn init_local_authorities( let state = AuthorityState::new( committee.clone(), authority_name, - Box::pin(secret), + Arc::pin(secret), store, genesis::clone_genesis_compiled_modules(), &mut genesis::get_genesis_context(), @@ -348,7 +348,7 @@ async fn init_local_authorities_bad( let state = AuthorityState::new( committee.clone(), address, - Box::pin(secret), + Arc::pin(secret), store, genesis::clone_genesis_compiled_modules(), &mut genesis::get_genesis_context(), diff --git a/sui_core/tests/staged/sui.yaml b/sui_core/tests/staged/sui.yaml index f95c8b5edd44f..bac3a583a3fb0 100644 --- a/sui_core/tests/staged/sui.yaml +++ b/sui_core/tests/staged/sui.yaml @@ -545,26 +545,28 @@ SuiError: NEWTYPE: TYPENAME: TypedStoreError 70: + BatchErrorSender: UNIT + 71: QuorumNotReached: STRUCT: - errors: SEQ: TYPENAME: SuiError - 71: - ObjectSerializationError: UNIT 72: - ConcurrentTransactionError: UNIT + ObjectSerializationError: UNIT 73: - IncorrectRecipientError: UNIT + ConcurrentTransactionError: UNIT 74: - TooManyIncorrectAuthorities: UNIT + IncorrectRecipientError: UNIT 75: - IncorrectGasSplit: UNIT + TooManyIncorrectAuthorities: UNIT 76: - IncorrectGasMerge: UNIT + IncorrectGasSplit: UNIT 77: - AccountNotFound: UNIT + IncorrectGasMerge: UNIT 78: + AccountNotFound: UNIT + 79: AccountExists: UNIT Transaction: STRUCT: diff --git a/sui_types/src/error.rs b/sui_types/src/error.rs index 317a21ed5e451..0d2b0da1ec6d6 100644 --- a/sui_types/src/error.rs +++ b/sui_types/src/error.rs @@ -213,6 +213,8 @@ pub enum SuiError { }, #[error("Storage error")] StorageError(#[from] typed_store::rocks::TypedStoreError), + #[error("Batch error: cannot send transaction to batch.")] + BatchErrorSender, #[error( "Failed to achieve quorum between authorities, cause by : {:#?}",