-
Notifications
You must be signed in to change notification settings - Fork 950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use bounded channels in transport #987
Use bounded channels in transport #987
Conversation
core/src/transport/memory.rs
Outdated
} | ||
|
||
static CHAN_BUF_SIZE: usize = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't need to be a constant. Inlining the value in the source code is enough. Also, 1000 is not a great value.
core/src/transport/memory.rs
Outdated
@@ -68,7 +99,7 @@ impl Transport for MemoryTransport { | |||
|
|||
let actual_addr = Protocol::Memory(port.get()).into(); | |||
|
|||
let (tx, rx) = mpsc::unbounded(); | |||
let (tx, rx) = mpsc::channel(CHAN_BUF_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think 2
is a good value. It's the number of maximum dialing connections that haven't been processed by the listener yet.
let (tx, rx) = mpsc::channel(CHAN_BUF_SIZE); | |
let (tx, rx) = mpsc::channel(2); |
core/src/transport/memory.rs
Outdated
} | ||
a | ||
if let Some(sender) = hub.get(&port) { | ||
let (a_tx, a_rx) = mpsc::channel(CHAN_BUF_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say a value of 4096
for these two channels is good. It's the number of bytes that are pending.
core/src/transport/memory.rs
Outdated
_ => (), | ||
} | ||
} | ||
match self.sender.poll_complete() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match self.sender.poll_complete() { | |
match self.sender.close() { |
closed
automatically implies poll_complete
.
core/src/transport/memory.rs
Outdated
match self.sender.poll_complete() { | ||
Err(_) => Err(MemoryTransportError::Unreachable), | ||
Ok(Async::NotReady) => Ok(Async::NotReady), | ||
Ok(Async::Ready(_)) => Ok(Async::Ready(self.channel_to_return.take().unwrap())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We generally forbid unwrap()
outside of tests in favour of a call to expect()
that contains a message or a proof.
Ok(Async::Ready(_)) => Ok(Async::Ready(self.channel_to_return.take().unwrap())), | |
Ok(Async::Ready(_)) => Ok(Async::Ready(self.channel_to_return.take() | |
.expect("Future has been polled again after it is finished"))), |
/// Transport that supports `/memory/N` multiaddresses. | ||
#[derive(Debug, Copy, Clone, Default)] | ||
pub struct MemoryTransport; | ||
|
||
pub struct DialFuture { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a small doc-comment. Nothing big, just something.
pub struct DialFuture { | |
/// Connection to a `MemoryTransport` currently being opened. | |
pub struct DialFuture { |
Waiting for #992 to get merged |
Partially addresses issues #973.
transport::memory
Replace unbounded channels innodes::handled_node_tasks
To replace unbounded channels in
handled_node_tasks
would have implications that warrant further consideration, recommend to revisit at a later time.