Skip to content

Commit

Permalink
Merge branch 'main' into matt/rlpx-multiplexing
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Nov 18, 2023
2 parents ce0dd32 + 6e6e873 commit 7742793
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
52 changes: 45 additions & 7 deletions crates/net/eth-wire/src/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,8 @@ pub enum SharedCapability {
},
/// Any other unknown capability.
UnknownCapability {
/// Name of the capability.
name: Cow<'static, str>,
/// (Highest) negotiated version of the eth capability.
version: u8,
/// Shared capability.
cap: Capability,
/// The message ID offset for this capability.
///
/// This represents the message ID offset for the first message of the eth capability in
Expand All @@ -259,7 +257,10 @@ impl SharedCapability {

match name {
"eth" => Ok(Self::eth(EthVersion::try_from(version)?, offset)),
_ => Ok(Self::UnknownCapability { name: name.to_string().into(), version, offset }),
_ => Ok(Self::UnknownCapability {
cap: Capability::new(name.to_string(), version as usize),
offset,
}),
}
}

Expand All @@ -268,12 +269,20 @@ impl SharedCapability {
Self::Eth { version, offset }
}

/// Returns the capability.
pub fn capability(&self) -> Cow<'_, Capability> {
match self {
SharedCapability::Eth { version, .. } => Cow::Owned(Capability::eth(*version)),
SharedCapability::UnknownCapability { cap, .. } => Cow::Borrowed(cap),
}
}

/// Returns the name of the capability.
#[inline]
pub fn name(&self) -> &str {
match self {
SharedCapability::Eth { .. } => "eth",
SharedCapability::UnknownCapability { name, .. } => name,
SharedCapability::UnknownCapability { cap, .. } => cap.name.as_ref(),
}
}

Expand All @@ -287,7 +296,7 @@ impl SharedCapability {
pub fn version(&self) -> u8 {
match self {
SharedCapability::Eth { version, .. } => *version as u8,
SharedCapability::UnknownCapability { version, .. } => *version,
SharedCapability::UnknownCapability { cap, .. } => cap.version as u8,
}
}

Expand Down Expand Up @@ -348,9 +357,31 @@ impl SharedCapabilities {
}

/// Returns the negotiated eth version if it is shared.
#[inline]
pub fn eth_version(&self) -> Result<u8, P2PStreamError> {
self.eth().map(|cap| cap.version())
}

/// Returns true if the shared capabilities contain the given capability.
#[inline]
pub fn contains(&self, cap: &Capability) -> bool {
self.find(cap).is_some()
}

/// Returns the shared capability for the given capability.
#[inline]
pub fn find(&self, cap: &Capability) -> Option<&SharedCapability> {
self.0.iter().find(|c| c.version() == cap.version as u8 && c.name() == cap.name)
}

/// Returns the shared capability for the given capability or an error if it's not compatible.
#[inline]
pub fn ensure_matching_capability(
&self,
cap: &Capability,
) -> Result<&SharedCapability, UnsupportedCapabilityError> {
self.find(cap).ok_or_else(|| UnsupportedCapabilityError { capability: cap.clone() })
}
}

/// Determines the offsets for each shared capability between the input list of peer
Expand Down Expand Up @@ -452,6 +483,13 @@ pub enum SharedCapabilityError {
ReservedMessageIdOffset(u8),
}

/// An error thrown when capabilities mismatch.
#[derive(Debug, thiserror::Error)]
#[error("unsupported capability {capability}")]
pub struct UnsupportedCapabilityError {
capability: Capability,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 3 additions & 2 deletions crates/net/eth-wire/src/multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ impl<St, Primary> RlpxSatelliteStream<St, Primary> {}
impl<St, Primary> Stream for RlpxSatelliteStream<St, Primary>
where
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
Primary: Stream + Unpin,
{
type Item = ();
type Item = <Primary as Stream>::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
Expand All @@ -169,7 +170,7 @@ where
impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
where
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
Primary: Sink<T, Error = io::Error>,
Primary: Sink<T, Error = io::Error> + Unpin,
{
type Error = <Primary as Sink<T>>::Error;

Expand Down

0 comments on commit 7742793

Please sign in to comment.