Skip to content

Commit

Permalink
Make EntryStreamStage optional
Browse files Browse the repository at this point in the history
  • Loading branch information
CriesofCarrots authored and solana-grimes committed Feb 11, 2019
1 parent f977327 commit d41dec9
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 22 deletions.
20 changes: 8 additions & 12 deletions src/entry_stream_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ impl EntryStreamStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(
ledger_entry_receiver: EntryReceiver,
entry_stream: Option<&String>,
entry_stream_socket: String,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit: Arc<AtomicBool>,
) -> (Self, EntryReceiver) {
let (entry_stream_sender, entry_stream_receiver) = channel();
let mut entry_stream = entry_stream
.cloned()
.map(|socket| EntryStream::new(socket, leader_scheduler));
let mut entry_stream = EntryStream::new(entry_stream_socket, leader_scheduler);
let t_entry_stream = Builder::new()
.name("solana-entry-stream".to_string())
.spawn(move || loop {
Expand All @@ -42,7 +40,7 @@ impl EntryStreamStage {
if let Err(e) = Self::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
entry_stream.as_mut(),
&mut entry_stream,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Expand All @@ -57,15 +55,13 @@ impl EntryStreamStage {
fn process_entries(
ledger_entry_receiver: &EntryReceiver,
entry_stream_sender: &EntrySender,
entry_stream: Option<&mut EntryStream>,
entry_stream: &mut EntryStream,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries = ledger_entry_receiver.recv_timeout(timeout)?;
if let Some(stream) = entry_stream {
stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, stream.socket);
});
}
entry_stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, entry_stream.socket);
});
entry_stream_sender.send(entries)?;
Ok(())
}
Expand Down Expand Up @@ -117,7 +113,7 @@ mod test {
EntryStreamStage::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
Some(&mut entry_stream),
&mut entry_stream,
)
.unwrap();
assert_eq!(entry_stream.socket.len(), 5);
Expand Down
28 changes: 18 additions & 10 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Tvu {
fetch_stage: BlobFetchStage,
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
entry_stream_stage: EntryStreamStage,
entry_stream_stage: Option<EntryStreamStage>,
storage_stage: StorageStage,
exit: Arc<AtomicBool>,
last_entry_id: Arc<RwLock<Hash>>,
Expand Down Expand Up @@ -118,7 +118,7 @@ impl Tvu {

let l_last_entry_id = Arc::new(RwLock::new(last_entry_id));

let (replay_stage, ledger_entry_receiver) = ReplayStage::new(
let (replay_stage, mut previous_receiver) = ReplayStage::new(
keypair.pubkey(),
voting_keypair,
blocktree.clone(),
Expand All @@ -132,16 +132,22 @@ impl Tvu {
ledger_signal_receiver,
);

let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
ledger_entry_receiver,
entry_stream,
bank.leader_scheduler.clone(),
exit.clone(),
);
let entry_stream_stage = if entry_stream.is_some() {
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
previous_receiver,
entry_stream.unwrap().to_string(),
bank.leader_scheduler.clone(),
exit.clone(),
);
previous_receiver = entry_stream_receiver;
Some(entry_stream_stage)
} else {
None
};

let storage_stage = StorageStage::new(
storage_state,
entry_stream_receiver,
previous_receiver,
Some(blocktree),
&keypair,
&exit.clone(),
Expand Down Expand Up @@ -197,7 +203,9 @@ impl Service for Tvu {
self.retransmit_stage.join()?;
self.fetch_stage.join()?;
self.storage_stage.join()?;
self.entry_stream_stage.join()?;
if self.entry_stream_stage.is_some() {
self.entry_stream_stage.unwrap().join()?;
}
self.replay_stage.join()?;
Ok(())
}
Expand Down

0 comments on commit d41dec9

Please sign in to comment.