Skip to content

Commit

Permalink
Replace prost with protobuf in protocols/floodsub
Browse files Browse the repository at this point in the history
  • Loading branch information
kckeiks committed Oct 21, 2022
1 parent 9cb7cfa commit 7f7e110
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 39 deletions.
4 changes: 2 additions & 2 deletions protocols/floodsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ futures = "0.3.1"
libp2p-core = { version = "0.37.0", path = "../../core" }
libp2p-swarm = { version = "0.40.0", path = "../../swarm" }
log = "0.4"
prost = "0.11"
protobuf = "3.2"
rand = "0.8"
smallvec = "1.6.1"

[build-dependencies]
prost-build = "0.11"
protobuf-codegen = "3.2"
10 changes: 9 additions & 1 deletion protocols/floodsub/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,13 @@
// DEALINGS IN THE SOFTWARE.

fn main() {
prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap();
let cust: protobuf_codegen::Customize = Default::default();
protobuf_codegen::Codegen::new()
.pure()
.includes(&["src"])
.input("src/rpc.proto")
.customize(cust.lite_runtime(true))
.cargo_out_dir("protos")
.run()
.unwrap()
}
12 changes: 7 additions & 5 deletions protocols/floodsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@
use libp2p_core::PeerId;

#[allow(clippy::derive_partial_eq_without_eq)]
mod protos {
include!(concat!(env!("OUT_DIR"), "/protos/mod.rs"));
}

use protos::rpc as rpc_proto;

pub mod protocol;

mod layer;
mod topic;

#[allow(clippy::derive_partial_eq_without_eq)]
mod rpc_proto {
include!(concat!(env!("OUT_DIR"), "/floodsub.pb.rs"));
}

pub use self::layer::{Floodsub, FloodsubEvent};
pub use self::protocol::{FloodsubMessage, FloodsubRpc};
pub use self::topic::Topic;
Expand Down
61 changes: 30 additions & 31 deletions protocols/floodsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use futures::{
AsyncWriteExt, Future,
};
use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
use prost::Message;
use protobuf::Message;
use std::{error, fmt, io, iter, pin::Pin};

/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
Expand Down Expand Up @@ -59,7 +59,7 @@ where
fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
Box::pin(async move {
let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?;
let rpc = rpc_proto::Rpc::decode(&packet[..])?;
let rpc = rpc_proto::RPC::parse_from_bytes(&packet[..])?;

let mut messages = Vec::with_capacity(rpc.publish.len());
for publish in rpc.publish.into_iter() {
Expand Down Expand Up @@ -97,7 +97,7 @@ pub enum FloodsubDecodeError {
/// Error when reading the packet from the socket.
ReadError(io::Error),
/// Error when decoding the raw buffer into a protobuf.
ProtobufError(prost::DecodeError),
ProtobufError(protobuf::Error),
/// Error when parsing the `PeerId` in the message.
InvalidPeerId,
}
Expand All @@ -108,8 +108,8 @@ impl From<io::Error> for FloodsubDecodeError {
}
}

impl From<prost::DecodeError> for FloodsubDecodeError {
fn from(err: prost::DecodeError) -> Self {
impl From<protobuf::Error> for FloodsubDecodeError {
fn from(err: protobuf::Error) -> Self {
FloodsubDecodeError::ProtobufError(err)
}
}
Expand Down Expand Up @@ -181,32 +181,31 @@ where
impl FloodsubRpc {
/// Turns this `FloodsubRpc` into a message that can be sent to a substream.
fn into_bytes(self) -> Vec<u8> {
let rpc = rpc_proto::Rpc {
publish: self
.messages
.into_iter()
.map(|msg| rpc_proto::Message {
from: Some(msg.source.to_bytes()),
data: Some(msg.data),
seqno: Some(msg.sequence_number),
topic_ids: msg.topics.into_iter().map(|topic| topic.into()).collect(),
})
.collect(),

subscriptions: self
.subscriptions
.into_iter()
.map(|topic| rpc_proto::rpc::SubOpts {
subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe),
topic_id: Some(topic.topic.into()),
})
.collect(),
};

let mut buf = Vec::with_capacity(rpc.encoded_len());
rpc.encode(&mut buf)
.expect("Vec<u8> provides capacity as needed");
buf
let mut rpc = rpc_proto::RPC::new();
rpc.publish = self
.messages
.into_iter()
.map(|msg| {
let mut m = rpc_proto::Message::new();
m.set_from(msg.source.to_bytes());
m.set_data(msg.data);
m.set_seqno(msg.sequence_number);
m.topic_ids = msg.topics.into_iter().map(|topic| topic.into()).collect();
m
})
.collect();
rpc.subscriptions = self
.subscriptions
.into_iter()
.map(|topic| {
let mut sub_opts = rpc_proto::rpc::SubOpts::new();
sub_opts.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe);
sub_opts.set_topic_id(topic.topic.into());
sub_opts
})
.collect();

rpc.write_to_bytes().expect("Encoding to succeed.")
}
}

Expand Down

0 comments on commit 7f7e110

Please sign in to comment.