diff --git a/Cargo.lock b/Cargo.lock index 6e1842797a3..9bb7e186c91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2573,7 +2573,7 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.44.4" +version = "0.44.5" dependencies = [ "arrayvec", "async-std", @@ -2594,6 +2594,7 @@ dependencies = [ "libp2p-yamux", "log", "quick-protobuf", + "quick-protobuf-codec", "quickcheck-ext", "rand 0.8.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index 99df44f569c..e7b567636ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.3" } -libp2p-kad = { version = "0.44.4", path = "protocols/kad" } +libp2p-kad = { version = "0.44.5", path = "protocols/kad" } libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index a9cde76dd76..5106b82f364 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.44.5 - unreleased +- Migrate to `quick-protobuf-codec` crate for codec logic. + See [PR 4501]. + +[PR 4501]: https://github.com/libp2p/rust-libp2p/pull/4501 + ## 0.44.4 - Implement common traits on `RoutingUpdate`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index fa937033908..9c7ea0a1476 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = { workspace = true } description = "Kademlia protocol for libp2p" -version = "0.44.4" +version = "0.44.5" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -21,6 +21,7 @@ log = "0.4" libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } quick-protobuf = "0.8" +quick-protobuf-codec = { workspace = true } libp2p-identity = { workspace = true } rand = "0.8" sha2 = "0.10.7" diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index d960e6508d1..24b24789091 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -28,19 +28,17 @@ use crate::proto; use crate::record_priv::{self, Record}; -use asynchronous_codec::Framed; +use asynchronous_codec::{Decoder, Encoder, Framed}; use bytes::BytesMut; -use codec::UviBytes; use futures::prelude::*; use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; -use quick_protobuf::{BytesReader, Writer}; +use std::marker::PhantomData; use std::{convert::TryFrom, time::Duration}; use std::{io, iter}; -use unsigned_varint::codec; /// The protocol name used for negotiating with multistream-select. pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0"); @@ -179,6 +177,42 @@ impl UpgradeInfo for KademliaProtocolConfig { } } +/// Codec for Kademlia inbound and outbound message framing. +pub struct Codec { + codec: quick_protobuf_codec::Codec, + __phantom: PhantomData<(A, B)>, +} +impl Codec { + fn new(max_packet_size: usize) -> Self { + Codec { + codec: quick_protobuf_codec::Codec::new(max_packet_size), + __phantom: PhantomData, + } + } +} + +impl, B> Encoder for Codec { + type Error = io::Error; + type Item = A; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + Ok(self.codec.encode(item.into(), dst)?) + } +} +impl> Decoder for Codec { + type Error = io::Error; + type Item = B; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + self.codec.decode(src)?.map(B::try_from).transpose() + } +} + +/// Sink of responses and stream of requests. +pub(crate) type KadInStreamSink = Framed>; +/// Sink of requests and stream of responses. +pub(crate) type KadOutStreamSink = Framed>; + impl InboundUpgrade for KademliaProtocolConfig where C: AsyncRead + AsyncWrite + Unpin, @@ -188,32 +222,9 @@ where type Error = io::Error; fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { - use quick_protobuf::{MessageRead, MessageWrite}; - - let mut codec = UviBytes::default(); - codec.set_max_len(self.max_packet_size); - - future::ok( - Framed::new(incoming, codec) - .err_into() - .with::<_, _, fn(_) -> _, _>(|response| { - let proto_struct = resp_msg_to_proto(response); - let mut buf = Vec::with_capacity(proto_struct.get_size()); - let mut writer = Writer::new(&mut buf); - proto_struct - .write_message(&mut writer) - .expect("Encoding to succeed"); - future::ready(Ok(io::Cursor::new(buf))) - }) - .and_then::<_, fn(_) -> _>(|bytes| { - let mut reader = BytesReader::from_bytes(&bytes); - let request = match proto::Message::from_reader(&mut reader, &bytes) { - Ok(r) => r, - Err(err) => return future::ready(Err(err.into())), - }; - future::ready(proto_to_req_msg(request)) - }), - ) + let codec = Codec::new(self.max_packet_size); + + future::ok(Framed::new(incoming, codec)) } } @@ -226,51 +237,12 @@ where type Error = io::Error; fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { - use quick_protobuf::{MessageRead, MessageWrite}; - - let mut codec = UviBytes::default(); - codec.set_max_len(self.max_packet_size); - - future::ok( - Framed::new(incoming, codec) - .err_into() - .with::<_, _, fn(_) -> _, _>(|request| { - let proto_struct = req_msg_to_proto(request); - let mut buf = Vec::with_capacity(proto_struct.get_size()); - let mut writer = Writer::new(&mut buf); - proto_struct - .write_message(&mut writer) - .expect("Encoding to succeed"); - future::ready(Ok(io::Cursor::new(buf))) - }) - .and_then::<_, fn(_) -> _>(|bytes| { - let mut reader = BytesReader::from_bytes(&bytes); - let response = match proto::Message::from_reader(&mut reader, &bytes) { - Ok(r) => r, - Err(err) => return future::ready(Err(err.into())), - }; - future::ready(proto_to_resp_msg(response)) - }), - ) + let codec = Codec::new(self.max_packet_size); + + future::ok(Framed::new(incoming, codec)) } } -/// Sink of responses and stream of requests. -pub(crate) type KadInStreamSink = KadStreamSink; -/// Sink of requests and stream of responses. -pub(crate) type KadOutStreamSink = KadStreamSink; -pub(crate) type KadStreamSink = stream::AndThen< - sink::With< - stream::ErrInto>>>, io::Error>, - io::Cursor>, - A, - future::Ready>, io::Error>>, - fn(A) -> future::Ready>, io::Error>>, - >, - future::Ready>, - fn(BytesMut) -> future::Ready>, ->; - /// Request that we can send to a peer or that we received from a peer. #[derive(Debug, Clone, PartialEq, Eq)] pub enum KadRequestMsg { @@ -346,6 +318,31 @@ pub enum KadResponseMsg { }, } +impl From for proto::Message { + fn from(kad_msg: KadRequestMsg) -> Self { + req_msg_to_proto(kad_msg) + } +} +impl From for proto::Message { + fn from(kad_msg: KadResponseMsg) -> Self { + resp_msg_to_proto(kad_msg) + } +} +impl TryFrom for KadRequestMsg { + type Error = io::Error; + + fn try_from(message: proto::Message) -> Result { + proto_to_req_msg(message) + } +} +impl TryFrom for KadResponseMsg { + type Error = io::Error; + + fn try_from(message: proto::Message) -> Result { + proto_to_resp_msg(message) + } +} + /// Converts a `KadRequestMsg` into the corresponding protobuf message for sending. fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message { match kad_msg {