Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[authority sync] Provide an interface for explorers to sync with single authority #509

Merged
merged 36 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f295ff4
Added tables for execution sequence and blocks
Feb 21, 2022
fd9fedc
Added architecture skeleton
Feb 21, 2022
12584a9
Rename to batch
Feb 22, 2022
9bacf5e
FMT
Feb 22, 2022
8ae557a
Added batch db open logic, and tests
Feb 22, 2022
fe359d9
Rename block to batch in code and comments
Feb 22, 2022
864ca6f
Batch logic and tests
Feb 22, 2022
fed3e4b
Appease clippy & fmt
Feb 22, 2022
9c3ba3e
Add out of order test
Feb 22, 2022
2dbead9
Logic to fix the database
Feb 22, 2022
f5477e2
Remove unused error
Feb 22, 2022
3a13f05
Rename test
Feb 22, 2022
99e3599
Added comments
Feb 22, 2022
dbdf817
Make fmt happy
Feb 22, 2022
0adec15
Minor changes
Feb 23, 2022
01d6172
Define clean constructors
Feb 23, 2022
49a3a45
Clean Licence
Feb 23, 2022
01776c9
Integrations of batch listener into authority & tests
Feb 23, 2022
bd62573
Make fmt & clippy happy
Feb 23, 2022
060c85b
Move from usize to u64 for seq numbers
Feb 24, 2022
2ede6ea
Make fmt / clippy happy
Feb 24, 2022
a086b7f
Do not add genesis to transaction sequence
Mar 1, 2022
3d1c3cb
Updated from review comments
Mar 1, 2022
a76a810
Remove confusing comment
Mar 1, 2022
945942d
Added hashes to batches
Mar 2, 2022
eae08cd
Updated names to Batch(-er)
Mar 2, 2022
a1f4ca6
Make fmt happy
Mar 2, 2022
ba2ef2a
Created structures for signed batches
Mar 2, 2022
775c5de
Handle SignedBatches instead of Batches
Mar 2, 2022
03cee3c
Remove pub from file
Mar 2, 2022
afcba11
Appease clippy
Mar 2, 2022
1e09d11
Turn on format test and do fmt
Mar 2, 2022
6b7ae14
Use TxSequenceNumber
Mar 3, 2022
bd63cb0
Allow gaps in the sequence + simplify
Mar 3, 2022
3992943
Updated structures
Mar 3, 2022
3c94310
Fixed clippy on incoming?
Mar 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sui/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl ClientServerBenchmark {
let state = AuthorityState::new(
committee.clone(),
public_auth0,
Box::pin(secret_auth0),
Arc::pin(secret_auth0),
store,
genesis::clone_genesis_compiled_modules(),
&mut genesis::get_genesis_context(),
Expand Down
2 changes: 1 addition & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ async fn make_server_with_genesis_ctx(
let state = AuthorityState::new(
committee.clone(),
name,
Box::pin(authority.key_pair.copy()),
Arc::pin(authority.key_pair.copy()),
store,
preload_modules,
genesis_ctx,
Expand Down
44 changes: 39 additions & 5 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use sui_types::{
MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS,
};

use crate::authority_batch::BatchSender;

#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
mod authority_tests;
pub mod authority_tests;

mod temporary_store;
use temporary_store::AuthorityTemporaryStore;
Expand All @@ -46,7 +48,8 @@ const MAX_GAS_BUDGET: u64 = 18446744073709551615 / 1000 - 1;
///
/// Typically instantiated with Arc::pin(keypair) where keypair is a `KeyPair`
///
type StableSyncAuthoritySigner = Pin<Box<dyn signature::Signer<AuthoritySignature> + Send + Sync>>;
pub type StableSyncAuthoritySigner =
Pin<Arc<dyn signature::Signer<AuthoritySignature> + Send + Sync>>;

pub struct AuthorityState {
// Fixed size, static, identity of the authority
Expand All @@ -63,6 +66,11 @@ pub struct AuthorityState {

/// The database
_database: Arc<AuthorityStore>,

/// The sender to notify of new transactions
/// and create batches for this authority.
/// Keep as None if there is no need for this.
batch_sender: Option<BatchSender>,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand All @@ -72,6 +80,16 @@ pub struct AuthorityState {
///
/// Repeating valid commands should produce no changes and return no error.
impl AuthorityState {
/// Registers the sender used to notify the batch subsystem about newly
/// executed transactions.
///
/// # Errors
///
/// Returns `SuiError::AuthorityUpdateFailure` when a sender has already
/// been registered; the existing sender is left untouched in that case.
pub fn set_batch_sender(&mut self, batch_sender: BatchSender) -> SuiResult {
    match self.batch_sender {
        // A sender is already in place: refuse to overwrite it.
        Some(_) => Err(SuiError::AuthorityUpdateFailure),
        None => {
            self.batch_sender = Some(batch_sender);
            Ok(())
        }
    }
}

/// The logic to check one object against a reference, and return the object if all is well
/// or an error if not.
fn check_one_lock(
Expand Down Expand Up @@ -381,8 +399,17 @@ impl AuthorityState {
&gas_object_id,
);
// Update the database in an atomic manner
self.update_state(temporary_store, certificate, to_signed_effects)
.await // Returns the TransactionInfoResponse

let (seq, resp) = self
.update_state(temporary_store, certificate, to_signed_effects)
.await?; // Returns the OrderInfoResponse

// If there is a notifier registered, notify:
if let Some(sender) = &self.batch_sender {
sender.send_item(seq, transaction_digest).await?;
}

Ok(resp)
}

fn execute_transaction(
Expand Down Expand Up @@ -616,6 +643,7 @@ impl AuthorityState {
move_vm: adapter::new_move_vm(native_functions)
.expect("We defined natives to not fail here"),
_database: store,
batch_sender: None,
};

for genesis_modules in genesis_packages {
Expand All @@ -627,6 +655,11 @@ impl AuthorityState {
state
}

/// Test-only accessor that hands out another shared handle to the
/// authority's backing store.
#[cfg(test)]
pub fn db(&self) -> Arc<AuthorityStore> {
    // Only the reference count is bumped; the store itself is not copied.
    Arc::clone(&self._database)
}

/// Looks up the object with the given `object_id` by delegating to the
/// backing database's `get_object`.
///
/// Returns whatever the store returns: `Ok(Some(object))`, `Ok(None)`, or a
/// `SuiError`. The body contains no `.await`, even though the function is
/// declared `async`.
async fn get_object(&self, object_id: &ObjectID) -> Result<Option<Object>, SuiError> {
self._database.get_object(object_id)
}
Expand Down Expand Up @@ -708,9 +741,10 @@ impl AuthorityState {
async fn update_state(
&self,
temporary_store: AuthorityTemporaryStore,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extraneous line can be removed

certificate: CertifiedTransaction,
signed_effects: SignedTransactionEffects,
) -> Result<TransactionInfoResponse, SuiError> {
) -> Result<(u64, TransactionInfoResponse), SuiError> {
self._database
.update_state(temporary_store, certificate, signed_effects)
}
Expand Down
86 changes: 75 additions & 11 deletions sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@ use rocksdb::Options;
use std::collections::BTreeSet;
use std::convert::TryInto;
use std::path::Path;

use std::sync::atomic::AtomicU64;
use sui_types::base_types::SequenceNumber;
use typed_store::rocks::{open_cf, DBBatch, DBMap};

use std::sync::atomic::Ordering;
use typed_store::traits::Map;

pub use crate::authority_batch::AuthorityBatch;
use crate::authority_batch::{SignedBatch, TxSequenceNumber};

pub struct AuthorityStore {
/// This is a map between the object ID and the latest state of the object, namely the
/// state that is needed to process new transactions. If an object is deleted its entry is
Expand Down Expand Up @@ -61,6 +68,16 @@ pub struct AuthorityStore {

/// Internal vector of locks to manage concurrent writes to the database
lock_table: Vec<parking_lot::Mutex<()>>,

// Tables used for authority batch structure
/// A sequence on all executed certificates and effects.
pub executed_sequence: DBMap<TxSequenceNumber, TransactionDigest>,

/// A sequence of batches indexing into the sequence of executed transactions.
pub batches: DBMap<TxSequenceNumber, SignedBatch>,

/// The next available sequence number to use in the `executed_sequence` table.
pub next_sequence_number: AtomicU64,
}

impl AuthorityStore {
Expand All @@ -79,9 +96,27 @@ impl AuthorityStore {
"signed_effects",
"sequenced",
"schedule",
"executed_sequence",
"batches",
],
)
.expect("Cannot open DB.");

let executed_sequence =
DBMap::reopen(&db, Some("executed_sequence")).expect("Cannot open CF.");

// Read the key of the last entry in the executed sequence and use its
// successor as the next sequence number; start at zero if the table is empty.
let next_sequence_number = AtomicU64::new(
executed_sequence
.iter()
.skip_prior_to(&TxSequenceNumber::MAX)
.expect("Error reading table.")
.next()
.map(|(v, _)| v + 1u64)
.unwrap_or(0),
);

AuthorityStore {
objects: DBMap::reopen(&db, Some("objects")).expect("Cannot open CF."),
owner_index: DBMap::reopen(&db, Some("owner_index")).expect("Cannot open CF."),
Expand All @@ -98,6 +133,9 @@ impl AuthorityStore {
.into_iter()
.map(|_| parking_lot::Mutex::new(()))
.collect(),
executed_sequence,
batches: DBMap::reopen(&db, Some("batches")).expect("Cannot open CF."),
next_sequence_number,
}
}

Expand Down Expand Up @@ -330,7 +368,7 @@ impl AuthorityStore {
temporary_store: AuthorityTemporaryStore,
certificate: CertifiedTransaction,
signed_effects: SignedTransactionEffects,
) -> Result<TransactionInfoResponse, SuiError> {
) -> Result<(TxSequenceNumber, TransactionInfoResponse), SuiError> {
// Extract the new state from the execution
// TODO: events are already stored in the TxDigest -> TransactionEffects store. Is that enough?
let mut write_batch = self.transaction_lock.batch();
Expand All @@ -348,13 +386,19 @@ impl AuthorityStore {
std::iter::once((transaction_digest, &signed_effects)),
)?;

self.batch_update_objects(write_batch, temporary_store, transaction_digest)?;

Ok(TransactionInfoResponse {
signed_transaction: self.signed_transactions.get(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
})
// Safe to unwrap since the "true" flag ensures we get a sequence value back.
let seq: TxSequenceNumber = self
.batch_update_objects(write_batch, temporary_store, transaction_digest, true)?
.unwrap();

Ok((
seq,
TransactionInfoResponse {
signed_transaction: self.signed_transactions.get(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
},
))
}

/// Persist temporary storage to DB for genesis modules
Expand All @@ -365,7 +409,8 @@ impl AuthorityStore {
) -> Result<(), SuiError> {
debug_assert_eq!(transaction_digest, TransactionDigest::genesis());
let write_batch = self.transaction_lock.batch();
self.batch_update_objects(write_batch, temporary_store, transaction_digest)
self.batch_update_objects(write_batch, temporary_store, transaction_digest, false)
.map(|_| ())
}

/// Helper function for updating the objects in the state
Expand All @@ -374,7 +419,8 @@ impl AuthorityStore {
mut write_batch: DBBatch,
temporary_store: AuthorityTemporaryStore,
transaction_digest: TransactionDigest,
) -> Result<(), SuiError> {
should_sequence: bool,
) -> Result<Option<TxSequenceNumber>, SuiError> {
let (objects, active_inputs, written, deleted, _events) = temporary_store.into_inner();

// Archive the old lock.
Expand Down Expand Up @@ -454,6 +500,7 @@ impl AuthorityStore {

// This is the critical region: testing the locks and writing the
// new locks must be atomic, and no writes should happen in between.
let mut return_seq = None;
{
// Acquire the lock to ensure no one else writes when we are in here.
let _mutexes = self.acquire_locks(&active_inputs[..]);
Expand All @@ -465,13 +512,30 @@ impl AuthorityStore {
object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?;
}

if should_sequence {
// Now we are sure we are going to execute, add to the sequence
// number and insert into authority sequence.
//
// NOTE: it is possible that we commit to the database transactions
// out of order with respect to their sequence number. It is also
// possible for the authority to crash without committing the
// full sequence, and the batching logic needs to deal with this.
let next_seq = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
write_batch = write_batch.insert_batch(
&self.executed_sequence,
std::iter::once((next_seq, transaction_digest)),
)?;

return_seq = Some(next_seq);
}

// Atomic write of all locks & other data
write_batch.write()?;

// implicit: drop(_mutexes);
} // End of critical region

Ok(())
Ok(return_seq)
}

/// Returns the last entry we have for this object in the parents_sync index used
Expand Down
Loading