Skip to content

Commit

Permalink
feat: update overlord with brake engine (nervosnetwork#159)
Browse files Browse the repository at this point in the history
* feat(consensus): update overlord with brake engine

* chore: update overlord version

* feat: register choke handler in p2p

* update overlord

tmp

* feat(consensus): update overlord with brake engine

* chore: update overlord version

* feat: register choke handler in p2p

* update overlord

tmp

update overlord

update overlord

update overlord

update overlord

* update overlord

update overlord

* fix(sync): Save the latest height when an error occurred

* fix(sync): Check remote block

* chore: Add log for check block

* chore: print order hashes length

* feat: add panic hook to logger (nervosnetwork#156)

* feat: add panic hook to logger

* chore: change crate location

* refactor: Remove boradcast when start sync

* fix: empty cycle used in genesis block

* filter callback_cache while pull txs (nervosnetwork#158)

* chore: update overlord

* fix CI

Co-authored-by: Jiayu Ye <yejiayu.fe@gmail.com>
Co-authored-by: Wenchao Hu <me@huwenchao.com>
Co-authored-by: rev-chaos <wancencen@cryptape.com>
  • Loading branch information
4 people authored and zeroqn committed Feb 11, 2020
1 parent e5566de commit 78d4198
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 60 deletions.
1 change: 1 addition & 0 deletions built-in-services/metadata/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ fn mock_metadata_1() -> Metadata {
propose_ratio: 10,
prevote_ratio: 10,
precommit_ratio: 10,
brake_ratio: 7,
}
}
fn mock_metadata_2() -> Metadata {
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures-timer = "3.0"
hex = "0.4"
log = "0.4"
moodyblues-sdk = "0.3"
overlord = "0.2.0-alpha.3"
overlord = "0.2.0-alpha.4"
parking_lot = "0.10"
rlp = "0.4"
serde = {version = "1.0", features = ["derive"]}
Expand Down
2 changes: 2 additions & 0 deletions core/consensus/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ where
propose_ratio: u64,
prevote_ratio: u64,
precommit_ratio: u64,
brake_ratio: u64,
validators: Vec<Validator>,
) -> ProtocolResult<()> {
self.overlord_handler
Expand All @@ -259,6 +260,7 @@ where
propose_ratio,
prevote_ratio,
precommit_ratio,
brake_ratio,
validators,
)),
)
Expand Down
16 changes: 15 additions & 1 deletion core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use creep::Context;
use futures::lock::Mutex;
use overlord::types::{AggregatedVote, Node, OverlordMsg, SignedProposal, SignedVote, Status};
use overlord::types::{
AggregatedVote, Node, OverlordMsg, SignedChoke, SignedProposal, SignedVote, Status,
};
use overlord::{DurationConfig, Overlord, OverlordHandler};

use common_crypto::{BlsCommonReference, BlsPrivateKey, BlsPublicKey};
Expand Down Expand Up @@ -57,6 +59,15 @@ impl<Adapter: ConsensusAdapter + 'static> Consensus for OverlordConsensus<Adapte
.map_err(|e| ConsensusError::OverlordErr(Box::new(e)))?;
Ok(())
}

async fn set_choke(&self, ctx: Context, choke: Vec<u8>) -> ProtocolResult<()> {
let signed_choke: SignedChoke =
rlp::decode(&choke).map_err(|_| ConsensusError::DecodeErr(MsgType::SignedChoke))?;
self.handler
.send_msg(ctx, OverlordMsg::SignedChoke(signed_choke))
.map_err(|e| ConsensusError::OverlordErr(Box::new(e)))?;
Ok(())
}
}

impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
Expand Down Expand Up @@ -96,6 +107,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
status.propose_ratio,
status.prevote_ratio,
status.precommit_ratio,
status.brake_ratio,
status.validators,
)),
)
Expand Down Expand Up @@ -133,6 +145,7 @@ pub fn gen_overlord_status(
propose_ratio: u64,
prevote_ratio: u64,
precommit_ratio: u64,
brake_ratio: u64,
validators: Vec<Validator>,
) -> Status {
let mut authority_list = validators
Expand All @@ -152,6 +165,7 @@ pub fn gen_overlord_status(
propose_ratio,
prevote_ratio,
precommit_ratio,
brake_ratio,
}),
authority_list,
}
Expand Down
19 changes: 18 additions & 1 deletion core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use protocol::{Bytes, ProtocolError, ProtocolResult};

use crate::fixed_types::FixedPill;
use crate::message::{
END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_PROPOSAL, END_GOSSIP_SIGNED_VOTE,
END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_CHOKE, END_GOSSIP_SIGNED_PROPOSAL,
END_GOSSIP_SIGNED_VOTE,
};
use crate::status::StatusAgent;
use crate::{ConsensusError, StatusCacheField};
Expand Down Expand Up @@ -104,7 +105,10 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
hash: Bytes,
block: FixedPill,
) -> Result<(), Box<dyn Error + Send>> {
let time = std::time::Instant::now();

let order_hashes = block.get_ordered_hashes();
let order_hashes_len = order_hashes.len();
let exemption = { self.exemption_hash.read().contains(&hash) };
let sync_tx_hashes = block.get_propose_hashes();

Expand All @@ -128,6 +132,11 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
self.adapter
.save_wal_transactions(Context::new(), Hash::digest(hash.clone()), inner)
.await?;
log::info!(
"[consensus-engine]: check block cost {:?} order_hashes_len {:?}",
time.elapsed(),
order_hashes_len
);
Ok(())
}

Expand Down Expand Up @@ -157,6 +166,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
propose_ratio: current_consensus_status.propose_ratio,
prevote_ratio: current_consensus_status.prevote_ratio,
precommit_ratio: current_consensus_status.precommit_ratio,
brake_ratio: current_consensus_status.brake_ratio,
}),
authority_list: covert_to_overlord_authority(&current_consensus_status.validators),
};
Expand Down Expand Up @@ -232,6 +242,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
propose_ratio: current_consensus_status.propose_ratio,
prevote_ratio: current_consensus_status.prevote_ratio,
precommit_ratio: current_consensus_status.precommit_ratio,
brake_ratio: current_consensus_status.brake_ratio,
}),
authority_list: covert_to_overlord_authority(&current_consensus_status.validators),
};
Expand All @@ -255,6 +266,12 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
let bytes = av.rlp_bytes();
(END_GOSSIP_AGGREGATED_VOTE, bytes)
}

OverlordMsg::SignedChoke(sc) => {
let bytes = sc.rlp_bytes();
(END_GOSSIP_SIGNED_CHOKE, bytes)
}

_ => unreachable!(),
};

Expand Down
3 changes: 3 additions & 0 deletions core/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub enum MsgType {

#[display(fmt = "Rpc Pull Transactions")]
RpcPullTxs,

#[display(fmt = "Signed Choke")]
SignedChoke,
}

#[derive(Clone, Debug, Display)]
Expand Down
31 changes: 30 additions & 1 deletion core/consensus/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bincode::serialize;
use log::debug;
use overlord::types::{AggregatedVote, SignedProposal, SignedVote};
use overlord::types::{AggregatedVote, SignedChoke, SignedProposal, SignedVote};
use overlord::Codec;
use rlp::Encodable;
use serde::{Deserialize, Serialize};
Expand All @@ -18,6 +18,7 @@ use crate::fixed_types::{FixedBlock, FixedHeight, FixedSignedTxs, PullTxsRequest
pub const END_GOSSIP_SIGNED_PROPOSAL: &str = "/gossip/consensus/signed_proposal";
pub const END_GOSSIP_SIGNED_VOTE: &str = "/gossip/consensus/signed_vote";
pub const END_GOSSIP_AGGREGATED_VOTE: &str = "/gossip/consensus/qc";
pub const END_GOSSIP_SIGNED_CHOKE: &str = "/gossip/consensus/signed_choke";
pub const RPC_SYNC_PULL_BLOCK: &str = "/rpc_call/consensus/sync_pull_block";
pub const RPC_RESP_SYNC_PULL_BLOCK: &str = "/rpc_resp/consensus/sync_pull_block";
pub const RPC_SYNC_PULL_TXS: &str = "/rpc_call/consensus/sync_pull_txs";
Expand Down Expand Up @@ -60,6 +61,15 @@ impl From<FixedHeight> for RichHeight {
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct Choke(pub Vec<u8>);

impl From<SignedChoke> for Choke {
fn from(signed_choke: SignedChoke) -> Self {
Choke(signed_choke.rlp_bytes())
}
}

pub struct ProposalMessageHandler<C> {
consensus: Arc<C>,
}
Expand Down Expand Up @@ -117,6 +127,25 @@ impl<C: Consensus + 'static> MessageHandler for QCMessageHandler<C> {
}
}

pub struct ChokeMessageHandler<C> {
consensus: Arc<C>,
}

impl<C: Consensus + 'static> ChokeMessageHandler<C> {
pub fn new(consensus: Arc<C>) -> Self {
Self { consensus }
}
}

#[async_trait]
impl<C: Consensus + 'static> MessageHandler for ChokeMessageHandler<C> {
type Message = Choke;

async fn process(&self, ctx: Context, msg: Self::Message) -> ProtocolResult<()> {
self.consensus.set_choke(ctx, msg.0).await
}
}

pub struct RemoteHeightMessageHandler<Sy> {
synchronization: Arc<Sy>,
}
Expand Down
7 changes: 4 additions & 3 deletions core/consensus/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl StatusAgent {
pub struct CurrentConsensusStatus {
pub cycles_price: u64,
pub cycles_limit: u64,
pub height: u64,
pub exec_height: u64,
pub height: u64,
pub exec_height: u64,
pub prev_hash: Hash,
pub latest_state_root: MerkleRoot,
pub latest_state_root: MerkleRoot,
pub logs_bloom: Vec<Bloom>,
pub confirm_root: Vec<MerkleRoot>,
pub state_root: Vec<MerkleRoot>,
Expand All @@ -106,6 +106,7 @@ pub struct CurrentConsensusStatus {
pub propose_ratio: u64,
pub prevote_ratio: u64,
pub precommit_ratio: u64,
pub brake_ratio: u64,
}

impl CurrentConsensusStatus {
Expand Down
Loading

0 comments on commit 78d4198

Please sign in to comment.