Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

feat: update overlord with brake engine #159

Merged
merged 20 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/logger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ repository = "https://github.com/nervosnetwork/muta"
log = "0.4"
log4rs = "0.8"
json = "0.12"
backtrace = "0.3"
29 changes: 29 additions & 0 deletions common/logger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::{panic, thread};

use backtrace::Backtrace;
use json::JsonValue;
use log::LevelFilter;
use log4rs::append::console::ConsoleAppender;
Expand All @@ -21,6 +23,8 @@ pub fn init<S: ::std::hash::BuildHasher>(
log_path: PathBuf,
modules_level: HashMap<String, String, S>,
) {
setup_panic_logger();

let console = ConsoleAppender::builder()
.encoder(Box::new(PatternEncoder::new(
if console_show_file_and_line {
Expand Down Expand Up @@ -98,3 +102,28 @@ pub fn metrics(name: &str, mut content: JsonValue) {
content
});
}

fn setup_panic_logger() {
let panic_logger = |info: &panic::PanicInfo| {
let backtrace = Backtrace::new();
let thread = thread::current();
let name = thread.name().unwrap_or("unnamed");
let location = info.location().unwrap(); // The current implementation always returns Some
let msg = match info.payload().downcast_ref::<&'static str>() {
Some(s) => *s,
None => match info.payload().downcast_ref::<String>() {
Some(s) => &*s,
None => "Box<Any>",
},
};
log::error!(
target: "panic", "thread '{}' panicked at '{}': {}:{} {:?}",
name,
msg,
location.file(),
location.line(),
backtrace,
);
};
panic::set_hook(Box::new(panic_logger));
}
2 changes: 1 addition & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures-timer = "3.0"
hex = "0.4"
log = "0.4"
moodyblues-sdk = "0.3"
overlord = "0.2.0-alpha.3"
overlord = "0.2.0-alpha.4"
parking_lot = "0.10"
rlp = "0.4"
serde = {version = "1.0", features = ["derive"]}
Expand Down
2 changes: 2 additions & 0 deletions core/consensus/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ where
propose_ratio: u64,
prevote_ratio: u64,
precommit_ratio: u64,
brake_ratio: u64,
validators: Vec<Validator>,
) -> ProtocolResult<()> {
self.overlord_handler
Expand All @@ -259,6 +260,7 @@ where
propose_ratio,
prevote_ratio,
precommit_ratio,
brake_ratio,
validators,
)),
)
Expand Down
16 changes: 15 additions & 1 deletion core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use creep::Context;
use futures::lock::Mutex;
use overlord::types::{AggregatedVote, Node, OverlordMsg, SignedProposal, SignedVote, Status};
use overlord::types::{
AggregatedVote, Node, OverlordMsg, SignedChoke, SignedProposal, SignedVote, Status,
};
use overlord::{DurationConfig, Overlord, OverlordHandler};

use common_crypto::{BlsCommonReference, BlsPrivateKey, BlsPublicKey};
Expand Down Expand Up @@ -57,6 +59,15 @@ impl<Adapter: ConsensusAdapter + 'static> Consensus for OverlordConsensus<Adapte
.map_err(|e| ConsensusError::OverlordErr(Box::new(e)))?;
Ok(())
}

async fn set_choke(&self, ctx: Context, choke: Vec<u8>) -> ProtocolResult<()> {
let signed_choke: SignedChoke =
rlp::decode(&choke).map_err(|_| ConsensusError::DecodeErr(MsgType::SignedChoke))?;
self.handler
.send_msg(ctx, OverlordMsg::SignedChoke(signed_choke))
.map_err(|e| ConsensusError::OverlordErr(Box::new(e)))?;
Ok(())
}
}

impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
Expand Down Expand Up @@ -96,6 +107,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
status.propose_ratio,
status.prevote_ratio,
status.precommit_ratio,
status.brake_ratio,
status.validators,
)),
)
Expand Down Expand Up @@ -133,6 +145,7 @@ pub fn gen_overlord_status(
propose_ratio: u64,
prevote_ratio: u64,
precommit_ratio: u64,
brake_ratio: u64,
validators: Vec<Validator>,
) -> Status {
let mut authority_list = validators
Expand All @@ -152,6 +165,7 @@ pub fn gen_overlord_status(
propose_ratio,
prevote_ratio,
precommit_ratio,
brake_ratio,
}),
authority_list,
}
Expand Down
19 changes: 18 additions & 1 deletion core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use protocol::{Bytes, ProtocolError, ProtocolResult};

use crate::fixed_types::FixedPill;
use crate::message::{
END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_PROPOSAL, END_GOSSIP_SIGNED_VOTE,
END_GOSSIP_AGGREGATED_VOTE, END_GOSSIP_SIGNED_CHOKE, END_GOSSIP_SIGNED_PROPOSAL,
END_GOSSIP_SIGNED_VOTE,
};
use crate::status::StatusAgent;
use crate::{ConsensusError, StatusCacheField};
Expand Down Expand Up @@ -104,7 +105,10 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
hash: Bytes,
block: FixedPill,
) -> Result<(), Box<dyn Error + Send>> {
let time = std::time::Instant::now();

let order_hashes = block.get_ordered_hashes();
let order_hashes_len = order_hashes.len();
let exemption = { self.exemption_hash.read().contains(&hash) };
let sync_tx_hashes = block.get_propose_hashes();

Expand All @@ -128,6 +132,11 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
self.adapter
.save_wal_transactions(Context::new(), Hash::digest(hash.clone()), inner)
.await?;
log::info!(
"[consensus-engine]: check block cost {:?} order_hashes_len {:?}",
time.elapsed(),
order_hashes_len
);
Ok(())
}

Expand Down Expand Up @@ -157,6 +166,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
propose_ratio: current_consensus_status.propose_ratio,
prevote_ratio: current_consensus_status.prevote_ratio,
precommit_ratio: current_consensus_status.precommit_ratio,
brake_ratio: current_consensus_status.brake_ratio,
}),
authority_list: covert_to_overlord_authority(&current_consensus_status.validators),
};
Expand Down Expand Up @@ -232,6 +242,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
propose_ratio: current_consensus_status.propose_ratio,
prevote_ratio: current_consensus_status.prevote_ratio,
precommit_ratio: current_consensus_status.precommit_ratio,
brake_ratio: current_consensus_status.brake_ratio,
}),
authority_list: covert_to_overlord_authority(&current_consensus_status.validators),
};
Expand All @@ -255,6 +266,12 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
let bytes = av.rlp_bytes();
(END_GOSSIP_AGGREGATED_VOTE, bytes)
}

OverlordMsg::SignedChoke(sc) => {
let bytes = sc.rlp_bytes();
(END_GOSSIP_SIGNED_CHOKE, bytes)
}

_ => unreachable!(),
};

Expand Down
3 changes: 3 additions & 0 deletions core/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub enum MsgType {

#[display(fmt = "Rpc Pull Transactions")]
RpcPullTxs,

#[display(fmt = "Signed Choke")]
SignedChoke,
}

#[derive(Clone, Debug, Display)]
Expand Down
31 changes: 30 additions & 1 deletion core/consensus/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bincode::serialize;
use log::debug;
use overlord::types::{AggregatedVote, SignedProposal, SignedVote};
use overlord::types::{AggregatedVote, SignedChoke, SignedProposal, SignedVote};
use overlord::Codec;
use rlp::Encodable;
use serde::{Deserialize, Serialize};
Expand All @@ -18,6 +18,7 @@ use crate::fixed_types::{FixedBlock, FixedHeight, FixedSignedTxs, PullTxsRequest
pub const END_GOSSIP_SIGNED_PROPOSAL: &str = "/gossip/consensus/signed_proposal";
pub const END_GOSSIP_SIGNED_VOTE: &str = "/gossip/consensus/signed_vote";
pub const END_GOSSIP_AGGREGATED_VOTE: &str = "/gossip/consensus/qc";
pub const END_GOSSIP_SIGNED_CHOKE: &str = "/gossip/consensus/signed_choke";
pub const RPC_SYNC_PULL_BLOCK: &str = "/rpc_call/consensus/sync_pull_block";
pub const RPC_RESP_SYNC_PULL_BLOCK: &str = "/rpc_resp/consensus/sync_pull_block";
pub const RPC_SYNC_PULL_TXS: &str = "/rpc_call/consensus/sync_pull_txs";
Expand Down Expand Up @@ -60,6 +61,15 @@ impl From<FixedHeight> for RichHeight {
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct Choke(pub Vec<u8>);

impl From<SignedChoke> for Choke {
fn from(signed_choke: SignedChoke) -> Self {
Choke(signed_choke.rlp_bytes())
}
}

pub struct ProposalMessageHandler<C> {
consensus: Arc<C>,
}
Expand Down Expand Up @@ -117,6 +127,25 @@ impl<C: Consensus + 'static> MessageHandler for QCMessageHandler<C> {
}
}

pub struct ChokeMessageHandler<C> {
consensus: Arc<C>,
}

impl<C: Consensus + 'static> ChokeMessageHandler<C> {
pub fn new(consensus: Arc<C>) -> Self {
Self { consensus }
}
}

#[async_trait]
impl<C: Consensus + 'static> MessageHandler for ChokeMessageHandler<C> {
type Message = Choke;

async fn process(&self, ctx: Context, msg: Self::Message) -> ProtocolResult<()> {
self.consensus.set_choke(ctx, msg.0).await
}
}

pub struct RemoteHeightMessageHandler<Sy> {
synchronization: Arc<Sy>,
}
Expand Down
7 changes: 4 additions & 3 deletions core/consensus/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl StatusAgent {
pub struct CurrentConsensusStatus {
pub cycles_price: u64,
pub cycles_limit: u64,
pub height: u64,
pub exec_height: u64,
pub height: u64,
pub exec_height: u64,
pub prev_hash: Hash,
pub latest_state_root: MerkleRoot,
pub latest_state_root: MerkleRoot,
pub logs_bloom: Vec<Bloom>,
pub confirm_root: Vec<MerkleRoot>,
pub state_root: Vec<MerkleRoot>,
Expand All @@ -106,6 +106,7 @@ pub struct CurrentConsensusStatus {
pub propose_ratio: u64,
pub prevote_ratio: u64,
pub precommit_ratio: u64,
pub brake_ratio: u64,
}

impl CurrentConsensusStatus {
Expand Down
Loading