Skip to content

Commit

Permalink
[mplex] Small enhancements. (#1785)
Browse files Browse the repository at this point in the history
* More granular execution of pending flushes.

Also replace fnv hashing with nohash-hasher.

* Don't forget the pending case.

* Simplify.

* Use AtomicU32 for connection IDs.

* Revert to random u64.
  • Loading branch information
romanb authored Oct 15, 2020
1 parent b48b93f commit f4a65e7
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 57 deletions.
3 changes: 2 additions & 1 deletion muxers/mplex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ categories = ["network-programming", "asynchronous"]

[dependencies]
bytes = "0.5"
fnv = "1.0"
futures = "0.3.1"
futures_codec = "0.4"
libp2p-core = { version = "0.22.0", path = "../../core" }
log = "0.4"
nohash-hasher = "0.2"
parking_lot = "0.11"
rand = "0.7"
smallvec = "1.4"
unsigned-varint = { version = "0.5", features = ["futures-codec"] }

Expand Down
29 changes: 18 additions & 11 deletions muxers/mplex/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_core::Endpoint;
use futures_codec::{Decoder, Encoder};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::{fmt, mem};
use bytes::{BufMut, Bytes, BytesMut};
use futures_codec::{Decoder, Encoder};
use libp2p_core::Endpoint;
use std::{fmt, hash::{Hash, Hasher}, io, mem};
use unsigned_varint::{codec, encode};

// Maximum size for a packet: 1MB as per the spec.
Expand All @@ -46,7 +45,7 @@ pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024;
/// > we initiated the stream, so the local ID has the role `Endpoint::Dialer`.
/// > Conversely, when receiving a frame with a flag identifying the remote as a "sender",
/// > the corresponding local ID has the role `Endpoint::Listener`.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub struct LocalStreamId {
num: u32,
role: Endpoint,
Expand All @@ -61,6 +60,14 @@ impl fmt::Display for LocalStreamId {
}
}

impl Hash for LocalStreamId {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u32(self.num);
}
}

impl nohash_hasher::IsEnabled for LocalStreamId {}

/// A unique identifier used by the remote node for a substream.
///
/// `RemoteStreamId`s are received with frames from the remote
Expand Down Expand Up @@ -161,7 +168,7 @@ impl Codec {

impl Decoder for Codec {
type Item = Frame<RemoteStreamId>;
type Error = IoError;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
Expand All @@ -182,7 +189,7 @@ impl Decoder for Codec {
Some(len) => {
if len as usize > MAX_FRAME_SIZE {
let msg = format!("Mplex frame length {} exceeds maximum", len);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}

self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize);
Expand Down Expand Up @@ -213,7 +220,7 @@ impl Decoder for Codec {
6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
_ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
},
};

Expand All @@ -222,7 +229,7 @@ impl Decoder for Codec {
},

CodecDecodeState::Poisoned => {
return Err(IoError::new(IoErrorKind::InvalidData, "Mplex codec poisoned"));
return Err(io::Error::new(io::ErrorKind::InvalidData, "Mplex codec poisoned"));
}
}
}
Expand All @@ -231,7 +238,7 @@ impl Decoder for Codec {

impl Encoder for Codec {
type Item = Frame<LocalStreamId>;
type Error = IoError;
type Error = io::Error;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Expand Down Expand Up @@ -266,7 +273,7 @@ impl Encoder for Codec {
let data_len_bytes = encode::usize(data_len, &mut data_buf);

if data_len > MAX_FRAME_SIZE {
return Err(IoError::new(IoErrorKind::InvalidData, "data size exceed maximum"));
return Err(io::Error::new(io::ErrorKind::InvalidData, "data size exceed maximum"));
}

dst.reserve(header_bytes.len() + data_len_bytes.len() + data_len);
Expand Down
Loading

0 comments on commit f4a65e7

Please sign in to comment.