This repository has been archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Switch from devp2p to libp2p #268
Merged
Merged
Changes from all commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
a94e7e7
Switch from devp2p to libp2p
tomaka a514ed3
Move the keys in the network state
tomaka 1cfc18e
Properly load, store or generate private key
tomaka 0e30b5d
Some robustness
tomaka 536edd6
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka 21ab784
Update for latest libp2p
tomaka fb17f77
Allow secio
tomaka 329b7dc
Don't open a new Kademlia connec all the time
tomaka abe756c
Handle Kademlia disconnection
tomaka 1131498
Set correct permissions on key file
tomaka d1d38d7
Improvements to secret key storage
tomaka 2277a5f
Flush the peer store at Kademlia requests
tomaka 224f6b1
Use RAII guards for disconnection
tomaka 77092a3
Some misc work
tomaka 18ee06f
Set informations about peers
tomaka 4ad5da7
Fix tests and external URL
tomaka fdc5fbf
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka 9e06c31
Fix some style
tomaka 5e74817
Split obtain_private_key into multiple function
tomaka 293cbdf
Split start_kademlia_discovery in multiple functions
tomaka 95c79fc
More style fixes
tomaka e8f743c
More style fixes
tomaka 61e3e57
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka 533f53d
Fix some concerns
tomaka fc5a342
Turn // into ///
tomaka e5390ab
More style fixes
tomaka 2316ebb
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka ce3b819
More style fixes
tomaka b9b54b3
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka dfa92bc
Merge remote-tracking branch 'origin/master' into tka-libp2p
gavofyork e9757a4
Add annotations to unreachable!
tomaka 6731f05
Fix style again
tomaka 06b5498
Remove commented out code
tomaka ab89ef9
Fix test year
tomaka 1a942a2
More concerns
tomaka File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
[package] | ||
description = "libp2p implementation of the ethcore network library" | ||
homepage = "http://parity.io" | ||
license = "GPL-3.0" | ||
name = "substrate-network-libp2p" | ||
version = "0.1.0" | ||
authors = ["Parity Technologies <admin@parity.io>"] | ||
|
||
[dependencies] | ||
bytes = "0.4" | ||
fnv = "1.0" | ||
futures = "0.1" | ||
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "77b1c445807e53b8c5e4e5e2da751222da15b8cc", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] } | ||
ethcore-network = { git = "https://github.com/paritytech/parity.git" } | ||
ethkey = { git = "https://github.com/paritytech/parity.git" } | ||
parking_lot = "0.5" | ||
log = "0.3" | ||
rand = "0.5.0" | ||
tokio-core = "0.1" | ||
tokio-io = "0.1" | ||
tokio-timer = "0.2" | ||
varint = { git = "https://github.com/libp2p/rust-libp2p" } | ||
|
||
[dev-dependencies] | ||
ethcore-bytes = { git = "https://github.com/paritytech/parity.git" } | ||
ethcore-io = { git = "https://github.com/paritytech/parity.git" } | ||
ethcore-logger = { git = "https://github.com/paritytech/parity.git" } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,287 @@ | ||
// Copyright 2018 Parity Technologies (UK) Ltd. | ||
// This file is part of Polkadot. | ||
|
||
// Polkadot is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
|
||
// Polkadot is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU General Public License for more details. | ||
|
||
// You should have received a copy of the GNU General Public License | ||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.? | ||
|
||
use bytes::{Bytes, BytesMut}; | ||
use network::ProtocolId; | ||
use libp2p::core::{Multiaddr, ConnectionUpgrade, Endpoint}; | ||
use network::PacketId; | ||
use std::io::Error as IoError; | ||
use std::vec::IntoIter as VecIntoIter; | ||
use futures::{future, Future, stream, Stream, Sink}; | ||
use futures::sync::mpsc; | ||
use tokio_io::{AsyncRead, AsyncWrite}; | ||
use varint::VarintCodec; | ||
|
||
/// Connection upgrade for a single protocol. | ||
/// | ||
/// Note that "a single protocol" here refers to `par` for example. However | ||
/// each protocol can have multiple different versions for networking purposes. | ||
#[derive(Clone)] | ||
pub struct RegisteredProtocol<T> { | ||
/// Id of the protocol for API purposes. | ||
id: ProtocolId, | ||
/// Base name of the protocol as advertised on the network. | ||
/// Ends with `/` so that we can append a version number behind. | ||
base_name: Bytes, | ||
/// List of protocol versions that we support, plus their packet count. | ||
/// Ordered in descending order so that the best comes first. | ||
/// The packet count is used to filter out invalid messages. | ||
supported_versions: Vec<(u8, u8)>, | ||
/// Custom data. | ||
custom_data: T, | ||
} | ||
|
||
/// Output of a `RegisteredProtocol` upgrade. | ||
pub struct RegisteredProtocolOutput<T> { | ||
/// Data passed to `RegisteredProtocol::new`. | ||
pub custom_data: T, | ||
|
||
/// Id of the protocol. | ||
pub protocol_id: ProtocolId, | ||
|
||
/// Version of the protocol that was negotiated. | ||
pub protocol_version: u8, | ||
|
||
/// Channel to sender outgoing messages to. Closing this channel closes the | ||
/// connection. | ||
// TODO: consider assembling packet_id here | ||
pub outgoing: mpsc::UnboundedSender<Bytes>, | ||
|
||
/// Stream where incoming messages are received. The stream ends whenever | ||
/// either side is closed. | ||
pub incoming: Box<Stream<Item = (PacketId, Bytes), Error = IoError>>, | ||
} | ||
|
||
impl<T> RegisteredProtocol<T> { | ||
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be | ||
/// passed inside the `RegisteredProtocolOutput`. | ||
pub fn new(custom_data: T, protocol: ProtocolId, versions: &[(u8, u8)]) | ||
-> Self { | ||
let mut proto_name = Bytes::from_static(b"/substrate/"); | ||
proto_name.extend_from_slice(&protocol); | ||
proto_name.extend_from_slice(b"/"); | ||
|
||
RegisteredProtocol { | ||
base_name: proto_name, | ||
id: protocol, | ||
supported_versions: { | ||
let mut tmp: Vec<_> = versions.iter().rev().cloned().collect(); | ||
tmp.sort_unstable_by(|a, b| b.1.cmp(&a.1)); | ||
tmp | ||
}, | ||
custom_data: custom_data, | ||
} | ||
} | ||
|
||
/// Returns the ID of the protocol. | ||
pub fn id(&self) -> ProtocolId { | ||
self.id | ||
} | ||
|
||
/// Returns the custom data that was passed to `new`. | ||
pub fn custom_data(&self) -> &T { | ||
&self.custom_data | ||
} | ||
} | ||
|
||
// `Maf` is short for `MultiaddressFuture` | ||
impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocol<T> | ||
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ | ||
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :( | ||
{ | ||
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>; | ||
type UpgradeIdentifier = u8; // Protocol version | ||
|
||
#[inline] | ||
fn protocol_names(&self) -> Self::NamesIter { | ||
// Report each version as an individual protocol. | ||
self.supported_versions.iter().map(|&(ver, _)| { | ||
let num = ver.to_string(); | ||
let mut name = self.base_name.clone(); | ||
name.extend_from_slice(num.as_bytes()); | ||
(name, ver) | ||
}).collect::<Vec<_>>().into_iter() | ||
} | ||
|
||
type Output = RegisteredProtocolOutput<T>; | ||
type MultiaddrFuture = Maf; | ||
type Future = future::FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; | ||
|
||
fn upgrade( | ||
self, | ||
socket: C, | ||
protocol_version: Self::UpgradeIdentifier, | ||
endpoint: Endpoint, | ||
remote_addr: Maf | ||
) -> Self::Future { | ||
let packet_count = self.supported_versions | ||
.iter() | ||
.find(|&(v, _)| *v == protocol_version) | ||
.expect("negotiated protocol version that wasn't advertised ; \ | ||
programmer error") | ||
.1; | ||
|
||
// This function is called whenever we successfully negotiated a | ||
// protocol with a remote (both if initiated by us or by the remote) | ||
|
||
// This channel is used to send outgoing packets to the custom_data | ||
// for this open substream. | ||
let (msg_tx, msg_rx) = mpsc::unbounded(); | ||
|
||
// Build the sink for outgoing network bytes, and the stream for | ||
// incoming instructions. `stream` implements `Stream<Item = Message>`. | ||
enum Message { | ||
/// Received data from the network. | ||
RecvSocket(BytesMut), | ||
/// Data to send to the network. | ||
/// The packet_id must already be inside the `Bytes`. | ||
SendReq(Bytes), | ||
/// The socket has been closed. | ||
Finished, | ||
} | ||
|
||
let (sink, stream) = { | ||
let framed = AsyncRead::framed(socket, VarintCodec::default()); | ||
let msg_rx = msg_rx.map(Message::SendReq) | ||
.chain(stream::once(Ok(Message::Finished))) | ||
.map_err(|()| unreachable!("mpsc::UnboundedReceiver never errors")); | ||
let (sink, stream) = framed.split(); | ||
let stream = stream.map(Message::RecvSocket) | ||
.chain(stream::once(Ok(Message::Finished))); | ||
(sink, msg_rx.select(stream)) | ||
}; | ||
|
||
let incoming = stream::unfold((sink, stream, false), move |(sink, stream, finished)| { | ||
if finished { | ||
return None | ||
} | ||
|
||
Some(stream | ||
.into_future() | ||
.map_err(|(err, _)| err) | ||
.and_then(move |(message, stream)| | ||
match message { | ||
Some(Message::RecvSocket(mut data)) => { | ||
// The `data` should be prefixed by the packet ID, | ||
// therefore an empty packet is invalid. | ||
if data.is_empty() { | ||
debug!(target: "sub-libp2p", "ignoring incoming \ | ||
packet because it was empty"); | ||
let f = future::ok((None, (sink, stream, false))); | ||
return future::Either::A(f) | ||
} | ||
|
||
let packet_id = data[0]; | ||
let data = data.split_off(1); | ||
|
||
if packet_id >= packet_count { | ||
debug!(target: "sub-libp2p", "ignoring incoming packet \ | ||
because packet_id {} is too large", packet_id); | ||
let f = future::ok((None, (sink, stream, false))); | ||
future::Either::A(f) | ||
} else { | ||
let out = Some((packet_id, data.freeze())); | ||
let f = future::ok((out, (sink, stream, false))); | ||
future::Either::A(f) | ||
} | ||
}, | ||
|
||
Some(Message::SendReq(data)) => { | ||
let fut = sink.send(data) | ||
.map(move |sink| (None, (sink, stream, false))); | ||
future::Either::B(fut) | ||
}, | ||
|
||
Some(Message::Finished) | None => { | ||
let f = future::ok((None, (sink, stream, true))); | ||
future::Either::A(f) | ||
}, | ||
} | ||
)) | ||
}).filter_map(|v| v); | ||
|
||
let out = RegisteredProtocolOutput { | ||
custom_data: self.custom_data, | ||
protocol_id: self.id, | ||
protocol_version: protocol_version, | ||
outgoing: msg_tx, | ||
incoming: Box::new(incoming), | ||
}; | ||
|
||
future::ok((out, remote_addr)) | ||
} | ||
} | ||
|
||
// Connection upgrade for all the protocols contained in it. | ||
#[derive(Clone)] | ||
pub struct RegisteredProtocols<T>(pub Vec<RegisteredProtocol<T>>); | ||
|
||
impl<T> RegisteredProtocols<T> { | ||
/// Finds a protocol in the list by its id. | ||
pub fn find_protocol(&self, protocol: ProtocolId) | ||
-> Option<&RegisteredProtocol<T>> { | ||
self.0.iter().find(|p| p.id == protocol) | ||
} | ||
|
||
/// Returns true if the given protocol is in the list. | ||
pub fn has_protocol(&self, protocol: ProtocolId) -> bool { | ||
self.0.iter().any(|p| p.id == protocol) | ||
} | ||
} | ||
|
||
impl<T> Default for RegisteredProtocols<T> { | ||
fn default() -> Self { | ||
RegisteredProtocols(Vec::new()) | ||
} | ||
} | ||
|
||
impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocols<T> | ||
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ | ||
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :( | ||
{ | ||
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>; | ||
type UpgradeIdentifier = (usize, | ||
<RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::UpgradeIdentifier); | ||
|
||
fn protocol_names(&self) -> Self::NamesIter { | ||
// We concat the lists of `RegisteredProtocol::protocol_names` for | ||
// each protocol. | ||
self.0.iter().enumerate().flat_map(|(n, proto)| | ||
ConnectionUpgrade::<C, Maf>::protocol_names(proto) | ||
.map(move |(name, id)| (name, (n, id))) | ||
).collect::<Vec<_>>().into_iter() | ||
} | ||
|
||
type Output = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Output; | ||
type MultiaddrFuture = <RegisteredProtocol<T> as | ||
ConnectionUpgrade<C, Maf>>::MultiaddrFuture; | ||
type Future = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Future; | ||
|
||
#[inline] | ||
fn upgrade( | ||
self, | ||
socket: C, | ||
upgrade_identifier: Self::UpgradeIdentifier, | ||
endpoint: Endpoint, | ||
remote_addr: Maf | ||
) -> Self::Future { | ||
let (protocol_index, inner_proto_id) = upgrade_identifier; | ||
self.0.into_iter() | ||
.nth(protocol_index) | ||
.expect("invalid protocol index ; programmer logic error") | ||
.upgrade(socket, inner_proto_id, endpoint, remote_addr) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
// Copyright 2018 Parity Technologies (UK) Ltd. | ||
// This file is part of Polkadot. | ||
|
||
// Polkadot is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
|
||
// Polkadot is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU General Public License for more details. | ||
|
||
// You should have received a copy of the GNU General Public License | ||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.? | ||
|
||
#![type_length_limit = "268435456"] | ||
|
||
extern crate parking_lot; | ||
extern crate fnv; | ||
extern crate futures; | ||
extern crate tokio_core; | ||
extern crate tokio_io; | ||
extern crate tokio_timer; | ||
extern crate ethkey; | ||
extern crate ethcore_network as network; | ||
extern crate libp2p; | ||
extern crate rand; | ||
extern crate bytes; | ||
extern crate varint; | ||
|
||
#[macro_use] | ||
extern crate log; | ||
|
||
mod custom_proto; | ||
mod network_state; | ||
mod service; | ||
mod timeouts; | ||
mod transport; | ||
|
||
pub use service::NetworkService; | ||
|
||
/// Check if node url is valid | ||
pub fn validate_node_url(url: &str) -> Result<(), network::Error> { | ||
match url.parse::<libp2p::multiaddr::Multiaddr>() { | ||
Ok(_) => Ok(()), | ||
Err(_) => Err(network::ErrorKind::InvalidNodeId.into()), | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
long type!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why doesn't
impl Trait
just assign unique numbers or something so we don't have to deal with this?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added some
Box
es after I realized that the compile time was starting to be really long, so the limit might not need to be that high. However I don't see a reason to lower it either.