Skip to content

Commit

Permalink
removed drop, started task before sending commands
Browse files Browse the repository at this point in the history
  • Loading branch information
momosh-ethernal committed Sep 20, 2023
1 parent 3dffe6a commit 844fa69
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
6 changes: 3 additions & 3 deletions src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ impl Client {
for chunk in records.chunks(self.put_batch_size) {
// create oneshot for each chunk, through which will success counts be sent,
// only for the records in that chunk
let (chunk_result_tx, chunk_result_rx) = oneshot::channel::<DHTPutSuccess>();
let (response_sender, response_receiver) = oneshot::channel::<DHTPutSuccess>();
if self
.cmd_sender
.send(Command::PutKadRecordBatch {
records: chunk.into(),
quorum,
sender: chunk_result_tx,
sender: response_sender,
})
.await
.context("Command receiver should not be dropped.")
Expand All @@ -171,7 +171,7 @@ impl Client {
// waiting in this manner introduces a back pressure from overwhelming the network
// with too many possible PUT request coming in from the whole batch
// this is the reason why input parameter vector of records is split into chunks
if let Ok(DHTPutSuccess::Batch(num)) = chunk_result_rx.await {
if let Ok(DHTPutSuccess::Batch(num)) = response_receiver.await {
num_success += num;
}
}
Expand Down
29 changes: 15 additions & 14 deletions src/network/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,20 @@ impl EventLoop {
// create channels to track individual PUT results needed for success count
let (put_result_tx, mut put_result_rx) =
mpsc::channel::<DHTPutSuccess>(records.len());

// spawn new task that waits and count all successful put queries from this batch,
// but don't block event_loop
tokio::spawn(async move {
let mut num_success: usize = 0;
// increment count only while receiving single successful results
while let Some(DHTPutSuccess::Single) = put_result_rx.recv().await {
num_success += 1;
}
// send back counted successful puts
// signal back that this chunk of records is done
_ = chunk_success_sender.send(DHTPutSuccess::Batch(num_success));
});

// go record by record and dispatch put requests through KAD
for record in records.as_ref() {
let query_id = self
Expand All @@ -530,20 +544,7 @@ impl EventLoop {
// drop tx manually,
// ensure that only senders in spawned threads are still in use
// IMPORTANT: omitting this will make recv call sleep forever
drop(put_result_tx);

// wait here and count all successful put queries from this batch,
// but don't block event_loop
tokio::spawn(async move {
let mut num_success: usize = 0;
// increment count only while receiving single successful results
while let Some(DHTPutSuccess::Single) = put_result_rx.recv().await {
num_success += 1;
}
// send back counted successful puts
// signal back that this chunk of records is done
_ = chunk_success_sender.send(DHTPutSuccess::Batch(num_success));
});
// drop(put_result_tx);
},
Command::ReduceKademliaMapSize => {
self.swarm
Expand Down

0 comments on commit 844fa69

Please sign in to comment.