Simplified block storage #53

Merged (40 commits, Jan 9, 2024)

Commits (changes below are shown from 35 of the 40 commits)
013478c  moved stuff around (pompon0, Dec 14, 2023)
79f0983  snapshot (pompon0, Dec 15, 2023)
eb9ec66  snapshot; config verification got distorted (pompon0, Dec 15, 2023)
7b9e104  stuff compiles except for tests (pompon0, Dec 15, 2023)
e5f76be  tests pass (pompon0, Dec 15, 2023)
e255fa7  simplified blocks in test (pompon0, Dec 15, 2023)
50342c1  removed unused loadtest binary modes (pompon0, Dec 18, 2023)
5963f69  removed header from FinalBlock (pompon0, Dec 19, 2023)
ed6fe4b  rewritten storage (pompon0, Dec 20, 2023)
27e8a1a  bft compiles; ut_harness is not running storage background tasks (pompon0, Dec 20, 2023)
8123774  before moving payload back to bft (pompon0, Dec 21, 2023)
b03d770  reorganized storage crate (pompon0, Dec 21, 2023)
ae338ef  splitted traits (pompon0, Dec 21, 2023)
fcbaa16  leader tests migrated (pompon0, Dec 21, 2023)
25da63b  bft tests pass (pompon0, Dec 22, 2023)
4fd6acf  sync_blocks rewrite wip (pompon0, Dec 22, 2023)
6b2be51  compiles (pompon0, Dec 22, 2023)
12e6277  tests compile (1 commented out) (pompon0, Dec 22, 2023)
0f72363  all tests pass (pompon0, Dec 23, 2023)
dbd860c  cargo fmt (pompon0, Dec 23, 2023)
c87326e  tests pass (pompon0, Dec 23, 2023)
2b4b992  compiles (pompon0, Dec 23, 2023)
2b5808c  executor tests pass (pompon0, Dec 23, 2023)
93ab685  fixed loadtest (pompon0, Dec 24, 2023)
5c3ccda  moved rocksdb dep (pompon0, Dec 28, 2023)
c3ea6d4  separate runner, cargo fmt (pompon0, Dec 28, 2023)
ced17c9  require persistent storage before proposing/verifying (pompon0, Dec 28, 2023)
66ef974  cargo fmt (pompon0, Jan 1, 2024)
4aad921  Merge remote-tracking branch 'origin/main' into gprusak-tools-config (pompon0, Jan 3, 2024)
f2dae9a  nits (pompon0, Jan 3, 2024)
eb0f5eb  applied comments (pompon0, Jan 4, 2024)
f4d530b  deque (pompon0, Jan 4, 2024)
10b3595  applied comments (pompon0, Jan 4, 2024)
6edc1b2  store_block -> queue_block (pompon0, Jan 4, 2024)
3f2d8da  Merge remote-tracking branch 'origin/main' into gprusak-tools-config (pompon0, Jan 4, 2024)
2e9ddde  Update node/actors/bft/src/replica/state_machine.rs (pompon0, Jan 5, 2024)
c319079  added metrics for storage (pompon0, Jan 8, 2024)
0a14d00  nonempty block store (pompon0, Jan 8, 2024)
14e963f  Merge remote-tracking branch 'refs/remotes/origin/gprusak-tools-confi… (pompon0, Jan 8, 2024)
b750173  applied comments (pompon0, Jan 9, 2024)
9 changes: 5 additions & 4 deletions node/Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions node/Cargo.toml
@@ -150,3 +150,4 @@ wildcard_dependencies = "warn"
# Produces too many false positives.
redundant_locals = "allow"
needless_pass_by_ref_mut = "allow"
box_default = "allow"
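
For context, clippy's box_default lint flags Box::new(T::default()) and suggests Box::default() instead; adding it to this allow-list keeps code in the following shape warning-free. Illustrative snippet only, with a made-up type name, not taken from this repository:

#[derive(Default)]
struct ReplicaState {
    view: u64,
}

fn main() {
    // clippy::box_default would normally suggest writing Box::default() here.
    let state: Box<ReplicaState> = Box::new(ReplicaState::default());
    println!("view = {}", state.view);
}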
33 changes: 16 additions & 17 deletions node/actors/bft/src/inner.rs → node/actors/bft/src/config.rs
@@ -1,23 +1,26 @@
//! The inner data of the consensus state machine. This is shared between the different roles.

use crate::{io::OutputMessage, misc};
use crate::{misc, PayloadManager};
use std::sync::Arc;
use tracing::instrument;
use zksync_concurrency::ctx::channel;
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

/// The ConsensusInner struct, it contains data to be shared with the state machines. This is never supposed
/// to be modified, except by the Consensus struct.
/// Configuration of the bft actor.
#[derive(Debug)]
pub(crate) struct ConsensusInner {
/// The communication pipe. This is used to send outputs.
pub(crate) pipe: channel::UnboundedSender<OutputMessage>,
pub struct Config {
/// The validator's secret key.
pub(crate) secret_key: validator::SecretKey,
pub secret_key: validator::SecretKey,
/// A vector of public keys for all the validators in the network.
pub(crate) validator_set: validator::ValidatorSet,
pub validator_set: validator::ValidatorSet,
/// Block store.
pub block_store: Arc<storage::BlockStore>,
/// Replica store.
pub replica_store: Box<dyn storage::ReplicaStore>,
/// Payload manager.
pub payload_manager: Box<dyn PayloadManager>,
}

impl ConsensusInner {
impl Config {
/// The maximum size of the payload of a block, in bytes. We will
/// reject blocks with payloads larger than this.
pub(crate) const PAYLOAD_MAX_SIZE: usize = 500 * zksync_protobuf::kB;
@@ -33,16 +36,12 @@ impl ConsensusInner
/// for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn threshold(&self) -> usize {
let num_validators = self.validator_set.len();

misc::consensus_threshold(num_validators)
misc::consensus_threshold(self.validator_set.len())
}

/// Calculate the maximum number of faulty replicas, for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn faulty_replicas(&self) -> usize {
let num_validators = self.validator_set.len();

misc::faulty_replicas(num_validators)
misc::faulty_replicas(self.validator_set.len())
}
}
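
The hunk above replaces ConsensusInner, which owned both the shared consensus data and the output pipe, with a pure-data Config; the pipe is now threaded to the state machines separately. A minimal, self-contained sketch of that split, using toy stand-in types rather than the real zksync_consensus APIs (the actual quorum arithmetic lives in misc::consensus_threshold, which is not part of this diff; the 3f+1 formula below is only a placeholder):

use std::sync::{mpsc, Arc};

// Toy stand-ins; the real types (validator::SecretKey, validator::ValidatorSet,
// storage::BlockStore, ...) live in the zksync_consensus crates and are not shown here.
struct SecretKey;
struct ValidatorSet(Vec<String>);
struct OutputMessage(String);

/// Read-only configuration shared by the leader and replica state machines.
struct Config {
    secret_key: SecretKey,
    validator_set: ValidatorSet,
}

impl Config {
    /// Quorum size for the current validator-set size. Placeholder 3f+1 arithmetic;
    /// the real value comes from misc::consensus_threshold.
    fn threshold(&self) -> usize {
        let n = self.validator_set.0.len();
        n - (n - 1) / 3
    }
}

/// A state machine now takes the config and the output pipe as separate fields,
/// instead of one ConsensusInner struct that owned both.
struct StateMachine {
    config: Arc<Config>,
    pipe: mpsc::Sender<OutputMessage>,
}

impl StateMachine {
    fn broadcast(&self, text: &str) {
        // Previously self.inner.pipe.send(...); the pipe is now its own field.
        let _ = self.pipe.send(OutputMessage(text.to_string()));
    }
}

fn main() {
    let config = Arc::new(Config {
        secret_key: SecretKey,
        validator_set: ValidatorSet(vec!["a".into(), "b".into(), "c".into(), "d".into()]),
    });
    let (tx, rx) = mpsc::channel();
    let sm = StateMachine { config: Arc::clone(&config), pipe: tx };
    sm.broadcast("LeaderCommit");
    assert_eq!(rx.recv().unwrap().0, "LeaderCommit");
    assert_eq!(config.threshold(), 3); // n = 4 validators, f = 1
}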
14 changes: 7 additions & 7 deletions node/actors/bft/src/leader/replica_commit.rs
@@ -68,7 +68,7 @@ impl StateMachine

// Check that the message signer is in the validator set.
let validator_index =
self.inner
self.config
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
@@ -84,7 +84,7 @@
}

// If the message is for a view when we are not a leader, we discard it.
if self.inner.view_leader(message.view) != self.inner.secret_key.public() {
if self.config.view_leader(message.view) != self.config.secret_key.public() {
return Err(Error::NotLeaderInView);
}

@@ -109,7 +109,7 @@
// We add the message to the incrementally-constructed QC.
self.commit_qcs
.entry(message.view)
.or_insert(CommitQC::new(message, &self.inner.validator_set))
.or_insert(CommitQC::new(message, &self.config.validator_set))
.add(&signed_message.sig, validator_index);

// We store the message in our cache.
@@ -123,11 +123,11 @@
}
let Some((_, replica_messages)) = by_proposal
.into_iter()
.find(|(_, v)| v.len() >= self.inner.threshold())
.find(|(_, v)| v.len() >= self.config.threshold())
else {
return Ok(());
};
debug_assert_eq!(replica_messages.len(), self.inner.threshold());
debug_assert_eq!(replica_messages.len(), self.config.threshold());

// ----------- Update the state machine --------------

@@ -151,7 +151,7 @@
// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: self
.inner
.config
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderCommit(
validator::LeaderCommit {
@@ -161,7 +161,7 @@
)),
recipient: Target::Broadcast,
};
self.inner.pipe.send(output_message.into());
self.pipe.send(output_message.into());

// Clean the caches.
self.prepare_message_cache.retain(|k, _| k >= &self.view);
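
The handler above keeps the pre-existing pattern of building a CommitQC incrementally per view and acting only once a threshold of matching messages has arrived; the diff itself is mostly mechanical (self.inner becomes self.config, and the broadcast goes through self.pipe). A rough, simplified sketch of that accumulation pattern with toy types; the real code additionally groups commit messages by proposal before checking the threshold:

use std::collections::{BTreeMap, HashMap, HashSet};

type ViewNumber = u64;
type ValidatorIndex = usize;

/// Toy stand-in for an incrementally built quorum certificate.
#[derive(Default)]
struct ToyCommitQc {
    signers: HashSet<ValidatorIndex>,
}

impl ToyCommitQc {
    fn add(&mut self, signer: ValidatorIndex) {
        self.signers.insert(signer);
    }
}

struct Leader {
    /// QCs under construction, indexed by view (mirrors commit_qcs).
    commit_qcs: BTreeMap<ViewNumber, ToyCommitQc>,
    /// One message per validator per view (mirrors commit_message_cache).
    commit_message_cache: BTreeMap<ViewNumber, HashMap<ValidatorIndex, String>>,
    threshold: usize,
}

impl Leader {
    /// Records one ReplicaCommit-style message and reports whether the view
    /// has now gathered a quorum.
    fn on_commit(&mut self, view: ViewNumber, signer: ValidatorIndex, msg: String) -> bool {
        self.commit_qcs.entry(view).or_default().add(signer);
        let per_validator = self.commit_message_cache.entry(view).or_default();
        per_validator.insert(signer, msg);
        per_validator.len() >= self.threshold
    }
}

fn main() {
    let mut leader = Leader {
        commit_qcs: BTreeMap::new(),
        commit_message_cache: BTreeMap::new(),
        threshold: 3,
    };
    assert!(!leader.on_commit(7, 0, "commit".into()));
    assert!(!leader.on_commit(7, 1, "commit".into()));
    assert!(leader.on_commit(7, 2, "commit".into())); // quorum reached
}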
12 changes: 6 additions & 6 deletions node/actors/bft/src/leader/replica_prepare.rs
@@ -93,7 +93,7 @@ impl StateMachine

// Check that the message signer is in the validator set.
let validator_index =
self.inner
self.config
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
@@ -109,7 +109,7 @@
}

// If the message is for a view when we are not a leader, we discard it.
if self.inner.view_leader(message.view) != self.inner.secret_key.public() {
if self.config.view_leader(message.view) != self.config.secret_key.public() {
return Err(Error::NotLeaderInView);
}

@@ -134,7 +134,7 @@
// Verify the high QC.
message
.high_qc
.verify(&self.inner.validator_set, self.inner.threshold())
.verify(&self.config.validator_set, self.config.threshold())
.map_err(Error::InvalidHighQC)?;

// If the high QC is for a future view, we discard the message.
@@ -153,7 +153,7 @@
self.prepare_qcs.entry(message.view).or_default().add(
&signed_message,
validator_index,
&self.inner.validator_set,
&self.config.validator_set,
);

// We store the message in our cache.
@@ -165,15 +165,15 @@
// Now we check if we have enough messages to continue.
let num_messages = self.prepare_message_cache.get(&message.view).unwrap().len();

if num_messages < self.inner.threshold() {
if num_messages < self.config.threshold() {
return Ok(());
}

// Remove replica prepare messages for this view, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
self.prepare_message_cache.remove(&message.view);

debug_assert_eq!(num_messages, self.inner.threshold());
debug_assert_eq!(num_messages, self.config.threshold());

// ----------- Update the state machine --------------

52 changes: 30 additions & 22 deletions node/actors/bft/src/leader/state_machine.rs
@@ -1,4 +1,4 @@
use crate::{metrics, ConsensusInner, PayloadSource};
use crate::{metrics, Config, OutputSender};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
@@ -7,14 +7,16 @@ use std::{
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, CommitQC, PrepareQC};
use zksync_consensus_roles::validator;

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
/// those messages. When participating in consensus we are not the leader most of the time.
pub(crate) struct StateMachine {
/// Consensus configuration and output channel.
pub(crate) inner: Arc<ConsensusInner>,
pub(crate) config: Arc<Config>,
/// Pipe through which the leader sends network messages.
pub(crate) pipe: OutputSender,
/// The current view number. This might not match the replica's view number, we only have this here
/// to make the leader advance monotonically in time and stop it from accepting messages from the past.
pub(crate) view: validator::ViewNumber,
@@ -29,24 +31,25 @@ pub(crate) struct StateMachine {
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QCs indexed by view number.
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, PrepareQC>,
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, validator::PrepareQC>,
/// Newest prepare QC composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<PrepareQC>>,
pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, CommitQC>,
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, validator::CommitQC>,
}

impl StateMachine {
/// Creates a new StateMachine struct.
#[instrument(level = "trace")]
pub fn new(ctx: &ctx::Ctx, inner: Arc<ConsensusInner>) -> Self {
pub fn new(ctx: &ctx::Ctx, config: Arc<Config>, pipe: OutputSender) -> Self {
StateMachine {
inner,
config,
pipe,
view: validator::ViewNumber(0),
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
@@ -106,9 +109,9 @@ impl StateMachine {
/// that the validator doesn't spend time on generating payloads for already expired views.
pub(crate) async fn run_proposer(
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
mut prepare_qc: sync::watch::Receiver<Option<PrepareQC>>,
config: &Config,
mut prepare_qc: sync::watch::Receiver<Option<validator::PrepareQC>>,
pipe: &OutputSender,
) -> ctx::Result<()> {
let mut next_view = validator::ViewNumber(0);
loop {
@@ -119,17 +122,17 @@
continue;
};
next_view = prepare_qc.view().next();
Self::propose(ctx, inner, payload_source, prepare_qc).await?;
Self::propose(ctx, config, prepare_qc, pipe).await?;
}
}

/// Sends a LeaderPrepare for the given PrepareQC.
/// Uses `payload_source` to generate a payload if needed.
pub(crate) async fn propose(
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
justification: PrepareQC,
cfg: &Config,
justification: validator::PrepareQC,
pipe: &OutputSender,
) -> ctx::Result<()> {
// Get the highest block voted for and check if there's a quorum of votes for it. To have a quorum
// in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
@@ -141,11 +144,11 @@
let highest_vote: Option<validator::BlockHeader> = count
.iter()
// We only take one value from the iterator because there can only be at most one block with a quorum of 2f+1 votes.
.find_map(|(h, v)| (*v > 2 * inner.faulty_replicas()).then_some(h))
.find_map(|(h, v)| (*v > 2 * cfg.faulty_replicas()).then_some(h))
.cloned();

// Get the highest CommitQC.
let highest_qc: &CommitQC = justification
// Get the highest validator::CommitQC.
let highest_qc: &validator::CommitQC = justification
.map
.keys()
.map(|s| &s.high_qc)
@@ -162,8 +165,13 @@
Some(proposal) if proposal != highest_qc.message.proposal => (proposal, None),
// The previous block was finalized, so we can propose a new block.
_ => {
let payload = payload_source
.propose(ctx, highest_qc.message.proposal.number.next())
// Defensively assume that PayloadManager cannot propose until the previous block is stored.
cfg.block_store
.wait_until_stored(ctx, highest_qc.header().number)
.await?;
let payload = cfg
.payload_manager
.propose(ctx, highest_qc.header().number.next())
.await?;
metrics::METRICS
.leader_proposal_payload_size
@@ -177,7 +185,7 @@
// ----------- Prepare our message and send it --------------

// Broadcast the leader prepare message to all replicas (ourselves included).
let msg = inner
let msg = cfg
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderPrepare(
validator::LeaderPrepare {
Expand All @@ -188,7 +196,7 @@ impl StateMachine {
justification,
},
));
inner.pipe.send(
pipe.send(
ConsensusInputMessage {
message: msg,
recipient: Target::Broadcast,
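
The reworked run_proposer/propose split above waits on a watch channel carrying the newest PrepareQC, skips QCs for views it has already proposed in, defensively waits for the previously finalized block to be stored, and only then asks the payload manager for a payload. A rough sketch of the shape of that loop using plain tokio primitives (assumes a tokio dependency with the default runtime features; the real code uses zksync_concurrency's ctx-aware sync wrappers and error handling, which are not reproduced here):

use tokio::sync::watch;

type ViewNumber = u64;

/// Toy stand-in for a PrepareQC: only the view it certifies matters here.
#[derive(Clone, Copy)]
struct ToyPrepareQc {
    view: ViewNumber,
}

async fn run_proposer(mut prepare_qc: watch::Receiver<Option<ToyPrepareQc>>) {
    let mut next_view: ViewNumber = 0;
    loop {
        // Wait for a newer PrepareQC to show up on the watch channel.
        if prepare_qc.changed().await.is_err() {
            return; // sender dropped, shut down
        }
        let latest = *prepare_qc.borrow_and_update();
        let Some(qc) = latest else { continue };
        if qc.view < next_view {
            continue; // stale QC for a view we already proposed in
        }
        next_view = qc.view + 1;
        // In the real propose(): wait until the previously finalized block is
        // stored, ask the payload manager for a payload, sign a LeaderPrepare
        // and broadcast it through the output pipe.
        println!("proposing for view {next_view}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(None);
    let proposer = tokio::spawn(run_proposer(rx));
    tx.send(Some(ToyPrepareQc { view: 3 })).unwrap();
    drop(tx); // closing the channel lets the proposer loop exit
    proposer.await.unwrap();
}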