-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Implement request-responses protocols #6634
Changes from 12 commits
849a623
de1efcc
2b774e7
418bb61
9083df9
44c0203
d0d9810
47fa2ac
13ed831
3f914b7
12878bb
de492a2
0dd28e7
66e04c1
83977da
43a82d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -16,9 +16,9 @@ | |||||
|
||||||
use crate::{ | ||||||
config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests, | ||||||
peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, | ||||||
request_responses, peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, | ||||||
protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol}, | ||||||
Event, ObservedRole, DhtEvent, ExHashT, | ||||||
service::maybe_utf8_bytes_to_string, Event, ObservedRole, DhtEvent, ExHashT, | ||||||
}; | ||||||
|
||||||
use codec::Encode as _; | ||||||
|
@@ -37,6 +37,10 @@ use std::{ | |||||
time::Duration, | ||||||
}; | ||||||
|
||||||
pub use crate::request_responses::{ | ||||||
ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, SendRequestError | ||||||
}; | ||||||
|
||||||
/// General behaviour of the network. Combines all protocols together. | ||||||
#[derive(NetworkBehaviour)] | ||||||
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")] | ||||||
|
@@ -48,6 +52,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> { | |||||
peer_info: peer_info::PeerInfoBehaviour, | ||||||
/// Discovers nodes of the network. | ||||||
discovery: DiscoveryBehaviour, | ||||||
/// Generic request-reponse protocols. | ||||||
request_responses: request_responses::RequestResponsesBehaviour, | ||||||
/// Block request handling. | ||||||
block_requests: block_requests::BlockRequests<B>, | ||||||
/// Finality proof request handling. | ||||||
|
@@ -74,22 +80,40 @@ pub enum BehaviourOut<B: BlockT> { | |||||
RandomKademliaStarted(ProtocolId), | ||||||
|
||||||
/// We have received a request from a peer and answered it. | ||||||
AnsweredRequest { | ||||||
/// | ||||||
/// This event is generated for statistics purposes. | ||||||
InboundRequest { | ||||||
/// Peer which sent us a request. | ||||||
peer: PeerId, | ||||||
/// Protocol name of the request. | ||||||
protocol: Vec<u8>, | ||||||
/// Time it took to build the response. | ||||||
build_time: Duration, | ||||||
protocol: Cow<'static, str>, | ||||||
/// If `Ok`, contains the time elapsed between when we received the request and when we | ||||||
/// sent back the response. If `Err`, the error that happened. | ||||||
result: Result<Duration, ResponseFailure>, | ||||||
}, | ||||||
|
||||||
/// A request initiated using [`Behaviour::send_request`] has succeeded or failed. | ||||||
RequestFinished { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Would be easier for me to understand without having to read the doc comment. |
||||||
/// Request that has succeeded. | ||||||
request_id: RequestId, | ||||||
/// Response sent by the remote or reason for failure. | ||||||
result: Result<Vec<u8>, RequestFailure>, | ||||||
}, | ||||||
|
||||||
/// Started a new request with the given node. | ||||||
RequestStarted { | ||||||
/// | ||||||
/// This event is for statistics purposes only. The request and response handling are entirely | ||||||
/// internal to the behaviour. | ||||||
OpaqueRequestStarted { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Would that not be more descriptive? Would as well match with |
||||||
peer: PeerId, | ||||||
/// Protocol name of the request. | ||||||
protocol: Vec<u8>, | ||||||
}, | ||||||
/// Finished, successfully or not, a previously-started request. | ||||||
RequestFinished { | ||||||
/// | ||||||
/// This event is for statistics purposes only. The request and response handling are entirely | ||||||
/// internal to the behaviour. | ||||||
OpaqueRequestFinished { | ||||||
/// Who we were requesting. | ||||||
peer: PeerId, | ||||||
/// Protocol name of the request. | ||||||
|
@@ -116,17 +140,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { | |||||
finality_proof_requests: finality_requests::FinalityProofRequests<B>, | ||||||
light_client_handler: light_client_handler::LightClientHandler<B>, | ||||||
disco_config: DiscoveryConfig, | ||||||
) -> Self { | ||||||
Behaviour { | ||||||
request_response_protocols: Vec<request_responses::ProtocolConfig>, | ||||||
) -> Result<Self, request_responses::RegisterError> { | ||||||
Ok(Behaviour { | ||||||
substrate, | ||||||
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), | ||||||
discovery: disco_config.finish(), | ||||||
request_responses: | ||||||
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, | ||||||
block_requests, | ||||||
finality_proof_requests, | ||||||
light_client_handler, | ||||||
events: VecDeque::new(), | ||||||
role, | ||||||
} | ||||||
}) | ||||||
} | ||||||
|
||||||
/// Returns the list of nodes that we know exist in the network. | ||||||
|
@@ -163,6 +190,16 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> { | |||||
self.peer_info.node(peer_id) | ||||||
} | ||||||
|
||||||
/// Initiates sending a request. | ||||||
/// | ||||||
/// An error is returned if we are not connected to the target peer of if the protocol doesn't | ||||||
/// match one that has been registered. | ||||||
pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec<u8>) | ||||||
-> Result<RequestId, SendRequestError> | ||||||
{ | ||||||
self.request_responses.send_request(target, protocol, request) | ||||||
} | ||||||
|
||||||
/// Registers a new notifications protocol. | ||||||
/// | ||||||
/// After that, you can call `write_notifications`. | ||||||
|
@@ -255,18 +292,18 @@ Behaviour<B, H> { | |||||
CustomMessageOutcome::BlockRequest { target, request } => { | ||||||
match self.block_requests.send_request(&target, request) { | ||||||
block_requests::SendRequestOutcome::Ok => { | ||||||
self.events.push_back(BehaviourOut::RequestStarted { | ||||||
self.events.push_back(BehaviourOut::OpaqueRequestStarted { | ||||||
peer: target, | ||||||
protocol: self.block_requests.protocol_name().to_vec(), | ||||||
}); | ||||||
}, | ||||||
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => { | ||||||
self.events.push_back(BehaviourOut::RequestFinished { | ||||||
self.events.push_back(BehaviourOut::OpaqueRequestFinished { | ||||||
peer: target.clone(), | ||||||
protocol: self.block_requests.protocol_name().to_vec(), | ||||||
request_duration, | ||||||
}); | ||||||
self.events.push_back(BehaviourOut::RequestStarted { | ||||||
self.events.push_back(BehaviourOut::OpaqueRequestStarted { | ||||||
peer: target, | ||||||
protocol: self.block_requests.protocol_name().to_vec(), | ||||||
}); | ||||||
|
@@ -307,18 +344,41 @@ Behaviour<B, H> { | |||||
} | ||||||
} | ||||||
|
||||||
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Event> for Behaviour<B, H> { | ||||||
fn inject_event(&mut self, event: request_responses::Event) { | ||||||
match event { | ||||||
request_responses::Event::InboundRequest { peer, protocol, result } => { | ||||||
self.events.push_back(BehaviourOut::InboundRequest { | ||||||
peer, | ||||||
protocol, | ||||||
result, | ||||||
}); | ||||||
} | ||||||
|
||||||
request_responses::Event::RequestFinished { request_id, result } => { | ||||||
self.events.push_back(BehaviourOut::RequestFinished { | ||||||
request_id, | ||||||
result, | ||||||
}); | ||||||
}, | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> { | ||||||
fn inject_event(&mut self, event: block_requests::Event<B>) { | ||||||
match event { | ||||||
block_requests::Event::AnsweredRequest { peer, total_handling_time } => { | ||||||
self.events.push_back(BehaviourOut::AnsweredRequest { | ||||||
let protocol = maybe_utf8_bytes_to_string(&self.block_requests.protocol_name()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to its documentation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The protocol name contained in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally this would be handled consistently. If changing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand that you hate |
||||||
.into_owned(); | ||||||
self.events.push_back(BehaviourOut::InboundRequest { | ||||||
peer, | ||||||
protocol: self.block_requests.protocol_name().to_vec(), | ||||||
build_time: total_handling_time, | ||||||
protocol: From::from(protocol), | ||||||
result: Ok(total_handling_time), | ||||||
}); | ||||||
}, | ||||||
block_requests::Event::Response { peer, original_request: _, response, request_duration } => { | ||||||
self.events.push_back(BehaviourOut::RequestFinished { | ||||||
self.events.push_back(BehaviourOut::OpaqueRequestFinished { | ||||||
peer: peer.clone(), | ||||||
protocol: self.block_requests.protocol_name().to_vec(), | ||||||
request_duration, | ||||||
|
@@ -330,7 +390,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B | |||||
block_requests::Event::RequestTimeout { peer, request_duration, .. } => { | ||||||
// There doesn't exist any mechanism to report cancellations or timeouts yet, so | ||||||
// we process them by disconnecting the node. | ||||||
self.events.push_back(BehaviourOut::RequestFinished { | ||||||
self.events.push_back(BehaviourOut::OpaqueRequestFinished { | ||||||
peer: peer.clone(), | ||||||
protocol: self.block_requests.protocol_name().to_vec(), | ||||||
request_duration, | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do protocol names have to be strings instead of bytes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While libp2p allows bytes, I went for restricting this to strings in Substrate because:
maybe_utf8_bytes_to_string
).