From 03dce934c006420425a638393031cc70ffc880b7 Mon Sep 17 00:00:00 2001 From: Ademola Date: Wed, 13 Sep 2023 22:22:40 +0100 Subject: [PATCH 1/5] feat(kad): migrate to quick-protobuf-codec crate --- protocols/kad/Cargo.toml | 1 + protocols/kad/src/protocol.rs | 146 +++++++++++++++++----------------- 2 files changed, 75 insertions(+), 72 deletions(-) diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index fa937033908..b76ad8ccf28 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -21,6 +21,7 @@ log = "0.4" libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } quick-protobuf = "0.8" +quick-protobuf-codec = { workspace = true } libp2p-identity = { workspace = true } rand = "0.8" sha2 = "0.10.7" diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index d960e6508d1..b05fc19cb6e 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -28,19 +28,17 @@ use crate::proto; use crate::record_priv::{self, Record}; -use asynchronous_codec::Framed; +use asynchronous_codec::{Decoder, Encoder, Framed}; use bytes::BytesMut; -use codec::UviBytes; use futures::prelude::*; use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; -use quick_protobuf::{BytesReader, Writer}; +use std::marker::PhantomData; use std::{convert::TryFrom, time::Duration}; use std::{io, iter}; -use unsigned_varint::codec; /// The protocol name used for negotiating with multistream-select. pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0"); @@ -179,6 +177,47 @@ impl UpgradeInfo for KademliaProtocolConfig { } } +/// Codec for Kademlia inbound and outbound message framing. +pub struct KadCodec { + codec: quick_protobuf_codec::Codec, + __phantom: PhantomData<(A, B)>, +} +impl KadCodec { + fn new(max_packet_size: usize) -> Self { + KadCodec { + codec: quick_protobuf_codec::Codec::new(max_packet_size), + __phantom: PhantomData, + } + } +} + +impl, B> Encoder for KadCodec { + type Error = io::Error; + type Item = A; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.codec + .encode(item.into(), dst) + .map_err(|err| err.into()) + } +} +impl> Decoder for KadCodec { + type Error = io::Error; + type Item = B; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + self.codec + .decode(src)? + .map(|msg| B::try_from(msg)) + .transpose() + } +} + +/// Sink of responses and stream of requests. +pub(crate) type KadInStreamSink = Framed>; +/// Sink of requests and stream of responses. +pub(crate) type KadOutStreamSink = Framed>; + impl InboundUpgrade for KademliaProtocolConfig where C: AsyncRead + AsyncWrite + Unpin, @@ -188,32 +227,9 @@ where type Error = io::Error; fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { - use quick_protobuf::{MessageRead, MessageWrite}; - - let mut codec = UviBytes::default(); - codec.set_max_len(self.max_packet_size); - - future::ok( - Framed::new(incoming, codec) - .err_into() - .with::<_, _, fn(_) -> _, _>(|response| { - let proto_struct = resp_msg_to_proto(response); - let mut buf = Vec::with_capacity(proto_struct.get_size()); - let mut writer = Writer::new(&mut buf); - proto_struct - .write_message(&mut writer) - .expect("Encoding to succeed"); - future::ready(Ok(io::Cursor::new(buf))) - }) - .and_then::<_, fn(_) -> _>(|bytes| { - let mut reader = BytesReader::from_bytes(&bytes); - let request = match proto::Message::from_reader(&mut reader, &bytes) { - Ok(r) => r, - Err(err) => return future::ready(Err(err.into())), - }; - future::ready(proto_to_req_msg(request)) - }), - ) + let codec = KadCodec::new(self.max_packet_size); + + future::ok(Framed::new(incoming, codec)) } } @@ -226,51 +242,12 @@ where type Error = io::Error; fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { - use quick_protobuf::{MessageRead, MessageWrite}; - - let mut codec = UviBytes::default(); - codec.set_max_len(self.max_packet_size); - - future::ok( - Framed::new(incoming, codec) - .err_into() - .with::<_, _, fn(_) -> _, _>(|request| { - let proto_struct = req_msg_to_proto(request); - let mut buf = Vec::with_capacity(proto_struct.get_size()); - let mut writer = Writer::new(&mut buf); - proto_struct - .write_message(&mut writer) - .expect("Encoding to succeed"); - future::ready(Ok(io::Cursor::new(buf))) - }) - .and_then::<_, fn(_) -> _>(|bytes| { - let mut reader = BytesReader::from_bytes(&bytes); - let response = match proto::Message::from_reader(&mut reader, &bytes) { - Ok(r) => r, - Err(err) => return future::ready(Err(err.into())), - }; - future::ready(proto_to_resp_msg(response)) - }), - ) + let codec = KadCodec::new(self.max_packet_size); + + future::ok(Framed::new(incoming, codec)) } } -/// Sink of responses and stream of requests. -pub(crate) type KadInStreamSink = KadStreamSink; -/// Sink of requests and stream of responses. -pub(crate) type KadOutStreamSink = KadStreamSink; -pub(crate) type KadStreamSink = stream::AndThen< - sink::With< - stream::ErrInto>>>, io::Error>, - io::Cursor>, - A, - future::Ready>, io::Error>>, - fn(A) -> future::Ready>, io::Error>>, - >, - future::Ready>, - fn(BytesMut) -> future::Ready>, ->; - /// Request that we can send to a peer or that we received from a peer. #[derive(Debug, Clone, PartialEq, Eq)] pub enum KadRequestMsg { @@ -346,6 +323,31 @@ pub enum KadResponseMsg { }, } +impl From for proto::Message { + fn from(kad_msg: KadRequestMsg) -> Self { + req_msg_to_proto(kad_msg) + } +} +impl From for proto::Message { + fn from(kad_msg: KadResponseMsg) -> Self { + resp_msg_to_proto(kad_msg) + } +} +impl TryFrom for KadRequestMsg { + type Error = io::Error; + + fn try_from(message: proto::Message) -> Result { + proto_to_req_msg(message) + } +} +impl TryFrom for KadResponseMsg { + type Error = io::Error; + + fn try_from(message: proto::Message) -> Result { + proto_to_resp_msg(message) + } +} + /// Converts a `KadRequestMsg` into the corresponding protobuf message for sending. fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message { match kad_msg { From 3eae2b56942a21191306f27486a38d2685d2ef00 Mon Sep 17 00:00:00 2001 From: Ademola Date: Wed, 13 Sep 2023 23:18:52 +0100 Subject: [PATCH 2/5] Update changelog --- protocols/kad/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index a9cde76dd76..417016847f6 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -4,9 +4,11 @@ See [PR 4270]. - Reduce noise of "remote supports our protocol" log. See [PR 4278]. +- Depend on `quick-protobuf-codec` crate for codec logic. [PR 4270]: https://github.com/libp2p/rust-libp2p/pull/4270 [PR 4278]: https://github.com/libp2p/rust-libp2p/pull/4278 +[PR 4500]: https://github.com/libp2p/rust-libp2p/pull/4500 ## 0.44.3 From f6ea3f33f42b7085360e8997434e25e51bfd16ae Mon Sep 17 00:00:00 2001 From: Ademola Date: Wed, 13 Sep 2023 23:20:43 +0100 Subject: [PATCH 3/5] Fix changelog updates --- protocols/kad/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 417016847f6..bec580947f1 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -8,7 +8,7 @@ [PR 4270]: https://github.com/libp2p/rust-libp2p/pull/4270 [PR 4278]: https://github.com/libp2p/rust-libp2p/pull/4278 -[PR 4500]: https://github.com/libp2p/rust-libp2p/pull/4500 +[PR 4501]: https://github.com/libp2p/rust-libp2p/pull/4501 ## 0.44.3 From a8c517d205a00d11f5220b4208c31786858612c8 Mon Sep 17 00:00:00 2001 From: Ademola Date: Thu, 14 Sep 2023 11:23:13 +0100 Subject: [PATCH 4/5] Apply suggested fixes --- Cargo.toml | 2 +- protocols/kad/CHANGELOG.md | 7 ++++++- protocols/kad/Cargo.toml | 2 +- protocols/kad/src/protocol.rs | 27 +++++++++++---------------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 99df44f569c..e7b567636ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.45.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.3" } -libp2p-kad = { version = "0.44.4", path = "protocols/kad" } +libp2p-kad = { version = "0.44.5", path = "protocols/kad" } libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.1.0", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.13.1", path = "misc/metrics" } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index bec580947f1..0fb67a16647 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,10 +1,15 @@ +## 0.44.5 +- Migrate to `quick-protobuf-codec` crate for codec logic. + see [PR 4501]. + +[PR 4501]: https://github.com/libp2p/rust-libp2p/pull/4501 + ## 0.44.4 - Implement common traits on `RoutingUpdate`. See [PR 4270]. - Reduce noise of "remote supports our protocol" log. See [PR 4278]. -- Depend on `quick-protobuf-codec` crate for codec logic. [PR 4270]: https://github.com/libp2p/rust-libp2p/pull/4270 [PR 4278]: https://github.com/libp2p/rust-libp2p/pull/4278 diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index b76ad8ccf28..9c7ea0a1476 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = { workspace = true } description = "Kademlia protocol for libp2p" -version = "0.44.4" +version = "0.44.5" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index b05fc19cb6e..24b24789091 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -178,45 +178,40 @@ impl UpgradeInfo for KademliaProtocolConfig { } /// Codec for Kademlia inbound and outbound message framing. -pub struct KadCodec { +pub struct Codec { codec: quick_protobuf_codec::Codec, __phantom: PhantomData<(A, B)>, } -impl KadCodec { +impl Codec { fn new(max_packet_size: usize) -> Self { - KadCodec { + Codec { codec: quick_protobuf_codec::Codec::new(max_packet_size), __phantom: PhantomData, } } } -impl, B> Encoder for KadCodec { +impl, B> Encoder for Codec { type Error = io::Error; type Item = A; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.codec - .encode(item.into(), dst) - .map_err(|err| err.into()) + Ok(self.codec.encode(item.into(), dst)?) } } -impl> Decoder for KadCodec { +impl> Decoder for Codec { type Error = io::Error; type Item = B; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - self.codec - .decode(src)? - .map(|msg| B::try_from(msg)) - .transpose() + self.codec.decode(src)?.map(B::try_from).transpose() } } /// Sink of responses and stream of requests. -pub(crate) type KadInStreamSink = Framed>; +pub(crate) type KadInStreamSink = Framed>; /// Sink of requests and stream of responses. -pub(crate) type KadOutStreamSink = Framed>; +pub(crate) type KadOutStreamSink = Framed>; impl InboundUpgrade for KademliaProtocolConfig where @@ -227,7 +222,7 @@ where type Error = io::Error; fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { - let codec = KadCodec::new(self.max_packet_size); + let codec = Codec::new(self.max_packet_size); future::ok(Framed::new(incoming, codec)) } @@ -242,7 +237,7 @@ where type Error = io::Error; fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { - let codec = KadCodec::new(self.max_packet_size); + let codec = Codec::new(self.max_packet_size); future::ok(Framed::new(incoming, codec)) } From bd314d75b21c9ac4adecd0a6ed1508650ac56cb8 Mon Sep 17 00:00:00 2001 From: Ademola Date: Thu, 14 Sep 2023 17:15:04 +0100 Subject: [PATCH 5/5] Fix changelog and commit lockfile --- Cargo.lock | 3 ++- protocols/kad/CHANGELOG.md | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e1842797a3..9bb7e186c91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2573,7 +2573,7 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.44.4" +version = "0.44.5" dependencies = [ "arrayvec", "async-std", @@ -2594,6 +2594,7 @@ dependencies = [ "libp2p-yamux", "log", "quick-protobuf", + "quick-protobuf-codec", "quickcheck-ext", "rand 0.8.5", "serde", diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 0fb67a16647..5106b82f364 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,6 +1,6 @@ -## 0.44.5 +## 0.44.5 - unreleased - Migrate to `quick-protobuf-codec` crate for codec logic. - see [PR 4501]. + See [PR 4501]. [PR 4501]: https://github.com/libp2p/rust-libp2p/pull/4501 @@ -13,7 +13,6 @@ [PR 4270]: https://github.com/libp2p/rust-libp2p/pull/4270 [PR 4278]: https://github.com/libp2p/rust-libp2p/pull/4278 -[PR 4501]: https://github.com/libp2p/rust-libp2p/pull/4501 ## 0.44.3