Skip to content

Commit

Permalink
Execution driver uses NodeSync for cert execution
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed Jul 20, 2022
1 parent a50a934 commit fd1e726
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 93 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ prometheus = "0.13.1"
arc-swap = "1.5.0"
tokio-retry = "0.3"
scopeguard = "1.1"
once_cell = "1.11.0"

sui-adapter = { path = "../sui-adapter" }
sui-framework = { path = "../sui-framework" }
Expand Down
17 changes: 15 additions & 2 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ use tokio::task::JoinHandle;
use tracing::info;

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI, node_sync::NodeSyncState,
authority::AuthorityState,
authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI,
node_sync::{NodeSyncHandle, NodeSyncState},
};
use once_cell::sync::OnceCell;
use tokio::time::Instant;

pub mod gossip;
Expand Down Expand Up @@ -108,6 +111,7 @@ pub struct ActiveAuthority<A> {
// The local authority state
pub state: Arc<AuthorityState>,
pub node_sync_state: Arc<NodeSyncState<A>>,
node_sync_handle: OnceCell<NodeSyncHandle>,

pub follower_store: Arc<FollowerStore>,
// The network interfaces to other authorities
Expand Down Expand Up @@ -142,6 +146,7 @@ impl<A> ActiveAuthority<A> {
)),
state: authority,
node_sync_state,
node_sync_handle: OnceCell::new(),
follower_store,
net: ArcSwap::from(net),
})
Expand Down Expand Up @@ -216,6 +221,7 @@ impl<A> Clone for ActiveAuthority<A> {
ActiveAuthority {
state: self.state.clone(),
node_sync_state: self.node_sync_state.clone(),
node_sync_handle: self.node_sync_handle.clone(),
follower_store: self.follower_store.clone(),
net: ArcSwap::from(self.net.load().clone()),
health: self.health.clone(),
Expand All @@ -227,6 +233,13 @@ impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
fn node_sync_handle(&self) -> NodeSyncHandle {
let node_sync_state = self.node_sync_state.clone();
self.node_sync_handle
.get_or_init(|| NodeSyncHandle::new(node_sync_state))
.clone()
}

pub async fn sync_to_latest_checkpoint(&self, metrics: &CheckpointMetrics) -> SuiResult {
self.sync_to_latest_checkpoint_with_config(metrics, Default::default())
.await
Expand Down
27 changes: 21 additions & 6 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ use sui_types::{
};
use tokio::time::Instant;

use crate::epoch::reconfiguration::Reconfigurable;
use futures::stream::StreamExt;

use crate::{
authority_aggregator::{AuthorityAggregator, ReduceOutput},
authority_client::AuthorityAPI,
checkpoints::CheckpointStore,
node_sync::NodeSyncState,
epoch::reconfiguration::Reconfigurable,
};

use sui_types::committee::{Committee, StakeUnit};
Expand Down Expand Up @@ -647,11 +648,25 @@ where
let (past, contents) =
get_one_checkpoint_with_contents(net.clone(), seq, &available_authorities).await?;

active_authority
.node_sync_state
.clone()
let errors = active_authority
.node_sync_handle()
.sync_checkpoint(&contents)
.await?;
.zip(futures::stream::iter(contents.iter()))
.filter_map(|(r, digests)| async move {
r.map_err(|e| {
info!(?digests, "failed to execute digest from checkpoint: {}", e);
e
})
.err()
})
.collect::<Vec<SuiError>>()
.await;

if !errors.is_empty() {
let error = "Failed to sync transactions in checkpoint".to_string();
error!(?seq, "{}", error);
return Err(SuiError::CheckpointingError { error });
}

checkpoint_db.lock().process_new_checkpoint_certificate(
&past,
Expand Down
64 changes: 17 additions & 47 deletions crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
use std::sync::Arc;
use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::CertifiedTransaction};
use tracing::{debug, info};
use typed_store::Map;

use crate::authority::AuthorityStore;
use crate::authority_client::AuthorityAPI;

use super::{gossip::LocalCertificateHandler, ActiveAuthority};
use futures::{stream, StreamExt};

use super::ActiveAuthority;

#[cfg(test)]
pub(crate) mod tests;
Expand Down Expand Up @@ -74,54 +75,23 @@ async fn execute_pending<A>(active_authority: &ActiveAuthority<A>) -> SuiResult<
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let _committee = active_authority.state.committee.load().clone();
let net = active_authority.net.load().clone();

// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_digests()?;

// Get all the actual certificates mapping to these pending transactions
let certs = active_authority
.state
.database
.certificates
.multi_get(pending_transactions.iter().map(|(_, d)| *d))?;

// Zip seq, digest with certs. Note the cert must exist in the DB
let cert_seq: Vec<_> = pending_transactions
.iter()
.zip(certs.iter())
.map(|((i, d), c)| (i, d, c.as_ref().expect("certificate must exist")))
.collect();

let local_handler = LocalCertificateHandler {
state: active_authority.state.clone(),
};

// TODO: implement properly efficient execution for the block of transactions.
let mut executed = vec![];
for (i, d, c) in cert_seq {
// Only execute if not already executed.
if active_authority.state.database.effects_exists(d)? {
executed.push(*i);
continue;
}

debug!(digest=?d, "Pending execution for certificate.");

// Sync and Execute with local authority state
net.sync_certificate_to_authority_with_timeout_inner(
c.clone(),
active_authority.state.name,
&local_handler,
tokio::time::Duration::from_secs(10),
10,
)
.await?;

// Remove from the execution list
executed.push(*i);
}
let sync_handle = active_authority.node_sync_handle();

// Send them for execution
let executed = sync_handle
// map to extract digest
.handle_execution_request(pending_transactions.iter().map(|(_, digest)| *digest))
// zip results back together with seq
.zip(stream::iter(
pending_transactions.iter().map(|(seq, _)| *seq),
))
// filter out errors
.filter_map(|(result, seq)| async move { result.ok().map(|_| seq) })
.collect()
.await;

// Now update the pending store.
active_authority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn pending_exec_storage_notify() {
// get back the certificates
let certs_back = authority_state
.database
.get_pending_certificates()
.get_pending_digests()
.expect("DB should be there");
assert_eq!(num_certs, certs_back.len());
}
Expand Down Expand Up @@ -166,7 +166,7 @@ async fn pending_exec_full() {
.expect("Storage is ok");
let certs_back = authority_state
.database
.get_pending_certificates()
.get_pending_digests()
.expect("DB should be there");
assert_eq!(num_certs, certs_back.len());

Expand All @@ -177,7 +177,7 @@ async fn pending_exec_full() {
// get back the certificates
let certs_back = authority_state
.database
.get_pending_certificates()
.get_pending_digests()
.expect("DB should be there");
assert_eq!(0, certs_back.len());
}
3 changes: 1 addition & 2 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
authority::AuthorityState,
authority_aggregator::{AuthorityAggregator, CertificateHandler},
authority_client::AuthorityAPI,
node_sync::NodeSyncDigestHandler,
safe_client::SafeClient,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -83,7 +82,7 @@ where
follower_process(
active_authority,
degree,
NodeSyncDigestHandler::new(active_authority.node_sync_state.clone()),
active_authority.node_sync_handle(),
GossipType::Full,
)
.await;
Expand Down
Loading

0 comments on commit fd1e726

Please sign in to comment.