Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat(messages): remove MsgEnvelope
Browse files Browse the repository at this point in the history
  • Loading branch information
oetyng committed Feb 23, 2021
1 parent fa13e76 commit 57df069
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 37 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ lru_time_cache = "~0.11.0"
qp2p = "~0.9.16"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
sn_messaging = "~4.0.0"
# sn_messaging = "~4.0.0"
sn_messaging = { git = "https://github.com/oetyng/sn_messaging", branch = "all-acc-in-routing" }
# sn_messaging = { path = "../sn_messaging" }
thiserror = "1.0.23"
xor_name = "1.1.0"
resource_proof = "0.8.0"
Expand Down
4 changes: 2 additions & 2 deletions src/delivery_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub(crate) fn delivery_targets(

candidates(target_name, our_name, section, network)?
}
DstLocation::Direct => return Err(Error::CannotRoute),
DstLocation::Client(_) | DstLocation::Direct => return Err(Error::CannotRoute),
};

Ok((best_section, dg_size))
Expand Down Expand Up @@ -155,7 +155,7 @@ where
let dst_name = match dst {
DstLocation::Node(name) => *name,
DstLocation::Section(name) => *name,
DstLocation::Direct => {
DstLocation::Client(_) | DstLocation::Direct => {
error!("Invalid destination for signature targets: {:?}", dst);
return vec![];
}
Expand Down
4 changes: 2 additions & 2 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bytes::Bytes;
use ed25519_dalek::Keypair;
use hex_fmt::HexFmt;
pub use qp2p::{RecvStream, SendStream};
use sn_messaging::client::MsgEnvelope;
use sn_messaging::client::Message;
use std::{
collections::BTreeSet,
fmt::{self, Debug, Formatter},
Expand Down Expand Up @@ -99,7 +99,7 @@ pub enum Event {
/// Received a message from a client node.
ClientMessageReceived {
/// The content of the message.
content: Box<MsgEnvelope>,
content: Box<Message>,
/// The address of the client that sent the message.
src: SocketAddr,
},
Expand Down
12 changes: 8 additions & 4 deletions src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ impl SrcLocation {
/// Message destination location.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize, Debug)]
pub enum DstLocation {
/// Destination is a client with the given name.
Client(XorName),
/// Destination is a single node with the given name.
Node(XorName),
/// Destination are the nodes of the section whose prefix matches the given name.
Expand All @@ -61,23 +63,23 @@ impl DstLocation {
pub fn is_section(&self) -> bool {
match self {
Self::Section(_) => true,
Self::Node(_) | Self::Direct => false,
Self::Client(_) | Self::Node(_) | Self::Direct => false,
}
}

/// If this location is `Node`, returns its name, otherwise `Err(BadLocation)`.
pub(crate) fn as_node(&self) -> Result<&XorName> {
match self {
Self::Node(name) => Ok(name),
Self::Section(_) | Self::Direct => Err(Error::InvalidDstLocation),
Self::Client(_) | Self::Section(_) | Self::Direct => Err(Error::InvalidDstLocation),
}
}

/// Returns `Ok` if this location is section, `Err(BadLocation)` otherwise.
pub(crate) fn check_is_section(&self) -> Result<()> {
match self {
Self::Section(_) => Ok(()),
Self::Node(_) | Self::Direct => Err(Error::InvalidDstLocation),
Self::Client(_) | Self::Node(_) | Self::Direct => Err(Error::InvalidDstLocation),
}
}

Expand All @@ -90,15 +92,17 @@ impl DstLocation {
assert!(prefix.matches(name));

match self {
Self::Client(self_name) => name == self_name,
Self::Node(self_name) => name == self_name,
Self::Section(self_name) => prefix.matches(self_name),
Self::Direct => true,
}
}

/// Returns the name of this location, or `None` if it is `Direct`.
pub(crate) fn name(&self) -> Option<&XorName> {
pub fn name(&self) -> Option<&XorName> {
match self {
Self::Client(name) => Some(name),
Self::Node(name) => Some(name),
Self::Section(name) => Some(name),
Self::Direct => None,
Expand Down
25 changes: 9 additions & 16 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ use crate::{
use bytes::Bytes;
use ed25519_dalek::{Keypair, PublicKey, Signature, Signer};
use itertools::Itertools;
use sn_messaging::{
client::MsgEnvelope,
infrastructure::{ErrorResponse, Message as InfrastructureMessage},
node::NodeMessage,
MessageType, WireMsg,
};
use sn_messaging::{network_info::{ErrorResponse, Message as NetworkInfoMsg}, client::Message as ClientMessage, node::NodeMessage, MessageType, WireMsg};
use std::{net::SocketAddr, sync::Arc};
use tokio::{sync::mpsc, task};
use xor_name::{Prefix, XorName};
Expand Down Expand Up @@ -322,7 +317,7 @@ impl Routing {
pub async fn send_message_to_client(
&self,
recipient: SocketAddr,
message: MsgEnvelope,
message: ClientMessage,
) -> Result<()> {
let command = Command::SendMessage {
recipients: vec![recipient],
Expand Down Expand Up @@ -391,7 +386,7 @@ async fn handle_message(stage: Arc<Stage>, bytes: Bytes, sender: SocketAddr) {
MessageType::Ping => {
// Pings are not handled
}
MessageType::InfrastructureMessage(message) => {
MessageType::NetworkInfo(message) => {
let command = Command::HandleInfrastructureMessage { sender, message };
let _ = task::spawn(stage.handle_commands(command));
}
Expand All @@ -412,18 +407,16 @@ async fn handle_message(stage: Arc<Stage>, bytes: Bytes, sender: SocketAddr) {
}
}
}
MessageType::ClientMessage(msg_envelope) => {
if let Some(client_pk) = msg_envelope.message.target_section_pk() {
MessageType::ClientMessage(message) => {
if let Some(client_pk) = message.target_section_pk() {
if let Some(bls_pk) = client_pk.bls() {
if let Err(error) = stage.check_key_status(&bls_pk).await {
let incoming_msg = msg_envelope.message;
let correlation_id = incoming_msg.id();

let correlation_id = message.id();
let command = Command::SendMessage {
recipients: vec![sender],
delivery_group_size: 1,
message: MessageType::InfrastructureMessage(
InfrastructureMessage::InfrastructureUpdate(ErrorResponse {
message: MessageType::NetworkInfo(
NetworkInfoMsg::NetworkInfoUpdate(ErrorResponse {
correlation_id,
error,
}),
Expand All @@ -436,7 +429,7 @@ async fn handle_message(stage: Arc<Stage>, bytes: Bytes, sender: SocketAddr) {
}

let event = Event::ClientMessageReceived {
content: Box::new(msg_envelope),
content: Box::new(message),
src: sender,
};

Expand Down
6 changes: 3 additions & 3 deletions src/routing/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use super::{bootstrap, Approved, Comm, Command};
use crate::{error::Result, event::Event, relocation::SignedRelocateDetails};
use sn_messaging::{infrastructure::Error as InfrastructureError, MessageType};
use sn_messaging::{network_info::Error as TargetSectionError, MessageType};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, watch, Mutex},
Expand Down Expand Up @@ -178,7 +178,7 @@ impl Stage {
pub async fn check_key_status(
&self,
bls_pk: &bls::PublicKey,
) -> Result<(), InfrastructureError> {
) -> Result<(), TargetSectionError> {
self.state.lock().await.check_key_status(bls_pk)
}

Expand Down Expand Up @@ -212,7 +212,7 @@ impl Stage {
}
vec![]
}
MessageType::InfrastructureMessage(_) => {
MessageType::NetworkInfo(_) => {
for recipient in recipients {
let _ = self
.comm
Expand Down
12 changes: 3 additions & 9 deletions tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use bytes::Bytes;
use qp2p::QuicP2p;
use sn_data_types::Keypair;
use sn_messaging::{
client::{Message, MessageId, MsgEnvelope, MsgSender, Query, TransferQuery},
client::{Message, MessageId, Query, TransferQuery},
MessageType, WireMsg,
};
use sn_routing::{Config, DstLocation, Error, Event, NodeElderChange, SrcLocation};
Expand All @@ -33,7 +33,6 @@ async fn test_messages_client_node() -> Result<()> {
let mut rng = rand::thread_rng();
let keypair = Keypair::new_ed25519(&mut rng);
let pk = keypair.public_key();
let signature = keypair.sign(b"blabla");

let random_xor = XorName::random();
let id = MessageId(random_xor);
Expand All @@ -43,12 +42,7 @@ async fn test_messages_client_node() -> Result<()> {
target_section_pk: None,
};

let msg_envelope = MsgEnvelope {
message,
origin: MsgSender::client(pk, signature)?,
proxies: vec![],
};
let msg_envelope_clone = msg_envelope.clone();
let message_clone = message.clone();

let node_addr = node.our_connection_info();
// spawn node events listener
Expand Down Expand Up @@ -77,7 +71,7 @@ async fn test_messages_client_node() -> Result<()> {
let (client_endpoint, _, mut incoming_messages, _) = client.new_endpoint().await?;
client_endpoint.connect_to(&node_addr).await?;

let client_msg_bytes = WireMsg::serialize_client_msg(&msg_envelope)?;
let client_msg_bytes = WireMsg::serialize_client_msg(&message)?;

client_endpoint
.send_message(client_msg_bytes, &node_addr)
Expand Down

0 comments on commit 57df069

Please sign in to comment.