diff --git a/comms/core/src/protocol/rpc/server/error.rs b/comms/core/src/protocol/rpc/server/error.rs index ea3458b4e5..a829ff6035 100644 --- a/comms/core/src/protocol/rpc/server/error.rs +++ b/comms/core/src/protocol/rpc/server/error.rs @@ -60,8 +60,17 @@ pub enum RpcServerError { ServiceCallExceededDeadline, #[error("Stream read exceeded deadline")] ReadStreamExceededDeadline, - #[error("Early close error: {0}")] - EarlyCloseError(#[from] EarlyCloseError), + #[error("Early close: {0}")] + EarlyClose(#[from] EarlyCloseError), +} + +impl RpcServerError { + pub fn early_close_io(&self) -> Option<&io::Error> { + match self { + Self::EarlyClose(e) => e.io(), + _ => None, + } + } } impl From for RpcServerError { diff --git a/comms/core/src/protocol/rpc/server/mod.rs b/comms/core/src/protocol/rpc/server/mod.rs index 6690e31418..a05a40de4f 100644 --- a/comms/core/src/protocol/rpc/server/mod.rs +++ b/comms/core/src/protocol/rpc/server/mod.rs @@ -44,6 +44,7 @@ use std::{ convert::TryFrom, future::Future, io, + io::ErrorKind, pin::Pin, sync::Arc, task::Poll, @@ -353,7 +354,7 @@ where { Ok(_) => {}, Err(err @ RpcServerError::HandshakeError(_)) => { - debug!(target: LOG_TARGET, "{}", err); + debug!(target: LOG_TARGET, "Handshake error: {}", err); metrics::handshake_error_counter(&node_id, ¬ification.protocol).inc(); }, Err(err) => { @@ -530,7 +531,7 @@ where metrics::error_counter(&self.node_id, &self.protocol, &err).inc(); let level = match &err { RpcServerError::Io(e) => err_to_log_level(e), - RpcServerError::EarlyCloseError(e) => e.io().map(err_to_log_level).unwrap_or(log::Level::Error), + RpcServerError::EarlyClose(e) => e.io().map(err_to_log_level).unwrap_or(log::Level::Error), _ => log::Level::Error, }; log!( @@ -562,8 +563,10 @@ where err, ); } - error!( + let level = err.early_close_io().map(err_to_log_level).unwrap_or(log::Level::Error); + log!( target: LOG_TARGET, + level, "(peer: {}, protocol: {}) Failed to handle request: {}", self.node_id, self.protocol_name(), @@ -880,8 +883,13 @@ fn into_response(request_id: u32, result: Result) -> RpcRe } fn err_to_log_level(err: &io::Error) -> log::Level { + error!(target: LOG_TARGET, "KIND: {}", err.kind()); match err.kind() { - io::ErrorKind::BrokenPipe | io::ErrorKind::WriteZero => log::Level::Debug, + ErrorKind::ConnectionReset | + ErrorKind::ConnectionAborted | + ErrorKind::BrokenPipe | + ErrorKind::WriteZero | + ErrorKind::UnexpectedEof => log::Level::Debug, _ => log::Level::Error, } } diff --git a/comms/dht/src/dht.rs b/comms/dht/src/dht.rs index e3acdae98a..3fef7d43a8 100644 --- a/comms/dht/src/dht.rs +++ b/comms/dht/src/dht.rs @@ -31,6 +31,7 @@ use tari_comms::{ pipeline::PipelineError, }; use tari_shutdown::ShutdownSignal; +use tari_utilities::epoch_time::EpochTime; use thiserror::Error; use tokio::sync::{broadcast, mpsc}; use tower::{layer::Layer, Service, ServiceBuilder}; @@ -298,6 +299,7 @@ impl Dht { .layer(MetricsLayer::new(self.metrics_collector.clone())) .layer(inbound::DeserializeLayer::new(self.peer_manager.clone())) .layer(filter::FilterLayer::new(self.unsupported_saf_messages_filter())) + .layer(filter::FilterLayer::new(discard_expired_messages)) .layer(inbound::DecryptionLayer::new( self.config.clone(), self.node_identity.clone(), @@ -432,6 +434,20 @@ fn filter_messages_to_rebroadcast(msg: &DecryptedDhtMessage) -> bool { } } +/// Check message expiry and immediately discard if expired +fn discard_expired_messages(msg: &DhtInboundMessage) -> bool { + if let Some(expires) = msg.dht_header.expires { + if expires < EpochTime::now() { + debug!( + target: LOG_TARGET, + "[discard_expired_messages] Discarding expired message {}", msg + ); + return false; + } + } + true +} + #[cfg(test)] mod test { use std::{sync::Arc, time::Duration}; diff --git a/comms/dht/src/envelope.rs b/comms/dht/src/envelope.rs index 3f4f2ef06e..6ac881cb80 100644 --- a/comms/dht/src/envelope.rs +++ b/comms/dht/src/envelope.rs @@ -43,7 +43,7 @@ use crate::version::DhtProtocolVersion; pub(crate) fn datetime_to_timestamp(datetime: DateTime) -> Timestamp { Timestamp { seconds: datetime.timestamp(), - nanos: datetime.timestamp_subsec_nanos().try_into().unwrap_or(std::i32::MAX), + nanos: datetime.timestamp_subsec_nanos().try_into().unwrap_or(i32::MAX), } } diff --git a/comms/dht/src/store_forward/database/stored_message.rs b/comms/dht/src/store_forward/database/stored_message.rs index b8d095d901..1913b5be02 100644 --- a/comms/dht/src/store_forward/database/stored_message.rs +++ b/comms/dht/src/store_forward/database/stored_message.rs @@ -20,8 +20,6 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::convert::TryInto; - use chrono::NaiveDateTime; use tari_comms::message::MessageExt; use tari_utilities::{hex, hex::Hex}; @@ -50,7 +48,7 @@ pub struct NewStoredMessage { } impl NewStoredMessage { - pub fn try_construct(message: DecryptedDhtMessage, priority: StoredMessagePriority) -> Option { + pub fn new(message: DecryptedDhtMessage, priority: StoredMessagePriority) -> Self { let DecryptedDhtMessage { authenticated_origin, decryption_result, @@ -64,8 +62,8 @@ impl NewStoredMessage { }; let body_hash = hex::to_hex(&dedup::create_message_hash(&dht_header.message_signature, &body)); - Some(Self { - version: dht_header.version.as_major().try_into().ok()?, + Self { + version: dht_header.version.as_major() as i32, origin_pubkey: authenticated_origin.as_ref().map(|pk| pk.to_hex()), message_type: dht_header.message_type as i32, destination_pubkey: dht_header.destination.public_key().map(|pk| pk.to_hex()), @@ -81,7 +79,7 @@ impl NewStoredMessage { }, body_hash, body, - }) + } } } diff --git a/comms/dht/src/store_forward/error.rs b/comms/dht/src/store_forward/error.rs index 4a71b410eb..85fd5678c2 100644 --- a/comms/dht/src/store_forward/error.rs +++ b/comms/dht/src/store_forward/error.rs @@ -27,7 +27,7 @@ use tari_comms::{ message::MessageError, peer_manager::{NodeId, PeerManagerError}, }; -use tari_utilities::byte_array::ByteArrayError; +use tari_utilities::{byte_array::ByteArrayError, epoch_time::EpochTime}; use thiserror::Error; use crate::{ @@ -81,10 +81,10 @@ pub enum StoreAndForwardError { RequesterChannelClosed, #[error("The request was cancelled by the store and forward service")] RequestCancelled, - #[error("The message was not valid for store and forward")] - InvalidStoreMessage, - #[error("The envelope version is invalid")] - InvalidEnvelopeVersion, + #[error("The {field} field was not valid, discarding SAF response: {details}")] + InvalidSafResponseMessage { field: &'static str, details: String }, + #[error("The message has expired, not storing message in SAF db (expiry: {expired}, now: {now})")] + NotStoringExpiredMessage { expired: EpochTime, now: EpochTime }, #[error("MalformedNodeId: {0}")] MalformedNodeId(#[from] ByteArrayError), #[error("DHT message type should not have been forwarded")] diff --git a/comms/dht/src/store_forward/message.rs b/comms/dht/src/store_forward/message.rs index f753b9941b..f74af32c61 100644 --- a/comms/dht/src/store_forward/message.rs +++ b/comms/dht/src/store_forward/message.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use chrono::{DateTime, Utc}; use prost::Message; @@ -76,10 +76,7 @@ impl TryFrom for StoredMessage { let dht_header = DhtHeader::decode(message.header.as_slice())?; Ok(Self { stored_at: Some(datetime_to_timestamp(DateTime::from_utc(message.stored_at, Utc))), - version: message - .version - .try_into() - .map_err(|_| StoreAndForwardError::InvalidEnvelopeVersion)?, + version: message.version as u32, body: message.body, dht_header: Some(dht_header), }) diff --git a/comms/dht/src/store_forward/saf_handler/task.rs b/comms/dht/src/store_forward/saf_handler/task.rs index 7f5390d382..4bce651e68 100644 --- a/comms/dht/src/store_forward/saf_handler/task.rs +++ b/comms/dht/src/store_forward/saf_handler/task.rs @@ -36,7 +36,7 @@ use tari_comms::{ types::CommsPublicKey, BytesMut, }; -use tari_utilities::{convert::try_convert_all, ByteArray}; +use tari_utilities::ByteArray; use tokio::sync::mpsc; use tower::{Service, ServiceExt}; @@ -216,7 +216,7 @@ where S: Service let messages = self.saf_requester.fetch_messages(query.clone()).await?; let stored_messages = StoredMessagesResponse { - messages: try_convert_all(messages)?, + messages: messages.into_iter().map(TryInto::try_into).collect::>()?, request_id: retrieve_msgs.request_id, response_type: resp_type as i32, }; @@ -430,8 +430,13 @@ where S: Service .stored_at .map(|t| { Result::<_, StoreAndForwardError>::Ok(DateTime::from_utc( - NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos.try_into().unwrap_or(u32::MAX)) - .ok_or(StoreAndForwardError::InvalidStoreMessage)?, + NaiveDateTime::from_timestamp_opt(t.seconds, 0).ok_or_else(|| { + StoreAndForwardError::InvalidSafResponseMessage { + field: "stored_at", + details: "number of seconds provided represents more days than can fit in a u32" + .to_string(), + } + })?, Utc, )) }) @@ -618,7 +623,7 @@ where S: Service mod test { use std::time::Duration; - use chrono::Utc; + use chrono::{Timelike, Utc}; use tari_comms::{message::MessageExt, runtime, wrap_in_envelope_body}; use tari_test_utils::collect_recv; use tari_utilities::{hex, hex::Hex}; @@ -932,7 +937,7 @@ mod test { .unwrap() .unwrap(); - assert_eq!(last_saf_received, msg2_time); + assert_eq!(last_saf_received.second(), msg2_time.second()); } #[runtime::test] diff --git a/comms/dht/src/store_forward/store.rs b/comms/dht/src/store_forward/store.rs index c0d2b8d224..70690bde94 100644 --- a/comms/dht/src/store_forward/store.rs +++ b/comms/dht/src/store_forward/store.rs @@ -437,13 +437,13 @@ where S: Service + Se ); if let Some(expires) = message.dht_header.expires { - if expires < EpochTime::now() { - return SafResult::Err(StoreAndForwardError::InvalidStoreMessage); + let now = EpochTime::now(); + if expires < now { + return Err(StoreAndForwardError::NotStoringExpiredMessage { expired: expires, now }); } } - let stored_message = - NewStoredMessage::try_construct(message, priority).ok_or(StoreAndForwardError::InvalidStoreMessage)?; + let stored_message = NewStoredMessage::new(message, priority); self.saf_requester.insert_message(stored_message).await } }