Skip to content

Commit

Permalink
plaintext/2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ackintosh committed Sep 2, 2019
1 parent e686c2f commit 5ee673b
Show file tree
Hide file tree
Showing 5 changed files with 534 additions and 17 deletions.
6 changes: 5 additions & 1 deletion protocols/plaintext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
bytes = "0.4"
futures = "0.1"
libp2p-core = { version = "0.12.0", path = "../../core" }
log = "0.4.6"
void = "1"

tokio-io = "0.1.12"
protobuf = "2.3"
rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" }
13 changes: 13 additions & 0 deletions protocols/plaintext/regen_structs_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh

# This script regenerates the `src/structs_proto.rs` file from `structs.proto`.

sudo docker run --rm -v `pwd`:/usr/code:z -w /usr/code rust /bin/bash -c " \
apt-get update; \
apt-get install -y protobuf-compiler; \
cargo install --version 2.3.0 protobuf-codegen; \
protoc --rust_out . structs.proto"

#sudo chown $USER:$USER *.rs

mv -f structs.rs ./src/structs_proto.rs
305 changes: 289 additions & 16 deletions protocols/plaintext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,313 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::{Future, StartSend, Poll};
use futures::sink::Sink;
use futures::stream::MapErr as StreamMapErr;
use futures::stream::Stream;
use futures::future::{self, FutureResult};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated};
use log::{debug, trace};
use libp2p_core::{identity, InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated, PeerId, PublicKey};
use rw_stream_sink::RwStreamSink;
use std::iter;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::error;
use std::fmt;
use void::Void;
use bytes::BytesMut;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use tokio_io::codec::length_delimited::Framed;
use crate::structs_proto::Propose;
use protobuf::Message;
use protobuf::error::ProtobufError;

#[derive(Debug, Copy, Clone)]
pub struct PlainTextConfig;
mod structs_proto;

#[derive(Clone)]
pub struct PlainTextConfig {
// peerId: PeerId,
pub key: identity::Keypair,
}

impl UpgradeInfo for PlainTextConfig {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;

fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/plaintext/1.0.0")
iter::once(b"/plaintext/2.0.0")
}
}

impl<C> InboundUpgrade<C> for PlainTextConfig
where
C: AsyncRead + AsyncWrite + Send + 'static
{
type Output = PlainTextOutput<Negotiated<C>>;
type Error = PlainTextError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;

fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(self.handshake(socket))
// future::ok(i)

}
}

impl<C> OutboundUpgrade<C> for PlainTextConfig
where
C: AsyncRead + AsyncWrite + Send + 'static
{
type Output = PlainTextOutput<Negotiated<C>>;
type Error = PlainTextError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;

fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(self.handshake(socket))
// future::ok(i)
}
}

impl PlainTextConfig {
fn handshake<T>(self, socket: T) -> impl Future<Item = PlainTextOutput<T>, Error = PlainTextError>
where
T: AsyncRead + AsyncWrite + Send + 'static
{
debug!("Starting plaintext upgrade");
PlainTextMiddleware::handshake(socket, self)
.map(|(stream_sink, public_key)| {
let mapped = stream_sink.map_err(map_err as fn(_) -> _);
PlainTextOutput {
stream: RwStreamSink::new(mapped),
remote_key: public_key,
}
})
}
}

#[derive(Debug)]
pub enum PlainTextError {
/// I/O error.
IoError(IoError),

/// Protocol buffer error.
ProtobufError(ProtobufError),

/// Failed to parse one of the handshake protobuf messages.
HandshakeParsingFailure,
}

impl error::Error for PlainTextError {
fn cause(&self) -> Option<&dyn error::Error> {
match *self {
PlainTextError::IoError(ref err) => Some(err),
PlainTextError::ProtobufError(ref err) => Some(err),
_ => None,
}
}
}

impl fmt::Display for PlainTextError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
PlainTextError::IoError(e) => write!(f, "I/O error: {}", e),
PlainTextError::ProtobufError(e) => write!(f, "Protobuf error: {}", e),
PlainTextError::HandshakeParsingFailure => f.write_str("Failed to parse one of the handshake protobuf messages"),
}
}
}

impl From<IoError> for PlainTextError {
#[inline]
fn from(err: IoError) -> PlainTextError {
PlainTextError::IoError(err)
}
}

impl From<ProtobufError> for PlainTextError {
#[inline]
fn from(err: ProtobufError) -> PlainTextError {
PlainTextError::ProtobufError(err)
}
}

struct HandShakeContext<T> {
config: PlainTextConfig,
state: T
}

// HandshakeContext<()> --with_local-> HandshakeContext<Local>
struct Local {
// Our encoded local public key
public_key_encoded: Vec<u8>,
// Our local proposition's raw bytes:
proposition_bytes: Vec<u8>,
}

struct Remote {
local: Local,
// The remote's proposition's raw bytes:
proposition_bytes: BytesMut,
// The remote's public key:
public_key: PublicKey,
}

impl HandShakeContext<()> {
fn new(config: PlainTextConfig) -> Self {
Self {
config,
state: (),
}
}

fn with_local(self) -> Result<HandShakeContext<Local>, PlainTextError> {
let public_key_encoded = self.config.key.public().into_protobuf_encoding();
let mut proposition = Propose::new();
proposition.set_pubkey(public_key_encoded.clone());

let proposition_bytes = proposition.write_to_bytes()?;

Ok(HandShakeContext {
config: self.config,
state: Local {
public_key_encoded,
proposition_bytes,
}
})
}
}

impl<C> InboundUpgrade<C> for PlainTextConfig {
type Output = Negotiated<C>;
type Error = Void;
type Future = FutureResult<Negotiated<C>, Self::Error>;
impl HandShakeContext<Local> {
fn with_remote(self, proposition_bytes: BytesMut) -> Result<HandShakeContext<Remote>, PlainTextError> {
let mut prop = match protobuf::parse_from_bytes::<Propose>(&proposition_bytes) {
Ok(prop) => prop,
Err(_) => {
debug!("failed to parse remote's proposition protobuf message");
return Err(PlainTextError::HandshakeParsingFailure);
},
};

fn upgrade_inbound(self, i: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ok(i)
let public_key_encoded = prop.take_pubkey();
let public_key = match PublicKey::from_protobuf_encoding(&public_key_encoded) {
Ok(p) => p,
Err(_) => {
debug!("failed to parse remote's proposition's pubkey protobuf");
return Err(PlainTextError::HandshakeParsingFailure);
},
};

Ok(HandShakeContext {
config: self.config,
state: Remote {
local: self.state,
proposition_bytes,
public_key,
}
})
}
}

impl<C> OutboundUpgrade<C> for PlainTextConfig {
type Output = Negotiated<C>;
type Error = Void;
type Future = FutureResult<Negotiated<C>, Self::Error>;
#[inline]
fn map_err(err: IoError) -> IoError {
debug!("error during plaintext handshake {:?}", err);
IoError::new(IoErrorKind::InvalidData, err)
}

pub struct PlainTextMiddleware<S> {
pub inner: Framed<S, BytesMut>,
}

fn upgrade_outbound(self, i: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ok(i)
impl<T> PlainTextMiddleware<T>
where
T: AsyncRead + AsyncWrite + Send,
{
fn handshake(socket: T, config: PlainTextConfig) -> impl Future<Item = (PlainTextMiddleware<T>, PublicKey), Error = PlainTextError>
where
T: AsyncRead + AsyncWrite + Send + 'static
{
let socket = length_delimited::Builder::new()
.big_endian()
.length_field_length(4)
.new_framed(socket);

future::ok::<_, PlainTextError>(HandShakeContext::new(config))
.and_then(|context| {
trace!("starting handshake");
Ok(context.with_local()?)
})
.and_then(|context| {
trace!("sending proposition to remote");
socket.send(BytesMut::from(context.state.proposition_bytes.clone()))
.from_err()
.map(|s| (s, context))
})
.and_then(move |(socket, context)| {
trace!("receiving the remote's proposition");
socket.into_future()
.map_err(|(e, _)| e.into())
.and_then(move |(prop_raw, socket)| {
let context = match prop_raw {
Some(p) => context.with_remote(p)?,
None => {
debug!("unexpected eof while waiting for remote's proposition");
let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof");
return Err(err.into());
}
};

trace!("received proposition from remote; pubkey = {:?}", context.state.public_key);
Ok((socket, context))
})
})
.and_then(|(socket, context)| {
let middleware = PlainTextMiddleware { inner: socket };
Ok((middleware, context.state.public_key))
})
}
}

impl<S> Sink for PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite,
{
type SinkItem = BytesMut;
type SinkError = IoError;

#[inline]
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
self.inner.start_send(item)
}

#[inline]
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
}

#[inline]
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.inner.close()
}
}

impl<S> Stream for PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite,
{
type Item = BytesMut;
type Error = IoError;

#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
}
}

/// Output of the plaintext protocol.
pub struct PlainTextOutput<S>
where
S: AsyncRead + AsyncWrite,
{
pub stream: RwStreamSink<StreamMapErr<PlainTextMiddleware<S>, fn(IoError) -> IoError>>,
/// The public key of the remote.
pub remote_key: PublicKey,
}
Loading

0 comments on commit 5ee673b

Please sign in to comment.