Skip to content

Commit

Permalink
code merge
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden committed Sep 26, 2022
1 parent 75ada73 commit 875db1c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 15 deletions.
4 changes: 2 additions & 2 deletions base_layer/core/src/mempool/sync_protocol/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tokio::{sync::mpsc, time::sleep};
use crate::{
base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle},
mempool::{
sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL},
sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL},
Mempool,
MempoolServiceConfig,
},
Expand Down Expand Up @@ -102,7 +102,7 @@ impl ServiceInitializer for MempoolSyncInitializer {
}
log_mdc::extend(mdc.clone());
}
let base_node_events = base_node.get_block_event_stream();
let base_node_events = base_node.get_block_event_stream();

MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events)
.run()
Expand Down
14 changes: 5 additions & 9 deletions base_layer/core/src/mempool/sync_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub use initializer::MempoolSyncInitializer;
use log::*;
use prost::Message;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityEventRx, ConnectivityRequester, ConnectivitySelection},
connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection},
framing,
framing::CanonicalFraming,
message::MessageExt,
Expand Down Expand Up @@ -115,19 +115,14 @@ const LOG_TARGET: &str = "c::mempool::sync_protocol";

pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1");

pub struct MempoolSyncStreams {
pub block_event_stream: BlockEventReceiver,
pub connectivity_events: ConnectivityEventRx,
}

pub struct MempoolSyncProtocol<TSubstream> {
config: MempoolServiceConfig,
protocol_notifier: ProtocolNotificationRx<TSubstream>,
mempool: Mempool,
num_synched: Arc<AtomicUsize>,
permits: Arc<Semaphore>,
connectivity: ConnectivityRequester,
block_event_stream: BlockEventReceiver
block_event_stream: BlockEventReceiver,
}

impl<TSubstream> MempoolSyncProtocol<TSubstream>
Expand All @@ -138,15 +133,16 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static
protocol_notifier: ProtocolNotificationRx<TSubstream>,
mempool: Mempool,
connectivity: ConnectivityRequester,
block_event_stream: BlockEventReceiver
block_event_stream: BlockEventReceiver,
) -> Self {
Self {
config,
protocol_notifier,
mempool,
num_synched: Arc::new(AtomicUsize::new(0)),
permits: Arc::new(Semaphore::new(1)),
connectivity,block_event_stream
connectivity,
block_event_stream,
}
}

Expand Down
7 changes: 3 additions & 4 deletions base_layer/core/src/mempool/sync_protocol/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn setup(
Vec<Transaction>,
) {
let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1);
let (connectivity_events_tx, connectivity_events_rx) = broadcast::channel(10);
let (connectivity_events_tx, _) = broadcast::channel(10);
let (mempool, transactions) = new_mempool_with_transactions(num_txns).await;
let (connectivity, _) = create_connectivity_mock();
let (block_event_sender, _) = broadcast::channel(1);
Expand All @@ -97,12 +97,11 @@ async fn setup(
let protocol = MempoolSyncProtocol::new(
Default::default(),
protocol_notif_rx,
connectivity_events_rx,
mempool.clone(),
connectivity,
connectivity,block_receiver
);

task::spawn(protocol.run(block_receiver));
task::spawn(protocol.run());

(protocol_notif_tx, connectivity_events_tx, mempool, transactions)
}
Expand Down

0 comments on commit 875db1c

Please sign in to comment.