Skip to content

Commit

Permalink
[dag] update dag fetcher to construct request inside
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Jul 4, 2023
1 parent 6e5f945 commit 2d1ae23
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 29 deletions.
6 changes: 1 addition & 5 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,7 @@ impl DagDriver {
pub fn add_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> {
let mut dag_writer = self.dag.write();
let round = node.metadata().round();
if dag_writer.all_exists(
node.parents()
.iter()
.map(|certificate| certificate.metadata().digest()),
) {
if dag_writer.all_exists(node.parents()) {
dag_writer.add_node(node)?;
if self.current_round == round {
let maybe_strong_links = dag_writer
Expand Down
49 changes: 34 additions & 15 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
dag::{
dag_network::DAGNetworkSender,
dag_store::Dag,
types::{CertifiedNode, DAGMessage, FetchRequest, FetchResponse, Node},
types::{CertifiedNode, DAGMessage, FetchResponse, Node, RemoteFetchRequest},
},
network::TConsensusMsg,
};
Expand All @@ -19,44 +19,51 @@ use tokio::sync::{
oneshot,
};

enum FetchCallback {
pub enum LocalFetchRequest {
Node(Node, oneshot::Sender<Node>),
CertifiedNode(CertifiedNode, oneshot::Sender<CertifiedNode>),
}

impl FetchCallback {
impl LocalFetchRequest {
pub fn responders(&self, validators: &[Author]) -> Vec<Author> {
match self {
FetchCallback::Node(node, _) => vec![*node.author()],
FetchCallback::CertifiedNode(node, _) => node.certificate().signers(validators),
LocalFetchRequest::Node(node, _) => vec![*node.author()],
LocalFetchRequest::CertifiedNode(node, _) => node.certificate().signers(validators),
}
}

pub fn notify(self) {
if match self {
FetchCallback::Node(node, sender) => sender.send(node).map_err(|_| ()),
FetchCallback::CertifiedNode(node, sender) => sender.send(node).map_err(|_| ()),
LocalFetchRequest::Node(node, sender) => sender.send(node).map_err(|_| ()),
LocalFetchRequest::CertifiedNode(node, sender) => sender.send(node).map_err(|_| ()),
}
.is_err()
{
error!("Failed to send node back");
}
}

pub fn node(&self) -> &Node {
match self {
LocalFetchRequest::Node(node, _) => node,
LocalFetchRequest::CertifiedNode(node, _) => node,
}
}
}

struct DagFetcher {
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
request_rx: Receiver<(FetchRequest, FetchCallback)>,
request_rx: Receiver<LocalFetchRequest>,
}

impl DagFetcher {
pub fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
) -> (Self, Sender<(FetchRequest, FetchCallback)>) {
) -> (Self, Sender<LocalFetchRequest>) {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
(
Self {
Expand All @@ -70,17 +77,29 @@ impl DagFetcher {
}

pub async fn start(mut self) {
while let Some((request, callback)) = self.request_rx.recv().await {
let responders =
callback.responders(&self.epoch_state.verifier.get_ordered_account_addresses());
let network_request = DAGMessage::from(request.clone()).into_network_message();
while let Some(local_request) = self.request_rx.recv().await {
let responders = local_request
.responders(&self.epoch_state.verifier.get_ordered_account_addresses());
let remote_request = {
let dag_reader = self.dag.read();
if dag_reader.all_exists(local_request.node().parents()) {
local_request.notify();
continue;
}
RemoteFetchRequest::new(
local_request.node().metadata().clone(),
dag_reader.lowest_round(),
dag_reader.bitmask(),
)
};
let network_request = DAGMessage::from(remote_request.clone()).into_network_message();
if let Ok(response) = self
.network
.send_rpc_with_fallbacks(responders, network_request, Duration::from_secs(1))
.await
.and_then(DAGMessage::try_from)
.and_then(FetchResponse::try_from)
.and_then(|response| response.verify(&request, &self.epoch_state.verifier))
.and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier))
{
// TODO: support chunk response or fallback to state sync
let mut dag_writer = self.dag.write();
Expand All @@ -91,7 +110,7 @@ impl DagFetcher {
}
}
}
callback.notify();
local_request.notify();
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Dag {
.unwrap_or(&0)
}

fn highest_round(&self) -> Round {
pub fn highest_round(&self) -> Round {
*self
.nodes_by_round
.last_key_value()
Expand Down Expand Up @@ -79,8 +79,11 @@ impl Dag {
self.nodes_by_digest.contains_key(digest)
}

pub fn all_exists<'a>(&self, mut digests: impl Iterator<Item = &'a HashValue>) -> bool {
digests.all(|digest| self.nodes_by_digest.contains_key(digest))
pub fn all_exists(&self, nodes: &[NodeCertificate]) -> bool {
nodes.iter().all(|certificate| {
self.nodes_by_digest
.contains_key(certificate.metadata().digest())
})
}

pub fn get_node(&self, digest: &HashValue) -> Option<Arc<CertifiedNode>> {
Expand Down Expand Up @@ -115,4 +118,9 @@ impl Dag {
None
}
}

pub fn bitmask(&self) -> Vec<Vec<bool>> {
// TODO: extract local bitvec
todo!();
}
}
22 changes: 16 additions & 6 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,22 @@ impl BroadcastStatus for CertificateAckState {
/// the first round we care about in the DAG, `exists_bitmask` is a two dimensional bitmask represents
/// if a node exist at [start_round + index][validator_index].
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct FetchRequest {
pub struct RemoteFetchRequest {
target: NodeMetadata,
start_round: Round,
exists_bitmask: Vec<Vec<bool>>,
}

impl RemoteFetchRequest {
pub fn new(target: NodeMetadata, start_round: Round, exists_bitmask: Vec<Vec<bool>>) -> Self {
Self {
target,
start_round,
exists_bitmask,
}
}
}

/// Represents a response to FetchRequest, `certified_nodes` are indexed by [round][validator_index]
/// It should fill in gaps from the `exists_bitmask` according to the parents from the `target_digest` node.
#[derive(Serialize, Deserialize, Clone, Debug)]
Expand All @@ -407,7 +417,7 @@ impl FetchResponse {

pub fn verify(
self,
_request: &FetchRequest,
_request: &RemoteFetchRequest,
_validator_verifier: &ValidatorVerifier,
) -> anyhow::Result<Self> {
todo!("verification");
Expand All @@ -427,7 +437,7 @@ pub enum DAGMessage {
NodeDigestSignatureMsg(NodeDigestSignature),
NodeCertificateMsg(NodeCertificate),
CertifiedAckMsg(CertifiedAck),
FetchRequest(FetchRequest),
FetchRequest(RemoteFetchRequest),
FetchResponse(FetchResponse),

#[cfg(test)]
Expand Down Expand Up @@ -522,7 +532,7 @@ impl TryFrom<DAGMessage> for CertifiedAck {
}
}

impl TryFrom<DAGMessage> for FetchRequest {
impl TryFrom<DAGMessage> for RemoteFetchRequest {
type Error = anyhow::Error;

fn try_from(msg: DAGMessage) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -568,8 +578,8 @@ impl From<CertifiedAck> for DAGMessage {
}
}

impl From<FetchRequest> for DAGMessage {
fn from(req: FetchRequest) -> Self {
impl From<RemoteFetchRequest> for DAGMessage {
fn from(req: RemoteFetchRequest) -> Self {
Self::FetchRequest(req)
}
}
Expand Down

0 comments on commit 2d1ae23

Please sign in to comment.