Skip to content

Commit

Permalink
Move vote into ReplicateStage after process_entries
Browse files Browse the repository at this point in the history
  • Loading branch information
Tyera Eulberg committed Sep 10, 2018
1 parent a89b611 commit 0da8b1e
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 17 deletions.
29 changes: 14 additions & 15 deletions src/replicate_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ use result::{Error, Result};
use service::Service;
use signature::Keypair;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::{responder, BlobReceiver};
use vote_stage::VoteStage;
use streamer::{responder, BlobReceiver, BlobSender};
use vote_stage::send_validator_vote;

pub struct ReplicateStage {
thread_hdls: Vec<JoinHandle<()>>,
Expand All @@ -32,6 +31,8 @@ impl ReplicateStage {
blob_recycler: &BlobRecycler,
window_receiver: &BlobReceiver,
ledger_writer: Option<&mut LedgerWriter>,
keypair: &Arc<Keypair>,
vote_blob_sender: &BlobSender,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote
Expand All @@ -43,6 +44,12 @@ impl ReplicateStage {

let res = bank.process_entries(entries.clone());

if let Err(err) =
send_validator_vote(bank, keypair, crdt, blob_recycler, vote_blob_sender)
{
info!("Vote failed: {:?}", err);
}

for blob in blobs {
blob_recycler.recycle(blob, "replicate_requests");
}
Expand Down Expand Up @@ -72,7 +79,6 @@ impl ReplicateStage {
blob_recycler: BlobRecycler,
window_receiver: BlobReceiver,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
Expand All @@ -83,16 +89,8 @@ impl ReplicateStage {
vote_blob_receiver,
);

let vote_stage = VoteStage::new(
Arc::new(keypair),
bank.clone(),
crdt.clone(),
blob_recycler.clone(),
vote_blob_sender,
exit,
);

let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap());
let keypair = Arc::new(keypair);

let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())
Expand All @@ -103,6 +101,8 @@ impl ReplicateStage {
&blob_recycler,
&window_receiver,
ledger_writer.as_mut(),
&keypair,
&vote_blob_sender,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Expand All @@ -113,8 +113,7 @@ impl ReplicateStage {
})
.unwrap();

let mut thread_hdls = vec![t_responder, t_replicate];
thread_hdls.extend(vote_stage.thread_hdls());
let thread_hdls = vec![t_responder, t_replicate];

ReplicateStage { thread_hdls }
}
Expand Down
1 change: 0 additions & 1 deletion src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl Tvu {
blob_recycler,
blob_window_receiver,
ledger_path,
exit,
);

Tvu {
Expand Down
2 changes: 1 addition & 1 deletion src/vote_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub fn send_leader_vote(
Ok(())
}

fn send_validator_vote(
pub fn send_validator_vote(
bank: &Arc<Bank>,
keypair: &Arc<Keypair>,
crdt: &Arc<RwLock<Crdt>>,
Expand Down

0 comments on commit 0da8b1e

Please sign in to comment.