Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Implements basic Routing functions #582

Merged
merged 18 commits into from
Aug 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions examples/simple_key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,12 @@ impl Node {
our_authority : Authority,
from_authority : Authority,
response_token : Option<SignedToken>) {
match request {
match request.clone() {
ExternalRequest::Get(data_request) => {
self.handle_get_request(data_request,
our_authority,
from_authority,
request,
response_token);
},
ExternalRequest::Put(data) => {
Expand All @@ -199,6 +200,7 @@ impl Node {
fn handle_get_request(&mut self, data_request : DataRequest,
_our_authority : Authority,
from_authority : Authority,
orig_request : ExternalRequest,
response_token : Option<SignedToken>) {
let name = match data_request {
DataRequest::PlainData(name) => name,
Expand All @@ -210,11 +212,14 @@ impl Node {
None => return,
};

self.routing.get_response(from_authority, Data::PlainData(data), response_token);
self.routing.get_response(from_authority,
Data::PlainData(data),
orig_request,
response_token);
}

fn handle_put_request(&mut self, data : Data,
our_authority : Authority,
fn handle_put_request(&mut self, data : Data,
our_authority : Authority,
_from_authority : Authority,
_response_token : Option<SignedToken>) {
let plain_data = match data {
Expand Down
2 changes: 0 additions & 2 deletions src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ pub fn our_authority(message : &RoutingMessage,
Content::InternalRequest(ref request) => {
match *request {
InternalRequest::Connect(_) => None,
InternalRequest::FindGroup => None,
InternalRequest::GetGroupKey => None,
InternalRequest::RequestNetworkName(ref public_id) => Some(public_id.name()),
InternalRequest::CacheNetworkName(ref public_id, _) => Some(public_id.name()),
InternalRequest::Refresh(_, _) => None,
Expand Down
1 change: 0 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub enum Event {
response : ExternalResponse,
our_authority : Authority,
from_authority : Authority,
orig_request : ExternalRequest,
},
Refresh(u64, NameType, Vec<Vec<u8>>),
// ~|~ ~~|~~~~~ ~~|~~~~~~~~~
Expand Down
44 changes: 25 additions & 19 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,36 +70,42 @@ pub enum ExternalRequest {

#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, RustcEncodable, RustcDecodable)]
pub enum ExternalResponse {
Get (Data, SignedToken),
Put (ResponseError, SignedToken),
Post (ResponseError, SignedToken),
Delete(ResponseError, SignedToken),
// TODO: Technical depth: if the third param here is Some(...) then
// the it shares most of the data with the second argument, which
// needlessly increases bandwidth.
Get (Data, ExternalRequest, Option<SignedToken>),
Put (ResponseError, ExternalRequest, Option<SignedToken>),
Post (ResponseError, ExternalRequest, Option<SignedToken>),
Delete(ResponseError, ExternalRequest, Option<SignedToken>),
}

impl ExternalResponse {
pub fn get_signed_token(&self) -> &SignedToken {
// If the *request* was from a group entity, then there is
// no signed token.
pub fn get_signed_token(&self) -> &Option<SignedToken> {
match *self {
ExternalResponse::Get(_, ref r) => r,
ExternalResponse::Put(_, ref r) => r,
ExternalResponse::Post(_, ref r) => r,
ExternalResponse::Delete(_, ref r) => r,
ExternalResponse::Get(_, _, ref r) => r,
ExternalResponse::Put(_, _, ref r) => r,
ExternalResponse::Post(_, _, ref r) => r,
ExternalResponse::Delete(_, _, ref r) => r,
}
}

pub fn get_orig_request(&self) -> Result<SignedMessage, CborError> {
SignedMessage::new_from_token(self.get_signed_token().clone())
}

pub fn verify_request_came_from(&self, requester_pub_key: &sign::PublicKey) -> bool {
self.get_signed_token().verify_signature(requester_pub_key)
pub fn get_orig_request(&self) -> &ExternalRequest {
match *self {
ExternalResponse::Get(_, ref r, _) => r,
ExternalResponse::Put(_, ref r, _) => r,
ExternalResponse::Post(_, ref r, _) => r,
ExternalResponse::Delete(_, ref r, _) => r,
}
}
}

#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, RustcEncodable, RustcDecodable)]
pub enum InternalRequest {
Connect(ConnectRequest),
FindGroup,
GetGroupKey,
// FindGroup,
// GetGroupKey,
RequestNetworkName(PublicId),
// a client can send RequestNetworkName
CacheNetworkName(PublicId, SignedToken),
Expand All @@ -115,8 +121,8 @@ pub enum InternalRequest {
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, RustcEncodable, RustcDecodable)]
pub enum InternalResponse {
Connect(ConnectResponse, SignedToken),
FindGroup(Vec<PublicId>, SignedToken),
GetGroupKey(BTreeMap<NameType, sign::PublicKey>, SignedToken),
// FindGroup(Vec<PublicId>, SignedToken),
// GetGroupKey(BTreeMap<NameType, sign::PublicKey>, SignedToken),
CacheNetworkName(PublicId, Vec<PublicId>, SignedToken),
// ~~|~~~~~ ~~|~~~~~~~~~~ ~~|~~~~~~~~
// | | | the original Request::RequestNetworkName
Expand Down
112 changes: 75 additions & 37 deletions src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use sodiumoxide;
use std::sync::mpsc;
use std::thread::spawn;

use id::Id;
use action::Action;
Expand All @@ -30,13 +31,13 @@ use types::Bytes;
use error::{RoutingError, ResponseError};
use authority::Authority;
use sodiumoxide::crypto;
use messages::{ExternalRequest, ExternalResponse, Content};

//use types::{MessageId, Address};
//use utils::{encode, decode};
//use authority::{Authority};
//use messages::{RoutingMessage, SignedMessage, MessageType};
//use error::{RoutingError};
//use std::thread::spawn;
//use std::collections::BTreeMap;

type RoutingResult = Result<(), RoutingError>;
Expand All @@ -46,43 +47,52 @@ type RoutingResult = Result<(), RoutingError>;
/// Routing objects are clonable for multithreading, or a Routing object can be
/// cloned with a new set of keys while preserving a single RoutingNode.
pub struct Routing {
keys : Id,
action_sender : mpsc::Sender<Action>,
}

impl Routing {
/// Starts a new RoutingIdentity, which will also start a new RoutingNode.
/// The RoutingNode will attempt to achieve full routing node status.
/// The intial Routing object will have newly generated keys
// TODO(dirvine) Always start a node if possible :09/08/2015
pub fn new(event_sender : mpsc::Sender<Event>) -> Result<Routing, RoutingError> {
sodiumoxide::init(); // enable shared global (i.e. safe to multithread now)

let keys = Id::new();
let (action_sender, action_receiver) = mpsc::channel::<Action>();

// TODO (ben 5/08/2015) Errors on starting RoutingNode should more aggressively
// be handled internally
// start the handler for routing
let routing_node = match RoutingNode::new(action_sender.clone(), action_receiver,
event_sender) {
Ok(routing_node) => routing_node,
Err(e) => return Err(e),
};
// start the handler for routing without a restriction to become a full node
let mut routing_node = RoutingNode::new(action_sender.clone(), action_receiver,
event_sender, false);

spawn(move || routing_node.run());

Ok(Routing {
keys : keys,
action_sender : action_sender,
})
}

/// Starts a new RoutingIdentity, which will also start a new RoutingNode.
/// The RoutingNode will only bootstrap to the network and not attempt to
/// achieve full routing node status.
// TODO(dirvine) take an Id as a param to sign messages ???? (or amend put etc. for a client put_request to take reference to a particular ID for sign/encryt, we should be already bootstrapped anyway with the new() call :09/08/2015
// TODO(dirvine) take an Id as a param to sign messages ???? (or amend put etc. for a client
// put_request to take reference to a particular ID for sign/encryt, we should be already
// bootstrapped anyway with the new() call :09/08/2015
// FIXME(dirvine) discussion required :09/08/2015
pub fn new_client(event_receiver : mpsc::Sender<Event>)

pub fn new_client(event_sender : mpsc::Sender<Event>)
-> Result<Routing, RoutingError> {
unimplemented!()
sodiumoxide::init(); // enable shared global (i.e. safe to multithread now)

let (action_sender, action_receiver) = mpsc::channel::<Action>();

// start the handler for routing with a restriction to become a full node
let mut routing_node = RoutingNode::new(action_sender.clone(), action_receiver,
event_sender, true);

spawn(move || routing_node.run());

Ok(Routing {
action_sender : action_sender,
})
}

/// Clone the interface while maintaining the same RoutingNode, with a given set of keys.
Expand All @@ -92,52 +102,84 @@ impl Routing {

/// Send a Get message with a DataRequest to an Authority, signed with given keys.
pub fn get_request(&self, location : Authority, data_request : DataRequest) {
unimplemented!()
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalRequest(ExternalRequest::Get(data_request))));
}

/// Add something to the network
pub fn put_request(&self, location : Authority, data : Data) {
unimplemented!()
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalRequest(ExternalRequest::Put(data))));
}

/// Change something already on the network
pub fn post_request(&self, location : Authority, data : Data) {
unimplemented!()
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalRequest(ExternalRequest::Post(data))));
}

/// Remove something from the network
pub fn delete_request(&self, location : Authority, data_request : DataRequest) {
unimplemented!()
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalRequest(ExternalRequest::Delete(data_request))));
}
/// Respond to a get_request (no error can be sent)
/// If we received the request from a group, we'll not get the signed_token.
pub fn get_response(&self, location : Authority, data: Data, signed_token : Option<SignedToken>) {
unimplemented!()
pub fn get_response(&self, location : Authority,
data : Data,
orig_request : ExternalRequest,
signed_token : Option<SignedToken>) {
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalResponse(
ExternalResponse::Get(data, orig_request, signed_token))));
}
// FIXME(dirvine) perhaps all responses here shoudl be a single respond_error fn instead
// FIXME(dirvine) perhaps all responses here shoudl be a single respond_error fn instead
// Also these shoudl return an error so if not yet a node they fail (if clients try and call for instance) :09/08/2015
/// response error to a put request
pub fn put_response(&self, location : Authority, response_error : ResponseError,
signed_token : SignedToken) {
unimplemented!()
pub fn put_response(&self, location : Authority,
response_error : ResponseError,
orig_request : ExternalRequest,
signed_token : Option<SignedToken>) {
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalResponse(
ExternalResponse::Put(response_error, orig_request, signed_token))));
}
/// Response error to a post request
pub fn post_response(&self, location : Authority, response_error : ResponseError,
signed_token : SignedToken) {
unimplemented!()
pub fn post_response(&self, location : Authority,
response_error : ResponseError,
orig_request : ExternalRequest,
signed_token : Option<SignedToken>) {
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalResponse(
ExternalResponse::Post(response_error, orig_request, signed_token))));
}
/// response error to a delete respons
pub fn delete_response(&self, location : Authority, response_error : ResponseError,
signed_token : SignedToken) {
unimplemented!()
pub fn delete_response(&self, location : Authority,
response_error : ResponseError,
orig_request : ExternalRequest,
signed_token : Option<SignedToken>) {
let _ = self.action_sender.send(Action::SendContent(
location,
Content::ExternalResponse(
ExternalResponse::Delete(response_error,
orig_request,
signed_token))));
}

/// Refresh the content in the close group nodes of group address content::name.
/// This method needs to be called when churn is triggered.
/// all the group members need to call this, otherwise it will not be resolved as a valid
/// content.
pub fn refresh_request(&self, type_tag: u64, from_group: NameType, content: Bytes) {
unimplemented!()
// unimplemented!()
// TODO (ben 14/08/2015) ignore refresh calls for now
}

/// Signal to RoutingNode that it needs to refuse new messages and handle all outstanding
Expand All @@ -146,8 +188,4 @@ impl Routing {
pub fn stop(&mut self) {
unimplemented!()
}

pub fn signing_public_key(&self) -> crypto::sign::PublicKey {
self.keys.signing_public_key()
}
}
Loading