From b59ce591f2740d28532d81c65e8fadddc077b5dd Mon Sep 17 00:00:00 2001 From: brianp Date: Fri, 29 Sep 2023 21:28:13 +0200 Subject: [PATCH 01/12] Add new confirmation types --- .../down.sql | 2 +- .../down.sql | 2 + .../up.sql | 2 + .../storage/types/messages.rs | 8 ++ .../contacts_service/types/confirmation.rs | 47 ++++++++++++ .../src/contacts_service/types/message.rs | 6 ++ .../types/message_dispatch.rs | 76 +++++++++++++++++++ .../src/contacts_service/types/mod.rs | 6 ++ base_layer/contacts/src/schema.rs | 2 + 9 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/down.sql create mode 100644 base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql create mode 100644 base_layer/contacts/src/contacts_service/types/confirmation.rs create mode 100644 base_layer/contacts/src/contacts_service/types/message_dispatch.rs diff --git a/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql b/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql index edfc520ef0..67d5f203b3 100644 --- a/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql +++ b/base_layer/contacts/migrations/2023-09-05-132343_add_metadata_to_messages/down.sql @@ -1 +1 @@ -ALTER TABLE contacts drop metadata; +ALTER TABLE messages drop metadata; diff --git a/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/down.sql b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/down.sql new file mode 100644 index 0000000000..7a105dcd03 --- /dev/null +++ b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE messages drop delivery_confirmation_at; +ALTER TABLE messages drop read_confirmation_at; diff --git a/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql new file mode 100644 index 0000000000..d4130f8940 --- /dev/null +++ b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql @@ -0,0 +1,2 @@ + ALTER TABLE messages ADD delivery_confirmation_at TIMESTAMP; + ALTER TABLE messages ADD read_confirmation_at TIMESTAMP; diff --git a/base_layer/contacts/src/contacts_service/storage/types/messages.rs b/base_layer/contacts/src/contacts_service/storage/types/messages.rs index 47506388bd..a2c6dca4a5 100644 --- a/base_layer/contacts/src/contacts_service/storage/types/messages.rs +++ b/base_layer/contacts/src/contacts_service/storage/types/messages.rs @@ -45,6 +45,8 @@ pub struct MessagesSqlInsert { pub body: Vec, pub metadata: Vec, pub stored_at: NaiveDateTime, + pub delivery_confirmation_at: NaiveDateTime, + pub read_confirmation_at: NaiveDateTime, pub direction: i32, } @@ -57,6 +59,8 @@ pub struct MessagesSql { pub body: Vec, pub metadata: Vec, pub stored_at: NaiveDateTime, + pub delivery_confirmation_at: NaiveDateTime, + pub read_confirmation_at: NaiveDateTime, pub direction: i32, } @@ -106,6 +110,8 @@ impl TryFrom for Message { ) .unwrap_or_else(|| panic!("Direction from byte {}", o.direction)), stored_at: o.stored_at.timestamp() as u64, + delivery_confirmation_at: o.stored_at.timestamp() as u64, + read_confirmation_at: o.stored_at.timestamp() as u64, body: o.body, metadata, message_id: o.message_id, @@ -127,6 +133,8 @@ impl TryFrom for MessagesSqlInsert { body: o.body, metadata: metadata.into_bytes().to_vec(), stored_at: NaiveDateTime::from_timestamp_opt(o.stored_at as i64, 0).unwrap(), + delivery_confirmation_at: NaiveDateTime::from_timestamp_opt(o.delivery_confirmation_at as i64, 0).unwrap(), + read_confirmation_at: NaiveDateTime::from_timestamp_opt(o.read_confirmation_at as i64, 0).unwrap(), direction: i32::from(o.direction.as_byte()), }) } diff --git a/base_layer/contacts/src/contacts_service/types/confirmation.rs b/base_layer/contacts/src/contacts_service/types/confirmation.rs new file mode 100644 index 0000000000..f41b06a555 --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/confirmation.rs @@ -0,0 +1,47 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::contacts_service::proto; + +#[derive(Clone, Debug, Default)] +pub struct Confirmation { + pub message_id: Vec, + pub timestamp: u64, +} + +impl From for Confirmation { + fn from(confirmation: proto::Confirmation) -> Self { + Self { + message_id: confirmation.message_id, + timestamp: confirmation.timestamp, + } + } +} + +impl From for proto::Confirmation { + fn from(confirmation: Confirmation) -> Self { + Self { + message_id: confirmation.message_id, + timestamp: confirmation.timestamp, + } + } +} diff --git a/base_layer/contacts/src/contacts_service/types/message.rs b/base_layer/contacts/src/contacts_service/types/message.rs index 46d9fc3767..3a9bd00d45 100644 --- a/base_layer/contacts/src/contacts_service/types/message.rs +++ b/base_layer/contacts/src/contacts_service/types/message.rs @@ -39,6 +39,8 @@ pub struct Message { pub address: TariAddress, pub direction: Direction, pub stored_at: u64, + pub delivery_confirmation_at: u64, + pub read_confirmation_at: u64, pub message_id: Vec, } @@ -109,6 +111,8 @@ impl TryFrom for Message { // A Message from a proto::Message will always be an inbound message direction: Direction::Inbound, stored_at: message.stored_at, + delivery_confirmation_at: message.delivery_confirmation_at, + read_confirmation_at: message.read_confirmation_at, message_id: message.message_id, }) } @@ -126,6 +130,8 @@ impl From for proto::Message { address: message.address.to_bytes().to_vec(), direction: i32::from(message.direction.as_byte()), stored_at: message.stored_at, + delivery_confirmation_at: message.delivery_confirmation_at, + read_confirmation_at: message.read_confirmation_at, message_id: message.message_id, } } diff --git a/base_layer/contacts/src/contacts_service/types/message_dispatch.rs b/base_layer/contacts/src/contacts_service/types/message_dispatch.rs new file mode 100644 index 0000000000..cf3c6fa36d --- /dev/null +++ b/base_layer/contacts/src/contacts_service/types/message_dispatch.rs @@ -0,0 +1,76 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::convert::TryFrom; + +use tari_comms_dht::domain_message::OutboundDomainMessage; +use tari_p2p::tari_message::TariMessageType; + +use crate::contacts_service::{ + proto, + types::{Confirmation, Message}, +}; + +pub enum MessageDispatch { + Message(Message), + DeliveryConfirmation(Confirmation), + ReadConfirmation(Confirmation), +} + +impl TryFrom for MessageDispatch { + type Error = String; + + fn try_from(dispatch: proto::MessageDispatch) -> Result { + Ok(match dispatch.contents { + Some(proto::message_dispatch::Contents::Message(m)) => MessageDispatch::Message(Message::try_from(m)?), + Some(proto::message_dispatch::Contents::DeliveryConfirmation(c)) => { + MessageDispatch::DeliveryConfirmation(Confirmation::from(c)) + }, + Some(proto::message_dispatch::Contents::ReadConfirmation(c)) => { + MessageDispatch::ReadConfirmation(Confirmation::from(c)) + }, + None => return Err("We didn't get any known type of chat message".to_string()), + }) + } +} + +impl From for proto::MessageDispatch { + fn from(dispatch: MessageDispatch) -> Self { + let content = match dispatch { + MessageDispatch::Message(m) => proto::message_dispatch::Contents::Message(m.into()), + MessageDispatch::DeliveryConfirmation(c) => { + proto::message_dispatch::Contents::DeliveryConfirmation(c.into()) + }, + MessageDispatch::ReadConfirmation(c) => proto::message_dispatch::Contents::ReadConfirmation(c.into()), + }; + + Self { + contents: Some(content), + } + } +} + +impl From for OutboundDomainMessage { + fn from(dispatch: MessageDispatch) -> Self { + Self::new(&TariMessageType::Chat, dispatch.into()) + } +} diff --git a/base_layer/contacts/src/contacts_service/types/mod.rs b/base_layer/contacts/src/contacts_service/types/mod.rs index d2bc7105af..16fdb38a46 100644 --- a/base_layer/contacts/src/contacts_service/types/mod.rs +++ b/base_layer/contacts/src/contacts_service/types/mod.rs @@ -28,3 +28,9 @@ pub use message::{Direction, Message, MessageMetadata, MessageMetadataType}; mod message_builder; pub use message_builder::MessageBuilder; + +mod message_dispatch; +pub use message_dispatch::MessageDispatch; + +mod confirmation; +pub use confirmation::Confirmation; diff --git a/base_layer/contacts/src/schema.rs b/base_layer/contacts/src/schema.rs index f3b921bca3..ad77ae8d2b 100644 --- a/base_layer/contacts/src/schema.rs +++ b/base_layer/contacts/src/schema.rs @@ -18,6 +18,8 @@ diesel::table! { body -> Binary, metadata -> Binary, stored_at -> Timestamp, + delivery_confirmation_at -> Timestamp, + read_confirmation_at -> Timestamp, direction -> Integer, } } From 19a6b93ae744e9fcccdb919ea89d026581e01a37 Mon Sep 17 00:00:00 2001 From: brianp Date: Fri, 29 Sep 2023 21:28:48 +0200 Subject: [PATCH 02/12] Handle a wrapping of the messages --- base_layer/contacts/proto/message.proto | 19 +++++- .../contacts/src/contacts_service/handle.rs | 1 + .../contacts/src/contacts_service/service.rs | 67 ++++++++++++++----- 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/base_layer/contacts/proto/message.proto b/base_layer/contacts/proto/message.proto index 41301eb01c..714cde07b8 100644 --- a/base_layer/contacts/proto/message.proto +++ b/base_layer/contacts/proto/message.proto @@ -10,7 +10,9 @@ message Message { bytes address = 3; DirectionEnum direction = 4; uint64 stored_at = 5; - bytes message_id = 6; + uint64 delivery_confirmation_at = 6; + uint64 read_confirmation_at = 7; + bytes message_id = 8; } enum DirectionEnum { @@ -25,4 +27,17 @@ message MessageMetadata { enum MessageTypeEnum { TokenRequest = 0; -} \ No newline at end of file +} + +message Confirmation { + bytes message_id = 1; + uint64 timestamp = 2; +} + +message MessageDispatch { + oneof contents { + Message message = 1; + Confirmation delivery_confirmation = 2; + Confirmation read_confirmation = 3; + } +} diff --git a/base_layer/contacts/src/contacts_service/handle.rs b/base_layer/contacts/src/contacts_service/handle.rs index 3ea70aeb89..6640b3a40d 100644 --- a/base_layer/contacts/src/contacts_service/handle.rs +++ b/base_layer/contacts/src/contacts_service/handle.rs @@ -137,6 +137,7 @@ pub enum ContactsServiceRequest { GetContactOnlineStatus(Contact), SendMessage(TariAddress, Message), GetMessages(TariAddress, i64, i64), + SendDeliveryConfirmation(TariAddress, Vec), } #[derive(Debug)] diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index 47c57f6ac6..8dc5ccc6dd 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -32,7 +32,10 @@ use chrono::{NaiveDateTime, Utc}; use futures::{pin_mut, StreamExt}; use log::*; use tari_common_types::tari_address::TariAddress; -use tari_comms::connectivity::{ConnectivityEvent, ConnectivityRequester}; +use tari_comms::{ + connectivity::{ConnectivityEvent, ConnectivityRequester}, + types::CommsPublicKey, +}; use tari_comms_dht::{domain_message::OutboundDomainMessage, outbound::OutboundEncryption, Dht}; use tari_p2p::{ comms_connector::SubscriptionFactory, @@ -53,7 +56,7 @@ use crate::contacts_service::{ handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceRequest, ContactsServiceResponse}, proto, storage::database::{ContactsBackend, ContactsDatabase}, - types::{Contact, Message}, + types::{Contact, Message, MessageDispatch}, }; const LOG_TARGET: &str = "contacts::contacts_service"; @@ -189,7 +192,7 @@ where T: ContactsBackend + 'static let chat_messages = self .subscription_factory .get_subscription(TariMessageType::Chat, SUBSCRIPTION_LABEL) - .map(map_decode::); + .map(map_decode::); pin_mut!(chat_messages); @@ -301,7 +304,7 @@ where T: ContactsBackend + 'static Err(_) => Contact::from(&address), }; - let ob_message = OutboundDomainMessage::from(message.clone()); + let ob_message = OutboundDomainMessage::from(MessageDispatch::Message(message.clone())); let encryption = OutboundEncryption::EncryptFor(Box::new(address.public_key().clone())); match self.get_online_status(&contact).await { @@ -332,6 +335,9 @@ where T: ContactsBackend + 'static Ok(ContactsServiceResponse::MessageSent) }, + ContactsServiceRequest::SendDeliveryConfirmation(address, message_id) => { + Ok(ContactsServiceResponse::MessageSent) + }, } } @@ -406,7 +412,7 @@ where T: ContactsBackend + 'static async fn handle_incoming_message( &mut self, - msg: DomainMessage>, + msg: DomainMessage>, ) -> Result<(), ContactsServiceError> { let msg_inner = match &msg.inner { Ok(msg) => msg.clone(), @@ -421,22 +427,16 @@ where T: ContactsBackend + 'static }, }; if let Some(source_public_key) = msg.authenticated_origin { - let message = Message::try_from(msg_inner).map_err(ContactsServiceError::MessageParsingError)?; + let dispatch = MessageDispatch::try_from(msg_inner).map_err(ContactsServiceError::MessageParsingError)?; - let our_message = Message { - address: TariAddress::from_public_key(&source_public_key, message.address.network()), - stored_at: EpochTime::now().as_u64(), - ..message - }; - - self.db.save_message(our_message.clone())?; - - let _msg = self.message_publisher.send(Arc::new(our_message)); + match dispatch { + MessageDispatch::Message(m) => self.handle_chat_message(m, source_public_key).await, + MessageDispatch::DeliveryConfirmation(_) => Ok(()), + MessageDispatch::ReadConfirmation(_) => Ok(()), + } } else { - return Err(ContactsServiceError::MessageSourceDoesNotMatchOrigin); + Err(ContactsServiceError::MessageSourceDoesNotMatchOrigin) } - - Ok(()) } async fn get_online_status(&self, contact: &Contact) -> Result { @@ -559,4 +559,35 @@ where T: ContactsBackend + 'static _ => {}, } } + + async fn create_and_send_delivery_confirmation_for_msg(&mut self, message: &Message) { + // let address = &message.address; + + // let outbound_comms = self.dht.outbound_requester(); + } + + async fn handle_chat_message( + &mut self, + message: Message, + source_public_key: CommsPublicKey, + ) -> Result<(), ContactsServiceError> { + let our_message = Message { + address: TariAddress::from_public_key(&source_public_key, message.address.network()), + stored_at: EpochTime::now().as_u64(), + delivery_confirmation_at: EpochTime::now().as_u64(), + ..message + }; + + match self.db.save_message(our_message.clone()) { + Ok(..) => { + let _msg = self.message_publisher.send(Arc::new(our_message.clone())); + // Send a delivery notification + + self.create_and_send_delivery_confirmation_for_msg(&our_message).await; + + Ok(()) + }, + Err(e) => Err(e.into()), + } + } } From a5db30f65ba6cd4830f8c23322fa0f8706f9e23f Mon Sep 17 00:00:00 2001 From: brianp Date: Sun, 1 Oct 2023 19:08:11 +0200 Subject: [PATCH 03/12] Refactor the deliver message function --- .../contacts/src/contacts_service/service.rs | 67 +++++++++++-------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index 8dc5ccc6dd..2fc64d0803 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -299,36 +299,8 @@ where T: ContactsBackend + 'static Ok(result.map(ContactsServiceResponse::Messages)?) }, ContactsServiceRequest::SendMessage(address, mut message) => { - let contact = match self.db.get_contact(address.clone()) { - Ok(contact) => contact, - Err(_) => Contact::from(&address), - }; - let ob_message = OutboundDomainMessage::from(MessageDispatch::Message(message.clone())); - let encryption = OutboundEncryption::EncryptFor(Box::new(address.public_key().clone())); - - match self.get_online_status(&contact).await { - Ok(ContactOnlineStatus::Online) => { - info!(target: LOG_TARGET, "Chat message being sent directed"); - let mut comms_outbound = self.dht.outbound_requester(); - - comms_outbound - .send_direct_encrypted( - address.public_key().clone(), - ob_message, - encryption, - "contact service messaging".to_string(), - ) - .await?; - }, - Err(e) => return Err(e), - _ => { - let mut comms_outbound = self.dht.outbound_requester(); - comms_outbound - .closest_broadcast(address.public_key().clone(), encryption, vec![], ob_message) - .await?; - }, - } + self.deliver_message(address, ob_message).await?; message.stored_at = Utc::now().naive_utc().timestamp() as u64; self.db.save_message(message)?; @@ -590,4 +562,41 @@ where T: ContactsBackend + 'static Err(e) => Err(e.into()), } } + + async fn deliver_message( + &mut self, + address: TariAddress, + message: OutboundDomainMessage, + ) -> Result<(), ContactsServiceError> { + let contact = match self.db.get_contact(address.clone()) { + Ok(contact) => contact, + Err(_) => Contact::from(&address), + }; + let encryption = OutboundEncryption::EncryptFor(Box::new(address.public_key().clone())); + + match self.get_online_status(&contact).await { + Ok(ContactOnlineStatus::Online) => { + info!(target: LOG_TARGET, "Chat message being sent directed"); + let mut comms_outbound = self.dht.outbound_requester(); + + comms_outbound + .send_direct_encrypted( + address.public_key().clone(), + message, + encryption, + "contact service messaging".to_string(), + ) + .await?; + }, + Err(e) => return Err(e), + _ => { + let mut comms_outbound = self.dht.outbound_requester(); + comms_outbound + .closest_broadcast(address.public_key().clone(), encryption, vec![], message) + .await?; + }, + }; + + Ok(()) + } } From 29b57a6c6230a7936599a1ce6c64a56b158787f2 Mon Sep 17 00:00:00 2001 From: brianp Date: Sun, 1 Oct 2023 19:09:16 +0200 Subject: [PATCH 04/12] Send and register delivery notifications --- .../contacts/src/contacts_service/service.rs | 47 +++++++++++++---- .../src/contacts_service/storage/database.rs | 21 ++++++++ .../src/contacts_service/storage/sqlite_db.rs | 21 +++++++- .../storage/types/messages.rs | 32 ++++++++++++ .../types/message_dispatch.rs | 1 + integration_tests/tests/features/Chat.feature | 7 +++ integration_tests/tests/steps/chat_steps.rs | 50 +++++++++++++++++++ 7 files changed, 167 insertions(+), 12 deletions(-) diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index 2fc64d0803..4d3f08f800 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -56,7 +56,7 @@ use crate::contacts_service::{ handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceRequest, ContactsServiceResponse}, proto, storage::database::{ContactsBackend, ContactsDatabase}, - types::{Contact, Message, MessageDispatch}, + types::{Confirmation, Contact, Message, MessageDispatch}, }; const LOG_TARGET: &str = "contacts::contacts_service"; @@ -403,8 +403,9 @@ where T: ContactsBackend + 'static match dispatch { MessageDispatch::Message(m) => self.handle_chat_message(m, source_public_key).await, - MessageDispatch::DeliveryConfirmation(_) => Ok(()), - MessageDispatch::ReadConfirmation(_) => Ok(()), + MessageDispatch::DeliveryConfirmation(_) | MessageDispatch::ReadConfirmation(_) => { + self.handle_confirmation(dispatch.clone()).await + }, } } else { Err(ContactsServiceError::MessageSourceDoesNotMatchOrigin) @@ -532,12 +533,6 @@ where T: ContactsBackend + 'static } } - async fn create_and_send_delivery_confirmation_for_msg(&mut self, message: &Message) { - // let address = &message.address; - - // let outbound_comms = self.dht.outbound_requester(); - } - async fn handle_chat_message( &mut self, message: Message, @@ -553,9 +548,9 @@ where T: ContactsBackend + 'static match self.db.save_message(our_message.clone()) { Ok(..) => { let _msg = self.message_publisher.send(Arc::new(our_message.clone())); - // Send a delivery notification - self.create_and_send_delivery_confirmation_for_msg(&our_message).await; + // Send a delivery notification + self.create_and_send_delivery_confirmation_for_msg(&our_message).await?; Ok(()) }, @@ -563,6 +558,36 @@ where T: ContactsBackend + 'static } } + async fn create_and_send_delivery_confirmation_for_msg( + &mut self, + message: &Message, + ) -> Result<(), ContactsServiceError> { + let address = &message.address; + let confirmation = MessageDispatch::DeliveryConfirmation(Confirmation { + message_id: message.message_id.clone(), + timestamp: message.delivery_confirmation_at, + }); + let msg = OutboundDomainMessage::from(confirmation); + + self.deliver_message(address.clone(), msg).await + } + + async fn handle_confirmation(&mut self, dispatch: MessageDispatch) -> Result<(), ContactsServiceError> { + let (message_id, delivery, read) = match dispatch { + MessageDispatch::DeliveryConfirmation(c) => (c.message_id, Some(c.timestamp), None), + MessageDispatch::ReadConfirmation(c) => (c.message_id, None, Some(c.timestamp)), + _ => { + return Err(ContactsServiceError::MessageParsingError( + "Incorrect confirmation type".to_string(), + )) + }, + }; + + self.db.confirm_message(message_id, delivery, read)?; + + Ok(()) + } + async fn deliver_message( &mut self, address: TariAddress, diff --git a/base_layer/contacts/src/contacts_service/storage/database.rs b/base_layer/contacts/src/contacts_service/storage/database.rs index a80de6fe74..fe8e22532c 100644 --- a/base_layer/contacts/src/contacts_service/storage/database.rs +++ b/base_layer/contacts/src/contacts_service/storage/database.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ + convert::TryFrom, fmt::{Display, Error, Formatter}, sync::Arc, }; @@ -50,6 +51,7 @@ pub enum DbKey { Contact(TariAddress), ContactId(NodeId), Contacts, + Message(Vec), Messages(TariAddress, i64, i64), } @@ -64,6 +66,7 @@ pub enum DbValue { #[allow(clippy::large_enum_variant)] pub enum DbKeyValuePair { Contact(TariAddress, Contact), + MessageConfirmations(Vec, Option, Option), LastSeen(NodeId, NaiveDateTime, Option), } @@ -188,6 +191,23 @@ where T: ContactsBackend + 'static Ok(()) } + + pub fn confirm_message( + &self, + message_id: Vec, + delivery_confirmation: Option, + read_confirmation: Option, + ) -> Result<(), ContactsServiceStorageError> { + self.db + .write(WriteOperation::Upsert(Box::new(DbKeyValuePair::MessageConfirmations( + message_id, + delivery_confirmation + .map(|d| NaiveDateTime::from_timestamp_opt(i64::try_from(d).unwrap_or(0), 0).unwrap()), + read_confirmation.map(|d| NaiveDateTime::from_timestamp_opt(i64::try_from(d).unwrap_or(0), 0).unwrap()), + ))))?; + + Ok(()) + } } fn unexpected_result(req: DbKey, res: DbValue) -> Result { @@ -203,6 +223,7 @@ impl Display for DbKey { DbKey::ContactId(id) => f.write_str(&format!("Contact: {:?}", id)), DbKey::Contacts => f.write_str("Contacts"), DbKey::Messages(c, _l, _p) => f.write_str(&format!("Messages for id: {:?}", c)), + DbKey::Message(m) => f.write_str(&format!("Message for id: {:?}", m)), } } } diff --git a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs index aba31a0da0..4de0cdf77b 100644 --- a/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs +++ b/base_layer/contacts/src/contacts_service/storage/sqlite_db.rs @@ -34,7 +34,7 @@ use crate::contacts_service::{ database::{ContactsBackend, DbKey, DbKeyValuePair, DbValue, WriteOperation}, types::{ contacts::{ContactSql, UpdateContact}, - messages::{MessagesSql, MessagesSqlInsert}, + messages::{MessageUpdate, MessagesSql, MessagesSqlInsert}, }, }, types::{Contact, Message}, @@ -116,6 +116,11 @@ where TContactServiceDbConnection: PooledDbConnection return Err(e), } }, + DbKey::Message(id) => match MessagesSql::find_by_message_id(&id.to_vec(), &mut conn) { + Ok(c) => Some(DbValue::Message(Box::new(Message::try_from(c)?))), + Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => None, + Err(e) => return Err(e), + }, }; Ok(result) @@ -126,6 +131,16 @@ where TContactServiceDbConnection: PooledDbConnection match *kvp { + DbKeyValuePair::MessageConfirmations(k, d, r) => { + if MessagesSql::find_by_message_id_and_update(&mut conn, &k, MessageUpdate { + delivery_confirmation_at: d, + read_confirmation_at: r, + }) + .is_err() + { + MessagesSql::find_by_message_id(&k, &mut conn)?; + } + }, DbKeyValuePair::Contact(k, c) => { if ContactSql::find_by_address_and_update(&mut conn, &k.to_bytes(), UpdateContact { alias: Some(c.clone().alias), @@ -155,6 +170,9 @@ where TContactServiceDbConnection: PooledDbConnection return Err(ContactsServiceStorageError::OperationNotSupported), + DbKeyValuePair::MessageConfirmations(..) => { + return Err(ContactsServiceStorageError::OperationNotSupported) + }, }, WriteOperation::Remove(k) => match k { DbKey::Contact(k) => match ContactSql::find_by_address_and_delete(&mut conn, &k.to_bytes()) { @@ -173,6 +191,7 @@ where TContactServiceDbConnection: PooledDbConnection return Err(ContactsServiceStorageError::OperationNotSupported), DbKey::Messages(_pk, _l, _p) => return Err(ContactsServiceStorageError::OperationNotSupported), + DbKey::Message(_id) => return Err(ContactsServiceStorageError::OperationNotSupported), }, WriteOperation::Insert(i) => { if let DbValue::Message(m) = *i { diff --git a/base_layer/contacts/src/contacts_service/storage/types/messages.rs b/base_layer/contacts/src/contacts_service/storage/types/messages.rs index a2c6dca4a5..686ba16ab2 100644 --- a/base_layer/contacts/src/contacts_service/storage/types/messages.rs +++ b/base_layer/contacts/src/contacts_service/storage/types/messages.rs @@ -25,6 +25,7 @@ use std::convert::TryFrom; use chrono::NaiveDateTime; use diesel::{prelude::*, SqliteConnection}; use serde_json; +use tari_common_sqlite::util::diesel_ext::ExpectedRowsExtension; use tari_common_types::tari_address::TariAddress; use crate::{ @@ -63,6 +64,13 @@ pub struct MessagesSql { pub read_confirmation_at: NaiveDateTime, pub direction: i32, } +#[derive(Clone, Debug, AsChangeset, PartialEq, Eq)] +#[diesel(table_name = messages)] +#[diesel(primary_key(message_id))] +pub struct MessageUpdate { + pub delivery_confirmation_at: Option, + pub read_confirmation_at: Option, +} impl MessagesSqlInsert { /// Write this struct to the database @@ -89,6 +97,30 @@ impl MessagesSql { .limit(limit) .load::(conn)?) } + + /// Find a particular message by its message_id + pub fn find_by_message_id( + message_id: &[u8], + conn: &mut SqliteConnection, + ) -> Result { + Ok(messages::table + .filter(messages::message_id.eq(message_id)) + .first::(conn)?) + } + + /// Find a particular Message by message_id, and update it if it exists, returning the affected record + pub fn find_by_message_id_and_update( + conn: &mut SqliteConnection, + message_id: &[u8], + updated_message: MessageUpdate, + ) -> Result { + // Note: `get_result` not implemented for SQLite + diesel::update(messages::table.filter(messages::message_id.eq(message_id))) + .set(updated_message) + .execute(conn) + .num_rows_affected_or_not_found(1)?; + MessagesSql::find_by_message_id(message_id, conn) + } } /// Conversion from an Message to the Sql datatype form diff --git a/base_layer/contacts/src/contacts_service/types/message_dispatch.rs b/base_layer/contacts/src/contacts_service/types/message_dispatch.rs index cf3c6fa36d..0f6d81952b 100644 --- a/base_layer/contacts/src/contacts_service/types/message_dispatch.rs +++ b/base_layer/contacts/src/contacts_service/types/message_dispatch.rs @@ -30,6 +30,7 @@ use crate::contacts_service::{ types::{Confirmation, Message}, }; +#[derive(Clone)] pub enum MessageDispatch { Message(Message), DeliveryConfirmation(Confirmation), diff --git a/integration_tests/tests/features/Chat.feature b/integration_tests/tests/features/Chat.feature index f18d34031f..c192d2dbb0 100644 --- a/integration_tests/tests/features/Chat.feature +++ b/integration_tests/tests/features/Chat.feature @@ -48,3 +48,10 @@ Feature: Chat messaging Then CHAT_A will have 2 messages with CHAT_B Then CHAT_A will have 1 messages with CHAT_C + Scenario: A message receives a delivery receipt + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching delivery timestamps diff --git a/integration_tests/tests/steps/chat_steps.rs b/integration_tests/tests/steps/chat_steps.rs index 7dc277e2b0..27f91316c5 100644 --- a/integration_tests/tests/steps/chat_steps.rs +++ b/integration_tests/tests/steps/chat_steps.rs @@ -114,6 +114,7 @@ async fn i_reply_to_message( panic!("Never received incoming chat message",) } +#[when(expr = "{word} will have {int} message(s) with {word}")] #[then(expr = "{word} will have {int} message(s) with {word}")] async fn receive_n_messages(world: &mut TariWorld, receiver: String, message_count: u64, sender: String) { let receiver = world.chat_clients.get(&receiver).unwrap(); @@ -224,3 +225,52 @@ async fn have_replied_message(world: &mut TariWorld, receiver: String, sender: S panic!("Never received incoming chat message",) } + +#[then(regex = r"^(.+) and (.+) will have a message '(.+)' with matching delivery timestamps")] +async fn matching_delivery_timestamps(world: &mut TariWorld, sender: String, receiver: String, msg: String) { + let client_1 = world.chat_clients.get(&receiver).unwrap(); + let client_2 = world.chat_clients.get(&sender).unwrap(); + let client_1_address = TariAddress::from_public_key(client_1.identity().public_key(), Network::LocalNet); + let client_2_address = TariAddress::from_public_key(client_2.identity().public_key(), Network::LocalNet); + + for _a in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let client_1_messages: Vec = (*client_1) + .get_messages(&client_2_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_1_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_1_message = client_1_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + let client_2_messages: Vec = (*client_2) + .get_messages(&client_1_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_2_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_2_message = client_2_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + assert_eq!( + client_1_message.delivery_confirmation_at, + client_2_message.delivery_confirmation_at + ); + + return; + } + + panic!("Never received incoming chat message",) +} From c41a4e19e2a02734a4739a1ec9a85286ec98fe6d Mon Sep 17 00:00:00 2001 From: brianp Date: Sun, 1 Oct 2023 19:47:08 +0200 Subject: [PATCH 05/12] Make the confirmation fields nullable --- .../up.sql | 4 ++-- base_layer/contacts/proto/message.proto | 4 ++-- base_layer/contacts/src/contacts_service/service.rs | 8 ++++++-- .../src/contacts_service/storage/types/messages.rs | 12 ++++-------- .../contacts/src/contacts_service/types/message.rs | 4 ++-- base_layer/contacts/src/schema.rs | 4 ++-- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql index d4130f8940..b634514a74 100644 --- a/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql +++ b/base_layer/contacts/migrations/2023-09-26-083422_add_delivery_confirmations_to_messages/up.sql @@ -1,2 +1,2 @@ - ALTER TABLE messages ADD delivery_confirmation_at TIMESTAMP; - ALTER TABLE messages ADD read_confirmation_at TIMESTAMP; + ALTER TABLE messages ADD delivery_confirmation_at TIMESTAMP NULL; + ALTER TABLE messages ADD read_confirmation_at TIMESTAMP NULL; diff --git a/base_layer/contacts/proto/message.proto b/base_layer/contacts/proto/message.proto index 714cde07b8..61a7768915 100644 --- a/base_layer/contacts/proto/message.proto +++ b/base_layer/contacts/proto/message.proto @@ -10,8 +10,8 @@ message Message { bytes address = 3; DirectionEnum direction = 4; uint64 stored_at = 5; - uint64 delivery_confirmation_at = 6; - uint64 read_confirmation_at = 7; + optional uint64 delivery_confirmation_at = 6; + optional uint64 read_confirmation_at = 7; bytes message_id = 8; } diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index 4d3f08f800..eb4550ae5a 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -541,7 +541,7 @@ where T: ContactsBackend + 'static let our_message = Message { address: TariAddress::from_public_key(&source_public_key, message.address.network()), stored_at: EpochTime::now().as_u64(), - delivery_confirmation_at: EpochTime::now().as_u64(), + delivery_confirmation_at: Some(EpochTime::now().as_u64()), ..message }; @@ -565,7 +565,11 @@ where T: ContactsBackend + 'static let address = &message.address; let confirmation = MessageDispatch::DeliveryConfirmation(Confirmation { message_id: message.message_id.clone(), - timestamp: message.delivery_confirmation_at, + timestamp: message + .delivery_confirmation_at + .ok_or(ContactsServiceError::MessageParsingError( + "delivery_confirmation_at is malformed".to_string(), + ))?, }); let msg = OutboundDomainMessage::from(confirmation); diff --git a/base_layer/contacts/src/contacts_service/storage/types/messages.rs b/base_layer/contacts/src/contacts_service/storage/types/messages.rs index 686ba16ab2..091bc23a18 100644 --- a/base_layer/contacts/src/contacts_service/storage/types/messages.rs +++ b/base_layer/contacts/src/contacts_service/storage/types/messages.rs @@ -46,8 +46,6 @@ pub struct MessagesSqlInsert { pub body: Vec, pub metadata: Vec, pub stored_at: NaiveDateTime, - pub delivery_confirmation_at: NaiveDateTime, - pub read_confirmation_at: NaiveDateTime, pub direction: i32, } @@ -60,8 +58,8 @@ pub struct MessagesSql { pub body: Vec, pub metadata: Vec, pub stored_at: NaiveDateTime, - pub delivery_confirmation_at: NaiveDateTime, - pub read_confirmation_at: NaiveDateTime, + pub delivery_confirmation_at: Option, + pub read_confirmation_at: Option, pub direction: i32, } #[derive(Clone, Debug, AsChangeset, PartialEq, Eq)] @@ -142,8 +140,8 @@ impl TryFrom for Message { ) .unwrap_or_else(|| panic!("Direction from byte {}", o.direction)), stored_at: o.stored_at.timestamp() as u64, - delivery_confirmation_at: o.stored_at.timestamp() as u64, - read_confirmation_at: o.stored_at.timestamp() as u64, + delivery_confirmation_at: Some(o.stored_at.timestamp() as u64), + read_confirmation_at: Some(o.stored_at.timestamp() as u64), body: o.body, metadata, message_id: o.message_id, @@ -165,8 +163,6 @@ impl TryFrom for MessagesSqlInsert { body: o.body, metadata: metadata.into_bytes().to_vec(), stored_at: NaiveDateTime::from_timestamp_opt(o.stored_at as i64, 0).unwrap(), - delivery_confirmation_at: NaiveDateTime::from_timestamp_opt(o.delivery_confirmation_at as i64, 0).unwrap(), - read_confirmation_at: NaiveDateTime::from_timestamp_opt(o.read_confirmation_at as i64, 0).unwrap(), direction: i32::from(o.direction.as_byte()), }) } diff --git a/base_layer/contacts/src/contacts_service/types/message.rs b/base_layer/contacts/src/contacts_service/types/message.rs index 3a9bd00d45..d963567efb 100644 --- a/base_layer/contacts/src/contacts_service/types/message.rs +++ b/base_layer/contacts/src/contacts_service/types/message.rs @@ -39,8 +39,8 @@ pub struct Message { pub address: TariAddress, pub direction: Direction, pub stored_at: u64, - pub delivery_confirmation_at: u64, - pub read_confirmation_at: u64, + pub delivery_confirmation_at: Option, + pub read_confirmation_at: Option, pub message_id: Vec, } diff --git a/base_layer/contacts/src/schema.rs b/base_layer/contacts/src/schema.rs index ad77ae8d2b..9a57007c44 100644 --- a/base_layer/contacts/src/schema.rs +++ b/base_layer/contacts/src/schema.rs @@ -18,8 +18,8 @@ diesel::table! { body -> Binary, metadata -> Binary, stored_at -> Timestamp, - delivery_confirmation_at -> Timestamp, - read_confirmation_at -> Timestamp, + delivery_confirmation_at -> Nullable, + read_confirmation_at -> Nullable, direction -> Integer, } } From 4ba846fd379db924751e9c75b16bbad3bb53d908 Mon Sep 17 00:00:00 2001 From: brianp Date: Sun, 1 Oct 2023 21:19:08 +0200 Subject: [PATCH 06/12] Add a client function for read receipt sending --- .../examples/chat_client/src/client.rs | 10 +++++++ .../contacts/src/contacts_service/handle.rs | 27 +++++++++++++++++-- .../contacts/src/contacts_service/service.rs | 10 +++++-- integration_tests/src/chat_ffi.rs | 4 +++ 4 files changed, 47 insertions(+), 4 deletions(-) diff --git a/base_layer/contacts/examples/chat_client/src/client.rs b/base_layer/contacts/examples/chat_client/src/client.rs index 9fe6b475e1..3ebfcf66d5 100644 --- a/base_layer/contacts/examples/chat_client/src/client.rs +++ b/base_layer/contacts/examples/chat_client/src/client.rs @@ -49,6 +49,7 @@ pub trait ChatClient { fn create_message(&self, receiver: &TariAddress, message: String) -> Message; async fn get_messages(&self, sender: &TariAddress, limit: u64, page: u64) -> Vec; async fn send_message(&self, message: Message); + async fn send_read_receipt(&self, address: &TariAddress, message_id: Vec); fn identity(&self) -> &NodeIdentity; fn shutdown(&mut self); } @@ -171,6 +172,15 @@ impl ChatClient for Client { messages } + async fn send_read_receipt(&self, address: &TariAddress, message_id: Vec) { + if let Some(mut contacts_service) = self.contacts.clone() { + contacts_service + .send_read_confirmation(address.clone(), message_id) + .await + .expect("Read receipt not sent"); + } + } + fn create_message(&self, receiver: &TariAddress, message: String) -> Message { MessageBuilder::new().address(receiver.clone()).message(message).build() } diff --git a/base_layer/contacts/src/contacts_service/handle.rs b/base_layer/contacts/src/contacts_service/handle.rs index 6640b3a40d..822f5c122a 100644 --- a/base_layer/contacts/src/contacts_service/handle.rs +++ b/base_layer/contacts/src/contacts_service/handle.rs @@ -30,13 +30,14 @@ use chrono::{DateTime, Local, NaiveDateTime}; use tari_common_types::tari_address::TariAddress; use tari_comms::peer_manager::NodeId; use tari_service_framework::reply_channel::SenderService; +use tari_utilities::epoch_time::EpochTime; use tokio::sync::broadcast; use tower::Service; use crate::contacts_service::{ error::ContactsServiceError, service::{ContactMessageType, ContactOnlineStatus}, - types::{Contact, Message}, + types::{Confirmation, Contact, Message}, }; pub static DEFAULT_MESSAGE_LIMIT: u64 = 35; @@ -137,7 +138,7 @@ pub enum ContactsServiceRequest { GetContactOnlineStatus(Contact), SendMessage(TariAddress, Message), GetMessages(TariAddress, i64, i64), - SendDeliveryConfirmation(TariAddress, Vec), + SendReadConfirmation(TariAddress, Confirmation), } #[derive(Debug)] @@ -149,6 +150,7 @@ pub enum ContactsServiceResponse { OnlineStatus(ContactOnlineStatus), Messages(Vec), MessageSent, + ReadConfirmationSent, } #[derive(Clone)] @@ -283,4 +285,25 @@ impl ContactsServiceHandle { _ => Err(ContactsServiceError::UnexpectedApiResponse), } } + + pub async fn send_read_confirmation( + &mut self, + address: TariAddress, + message_id: Vec, + ) -> Result<(), ContactsServiceError> { + match self + .request_response_service + .call(ContactsServiceRequest::SendReadConfirmation( + address.clone(), + Confirmation { + message_id, + timestamp: EpochTime::now().as_u64(), + }, + )) + .await?? + { + ContactsServiceResponse::ReadConfirmationSent => Ok(()), + _ => Err(ContactsServiceError::UnexpectedApiResponse), + } + } } diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index eb4550ae5a..fb5e7fa310 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -307,8 +307,14 @@ where T: ContactsBackend + 'static Ok(ContactsServiceResponse::MessageSent) }, - ContactsServiceRequest::SendDeliveryConfirmation(address, message_id) => { - Ok(ContactsServiceResponse::MessageSent) + ContactsServiceRequest::SendReadConfirmation(address, confirmation) => { + let msg = OutboundDomainMessage::from(MessageDispatch::DeliveryConfirmation(confirmation.clone())); + self.deliver_message(address, msg).await?; + + self.db + .confirm_message(confirmation.message_id.clone(), None, Some(confirmation.timestamp))?; + + Ok(ContactsServiceResponse::ReadConfirmationSent) }, } } diff --git a/integration_tests/src/chat_ffi.rs b/integration_tests/src/chat_ffi.rs index c8a229d86c..b730ff2c2e 100644 --- a/integration_tests/src/chat_ffi.rs +++ b/integration_tests/src/chat_ffi.rs @@ -188,6 +188,10 @@ impl ChatClient for ChatFFI { } } + async fn send_read_receipt(&self, _address: &TariAddress, _message_id: Vec) { + todo!(); + } + fn identity(&self) -> &NodeIdentity { &self.identity } From bc2a59713f578a49196a648549c24bf60b1b58ce Mon Sep 17 00:00:00 2001 From: brianp Date: Mon, 2 Oct 2023 11:42:51 +0200 Subject: [PATCH 07/12] Test read receipt delivery --- base_layer/chat_ffi/chat.h | 19 ++++ base_layer/chat_ffi/src/lib.rs | 1 + base_layer/chat_ffi/src/read_receipt.rs | 69 +++++++++++++++ .../examples/chat_client/src/client.rs | 6 +- integration_tests/src/chat_ffi.rs | 11 ++- integration_tests/tests/features/Chat.feature | 9 ++ .../tests/features/ChatFFI.feature | 17 ++++ integration_tests/tests/steps/chat_steps.rs | 87 ++++++++++++++++++- 8 files changed, 210 insertions(+), 9 deletions(-) create mode 100644 base_layer/chat_ffi/src/read_receipt.rs diff --git a/base_layer/chat_ffi/chat.h b/base_layer/chat_ffi/chat.h index 08445cc2a3..785c27b4c3 100644 --- a/base_layer/chat_ffi/chat.h +++ b/base_layer/chat_ffi/chat.h @@ -261,6 +261,25 @@ void add_chat_message_metadata(struct Message *message, struct ChatByteVector *data, int *error_out); +/** + * Sends a read confirmation for a given message + * + * ## Arguments + * `client` - The chat client + * `message` - The message that was read + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `*mut TariAddress` - A ptr to a TariAddress + * + * # Safety + * The ```ChatClientFFI``` When done with the client it should be destroyed + * The ```Message``` When done with the Message it should be destroyed + */ +void send_read_confirmation_for_message(struct ChatClientFFI *client, + struct Message *message, + int *error_out); + /** * Creates a tor transport config * diff --git a/base_layer/chat_ffi/src/lib.rs b/base_layer/chat_ffi/src/lib.rs index 87903ba8a0..0fef4f1336 100644 --- a/base_layer/chat_ffi/src/lib.rs +++ b/base_layer/chat_ffi/src/lib.rs @@ -44,6 +44,7 @@ mod error; mod logging; mod message; mod message_metadata; +mod read_receipt; mod tansport_config; mod tari_address; mod types; diff --git a/base_layer/chat_ffi/src/read_receipt.rs b/base_layer/chat_ffi/src/read_receipt.rs new file mode 100644 index 0000000000..55ae263958 --- /dev/null +++ b/base_layer/chat_ffi/src/read_receipt.rs @@ -0,0 +1,69 @@ +// Copyright 2023, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::ptr; + +use libc::c_int; +use tari_chat_client::ChatClient; +use tari_contacts::contacts_service::types::Message; + +use crate::{ + error::{InterfaceError, LibChatError}, + ChatClientFFI, +}; + +/// Sends a read confirmation for a given message +/// +/// ## Arguments +/// `client` - The chat client +/// `message` - The message that was read +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `*mut TariAddress` - A ptr to a TariAddress +/// +/// # Safety +/// The ```ChatClientFFI``` When done with the client it should be destroyed +/// The ```Message``` When done with the Message it should be destroyed +#[no_mangle] +pub unsafe extern "C" fn send_read_confirmation_for_message( + client: *mut ChatClientFFI, + message: *mut Message, + error_out: *mut c_int, +) { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + + if client.is_null() { + error = LibChatError::from(InterfaceError::NullError("client".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + if message.is_null() { + error = LibChatError::from(InterfaceError::NullError("message".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + (*client) + .runtime + .block_on((*client).client.send_read_receipt((*message).clone())); +} diff --git a/base_layer/contacts/examples/chat_client/src/client.rs b/base_layer/contacts/examples/chat_client/src/client.rs index 3ebfcf66d5..9f46f4d57d 100644 --- a/base_layer/contacts/examples/chat_client/src/client.rs +++ b/base_layer/contacts/examples/chat_client/src/client.rs @@ -49,7 +49,7 @@ pub trait ChatClient { fn create_message(&self, receiver: &TariAddress, message: String) -> Message; async fn get_messages(&self, sender: &TariAddress, limit: u64, page: u64) -> Vec; async fn send_message(&self, message: Message); - async fn send_read_receipt(&self, address: &TariAddress, message_id: Vec); + async fn send_read_receipt(&self, message: Message); fn identity(&self) -> &NodeIdentity; fn shutdown(&mut self); } @@ -172,10 +172,10 @@ impl ChatClient for Client { messages } - async fn send_read_receipt(&self, address: &TariAddress, message_id: Vec) { + async fn send_read_receipt(&self, message: Message) { if let Some(mut contacts_service) = self.contacts.clone() { contacts_service - .send_read_confirmation(address.clone(), message_id) + .send_read_confirmation(message.address.clone(), message.message_id) .await .expect("Read receipt not sent"); } diff --git a/integration_tests/src/chat_ffi.rs b/integration_tests/src/chat_ffi.rs index b730ff2c2e..eeb8298afa 100644 --- a/integration_tests/src/chat_ffi.rs +++ b/integration_tests/src/chat_ffi.rs @@ -90,6 +90,7 @@ extern "C" { element_count: c_uint, error_our: *const c_int, ) -> *mut c_void; + pub fn send_read_confirmation_for_message(client: *mut ClientFFI, message: *mut c_void, error_out: *const c_int); } #[derive(Debug)] @@ -188,8 +189,14 @@ impl ChatClient for ChatFFI { } } - async fn send_read_receipt(&self, _address: &TariAddress, _message_id: Vec) { - todo!(); + async fn send_read_receipt(&self, message: Message) { + let client = self.ptr.lock().unwrap(); + let message_ptr = Box::into_raw(Box::new(message)) as *mut c_void; + let error_out = Box::into_raw(Box::new(0)); + + unsafe { + send_read_confirmation_for_message(client.0, message_ptr, error_out); + } } fn identity(&self) -> &NodeIdentity { diff --git a/integration_tests/tests/features/Chat.feature b/integration_tests/tests/features/Chat.feature index c192d2dbb0..05b66be7ed 100644 --- a/integration_tests/tests/features/Chat.feature +++ b/integration_tests/tests/features/Chat.feature @@ -55,3 +55,12 @@ Feature: Chat messaging When I use CHAT_A to send a message 'Hey there' to CHAT_B When CHAT_B will have 1 message with CHAT_A Then CHAT_A and CHAT_B will have a message 'Hey there' with matching delivery timestamps + + Scenario: A message receives a read receipt + Given I have a seed node SEED_A + When I have a chat client CHAT_A connected to seed node SEED_A + When I have a chat client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + When CHAT_B sends a read receipt to CHAT_A for message 'Hey there' + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps \ No newline at end of file diff --git a/integration_tests/tests/features/ChatFFI.feature b/integration_tests/tests/features/ChatFFI.feature index 4a98580b69..a8dc5a0c21 100644 --- a/integration_tests/tests/features/ChatFFI.feature +++ b/integration_tests/tests/features/ChatFFI.feature @@ -55,3 +55,20 @@ Feature: Chat FFI messaging Then CHAT_B will have 2 messages with CHAT_A Then CHAT_A will have 2 messages with CHAT_B Then CHAT_A will have a replied to message from CHAT_B with 'oh hai' + + Scenario: A message receives a delivery receipt via FFI + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching delivery timestamps + + Scenario: A message receives a read receipt via FFI + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + When CHAT_B will have 1 message with CHAT_A + When CHAT_B sends a read receipt to CHAT_A for message 'Hey there' + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps \ No newline at end of file diff --git a/integration_tests/tests/steps/chat_steps.rs b/integration_tests/tests/steps/chat_steps.rs index 27f91316c5..9ae9ec2146 100644 --- a/integration_tests/tests/steps/chat_steps.rs +++ b/integration_tests/tests/steps/chat_steps.rs @@ -228,8 +228,8 @@ async fn have_replied_message(world: &mut TariWorld, receiver: String, sender: S #[then(regex = r"^(.+) and (.+) will have a message '(.+)' with matching delivery timestamps")] async fn matching_delivery_timestamps(world: &mut TariWorld, sender: String, receiver: String, msg: String) { - let client_1 = world.chat_clients.get(&receiver).unwrap(); - let client_2 = world.chat_clients.get(&sender).unwrap(); + let client_1 = world.chat_clients.get(&sender).unwrap(); + let client_2 = world.chat_clients.get(&receiver).unwrap(); let client_1_address = TariAddress::from_public_key(client_1.identity().public_key(), Network::LocalNet); let client_2_address = TariAddress::from_public_key(client_2.identity().public_key(), Network::LocalNet); @@ -265,8 +265,8 @@ async fn matching_delivery_timestamps(world: &mut TariWorld, sender: String, rec .clone(); assert_eq!( - client_1_message.delivery_confirmation_at, - client_2_message.delivery_confirmation_at + client_1_message.delivery_confirmation_at.unwrap(), + client_2_message.delivery_confirmation_at.unwrap() ); return; @@ -274,3 +274,82 @@ async fn matching_delivery_timestamps(world: &mut TariWorld, sender: String, rec panic!("Never received incoming chat message",) } + +#[then(regex = r"^(.+) and (.+) will have a message '(.+)' with matching read timestamps")] +async fn matching_read_timestamps(world: &mut TariWorld, sender: String, receiver: String, msg: String) { + let client_1 = world.chat_clients.get(&sender).unwrap(); + let client_2 = world.chat_clients.get(&receiver).unwrap(); + let client_1_address = TariAddress::from_public_key(client_1.identity().public_key(), Network::LocalNet); + let client_2_address = TariAddress::from_public_key(client_2.identity().public_key(), Network::LocalNet); + + for _a in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let client_1_messages: Vec = (*client_1) + .get_messages(&client_2_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_1_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_1_message = client_1_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + let client_2_messages: Vec = (*client_2) + .get_messages(&client_1_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if client_2_messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let client_2_message = client_2_messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + if client_1_message.read_confirmation_at.is_none() || client_2_message.read_confirmation_at.is_none() { + continue; + } + + assert_eq!( + client_1_message.read_confirmation_at.unwrap(), + client_2_message.read_confirmation_at.unwrap() + ); + + return; + } + + panic!("Never received incoming chat message",) +} + +#[when(regex = r"^(.+) sends a read receipt to (.+) for message '(.+)'")] +async fn send_read_receipt(world: &mut TariWorld, sender: String, receiver: String, msg: String) { + let client_1 = world.chat_clients.get(&receiver).unwrap(); + let client_2 = world.chat_clients.get(&sender).unwrap(); + let client_2_address = TariAddress::from_public_key(client_2.identity().public_key(), Network::LocalNet); + + for _a in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + let messages: Vec = (*client_1) + .get_messages(&client_2_address, DEFAULT_MESSAGE_LIMIT, DEFAULT_MESSAGE_PAGE) + .await; + + if messages.is_empty() { + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + continue; + } + + let message = messages + .iter() + .find(|m| m.body == msg.clone().into_bytes()) + .expect("no message with that content found") + .clone(); + + client_1.send_read_receipt(message).await; + } +} From 951ad3b1d2f7b7970e4ff888c891676e8022dbcd Mon Sep 17 00:00:00 2001 From: brianp Date: Mon, 2 Oct 2023 14:36:11 +0200 Subject: [PATCH 08/12] Add confirmation callbacks --- base_layer/chat_ffi/chat.h | 56 ++++++++++++++++++- base_layer/chat_ffi/src/callback_handler.rs | 53 ++++++++++++++++-- base_layer/chat_ffi/src/lib.rs | 12 +++- .../contacts/src/contacts_service/handle.rs | 8 +-- .../contacts/src/contacts_service/service.rs | 30 ++++++---- integration_tests/tests/features/Chat.feature | 2 +- 6 files changed, 138 insertions(+), 23 deletions(-) diff --git a/base_layer/chat_ffi/chat.h b/base_layer/chat_ffi/chat.h index 785c27b4c3..abb0eef7e8 100644 --- a/base_layer/chat_ffi/chat.h +++ b/base_layer/chat_ffi/chat.h @@ -18,6 +18,8 @@ struct ChatMessageMetadataVector; struct ChatMessages; +struct Confirmation; + struct Message; struct TariAddress; @@ -43,6 +45,10 @@ struct ChatFFIMessage { typedef void (*CallbackMessageReceived)(struct ChatFFIMessage*); +typedef void (*CallbackDeliveryConfirmationReceived)(struct Confirmation*); + +typedef void (*CallbackReadConfirmationReceived)(struct Confirmation*); + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -65,7 +71,9 @@ extern "C" { struct ChatClientFFI *create_chat_client(struct ApplicationConfig *config, int *error_out, CallbackContactStatusChange callback_contact_status_change, - CallbackMessageReceived callback_message_received); + CallbackMessageReceived callback_message_received, + CallbackDeliveryConfirmationReceived callback_delivery_confirmation_received, + CallbackReadConfirmationReceived callback_read_confirmation_received); /** * Frees memory for a ChatClientFFI @@ -118,6 +126,52 @@ struct ApplicationConfig *create_chat_config(const char *network_str, */ void destroy_chat_config(struct ApplicationConfig *config); +/** + * Get a pointer to a ChatByteVector representation of a message id + * + * ## Arguments + * `confirmation` - A pointer to the Confirmation + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `*mut ChatByteVector` - A ptr to a ChatByteVector + * + * # Safety + * The ```confirmation``` When done with the confirmation it should be destroyed + * The ```ChatByteVector``` When done with the returned ChatByteVector it should be destroyed + */ +struct ChatByteVector *read_confirmation_message_id(struct Confirmation *confirmation, + int *error_out); + +/** + * Get a c_uint timestamp for the confirmation + * + * ## Arguments + * `confirmation` - A pointer to the Confirmation + * `error_out` - Pointer to an int which will be modified + * + * ## Returns + * `c_uint` - A uint representation of time. May return 0 if casting fails + * + * # Safety + * None + */ +unsigned int read_confirmation_timestamp(struct Confirmation *confirmation, int *error_out); + +/** + * Frees memory for a Confirmation + * + * ## Arguments + * `address` - The pointer of a Confirmation + * + * ## Returns + * `()` - Does not return a value, equivalent to void in C + * + * # Safety + * None + */ +void destroy_confirmation(struct Confirmation *address); + /** * Add a contact * diff --git a/base_layer/chat_ffi/src/callback_handler.rs b/base_layer/chat_ffi/src/callback_handler.rs index c0d770973b..0251956129 100644 --- a/base_layer/chat_ffi/src/callback_handler.rs +++ b/base_layer/chat_ffi/src/callback_handler.rs @@ -25,7 +25,7 @@ use std::{convert::TryFrom, ops::Deref}; use log::{debug, error, info, trace}; use tari_contacts::contacts_service::{ handle::{ContactsLivenessData, ContactsLivenessEvent, ContactsServiceHandle}, - types::Message, + types::{Confirmation, Message, MessageDispatch}, }; use tari_shutdown::ShutdownSignal; @@ -35,12 +35,16 @@ const LOG_TARGET: &str = "chat_ffi::callback_handler"; pub(crate) type CallbackContactStatusChange = unsafe extern "C" fn(*mut ChatFFIContactsLivenessData); pub(crate) type CallbackMessageReceived = unsafe extern "C" fn(*mut ChatFFIMessage); +pub(crate) type CallbackDeliveryConfirmationReceived = unsafe extern "C" fn(*mut Confirmation); +pub(crate) type CallbackReadConfirmationReceived = unsafe extern "C" fn(*mut Confirmation); #[derive(Clone)] pub struct CallbackHandler { contacts_service_handle: ContactsServiceHandle, callback_contact_status_change: CallbackContactStatusChange, callback_message_received: CallbackMessageReceived, + callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived, + callback_read_confirmation_received: CallbackReadConfirmationReceived, shutdown: ShutdownSignal, } @@ -50,12 +54,16 @@ impl CallbackHandler { shutdown: ShutdownSignal, callback_contact_status_change: CallbackContactStatusChange, callback_message_received: CallbackMessageReceived, + callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived, + callback_read_confirmation_received: CallbackReadConfirmationReceived, ) -> Self { Self { contacts_service_handle, shutdown, callback_contact_status_change, callback_message_received, + callback_delivery_confirmation_received, + callback_read_confirmation_received, } } @@ -67,9 +75,22 @@ impl CallbackHandler { tokio::select! { rec_message = chat_messages.recv() => { match rec_message { - Ok(message) => { - trace!(target: LOG_TARGET, "FFI Callback monitor received a new Message"); - self.trigger_message_received(message.deref().clone()); + Ok(message_dispatch) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new MessageDispatch"); + match message_dispatch.deref() { + MessageDispatch::Message(m) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new Message"); + self.trigger_message_received(m.clone()); + } + MessageDispatch::DeliveryConfirmation(c) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new Delivery Confirmation"); + self.trigger_delivery_confirmation_received(c.clone()); + }, + MessageDispatch::ReadConfirmation(c) => { + trace!(target: LOG_TARGET, "FFI Callback monitor received a new Read Confirmation"); + self.trigger_read_confirmation_received(c.clone()); + } + }; }, Err(_) => { debug!(target: LOG_TARGET, "FFI Callback monitor had an error receiving new messages")} } @@ -130,4 +151,28 @@ impl CallbackHandler { Err(e) => error!(target: LOG_TARGET, "Error processing message received callback: {}", e), } } + + fn trigger_delivery_confirmation_received(&mut self, confirmation: Confirmation) { + debug!( + target: LOG_TARGET, + "Calling DeliveryConfirmationReceived callback function for message {:?}", + confirmation.message_id, + ); + + unsafe { + (self.callback_delivery_confirmation_received)(Box::into_raw(Box::new(confirmation))); + } + } + + fn trigger_read_confirmation_received(&mut self, confirmation: Confirmation) { + debug!( + target: LOG_TARGET, + "Calling ReadConfirmationReceived callback function for message {:?}", + confirmation.message_id, + ); + + unsafe { + (self.callback_read_confirmation_received)(Box::into_raw(Box::new(confirmation))); + } + } } diff --git a/base_layer/chat_ffi/src/lib.rs b/base_layer/chat_ffi/src/lib.rs index 0fef4f1336..4f367c5d47 100644 --- a/base_layer/chat_ffi/src/lib.rs +++ b/base_layer/chat_ffi/src/lib.rs @@ -32,13 +32,19 @@ use tari_chat_client::{config::ApplicationConfig, networking::PeerFeatures, Chat use tokio::runtime::Runtime; use crate::{ - callback_handler::{CallbackHandler, CallbackMessageReceived}, + callback_handler::{ + CallbackDeliveryConfirmationReceived, + CallbackHandler, + CallbackMessageReceived, + CallbackReadConfirmationReceived, + }, error::{InterfaceError, LibChatError}, logging::init_logging, }; mod application_config; mod callback_handler; +mod confirmation; mod contacts; mod error; mod logging; @@ -80,6 +86,8 @@ pub unsafe extern "C" fn create_chat_client( error_out: *mut c_int, callback_contact_status_change: CallbackContactStatusChange, callback_message_received: CallbackMessageReceived, + callback_delivery_confirmation_received: CallbackDeliveryConfirmationReceived, + callback_read_confirmation_received: CallbackReadConfirmationReceived, ) -> *mut ChatClientFFI { let mut error = 0; ptr::swap(error_out, &mut error as *mut c_int); @@ -138,6 +146,8 @@ pub unsafe extern "C" fn create_chat_client( client.shutdown.to_signal(), callback_contact_status_change, callback_message_received, + callback_delivery_confirmation_received, + callback_read_confirmation_received, ); runtime.spawn(async move { diff --git a/base_layer/contacts/src/contacts_service/handle.rs b/base_layer/contacts/src/contacts_service/handle.rs index 822f5c122a..d69c74d7eb 100644 --- a/base_layer/contacts/src/contacts_service/handle.rs +++ b/base_layer/contacts/src/contacts_service/handle.rs @@ -37,7 +37,7 @@ use tower::Service; use crate::contacts_service::{ error::ContactsServiceError, service::{ContactMessageType, ContactOnlineStatus}, - types::{Confirmation, Contact, Message}, + types::{Confirmation, Contact, Message, MessageDispatch}, }; pub static DEFAULT_MESSAGE_LIMIT: u64 = 35; @@ -158,7 +158,7 @@ pub struct ContactsServiceHandle { request_response_service: SenderService>, liveness_events: broadcast::Sender>, - message_events: broadcast::Sender>, + message_events: broadcast::Sender>, } impl ContactsServiceHandle { @@ -168,7 +168,7 @@ impl ContactsServiceHandle { Result, >, liveness_events: broadcast::Sender>, - message_events: broadcast::Sender>, + message_events: broadcast::Sender>, ) -> Self { Self { request_response_service, @@ -225,7 +225,7 @@ impl ContactsServiceHandle { self.liveness_events.subscribe() } - pub fn get_messages_event_stream(&self) -> broadcast::Receiver> { + pub fn get_messages_event_stream(&self) -> broadcast::Receiver> { self.message_events.subscribe() } diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index fb5e7fa310..3970c37528 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -133,7 +133,7 @@ where T: ContactsBackend + 'static dht: Dht, subscription_factory: Arc, event_publisher: broadcast::Sender>, - message_publisher: broadcast::Sender>, + message_publisher: broadcast::Sender>, number_of_rounds_no_pings: u16, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, @@ -154,7 +154,7 @@ where T: ContactsBackend + 'static dht: Dht, subscription_factory: Arc, event_publisher: broadcast::Sender>, - message_publisher: broadcast::Sender>, + message_publisher: broadcast::Sender>, contacts_auto_ping_interval: Duration, contacts_online_ping_window: usize, ) -> Self { @@ -300,15 +300,17 @@ where T: ContactsBackend + 'static }, ContactsServiceRequest::SendMessage(address, mut message) => { let ob_message = OutboundDomainMessage::from(MessageDispatch::Message(message.clone())); - self.deliver_message(address, ob_message).await?; message.stored_at = Utc::now().naive_utc().timestamp() as u64; self.db.save_message(message)?; + self.deliver_message(address, ob_message).await?; Ok(ContactsServiceResponse::MessageSent) }, ContactsServiceRequest::SendReadConfirmation(address, confirmation) => { - let msg = OutboundDomainMessage::from(MessageDispatch::DeliveryConfirmation(confirmation.clone())); + let msg = OutboundDomainMessage::from(MessageDispatch::ReadConfirmation(confirmation.clone())); + trace!(target: LOG_TARGET, "Sending read confirmation with details: message_id: {:?}, timestamp: {:?}", confirmation.message_id, confirmation.timestamp); + self.deliver_message(address, msg).await?; self.db @@ -547,13 +549,14 @@ where T: ContactsBackend + 'static let our_message = Message { address: TariAddress::from_public_key(&source_public_key, message.address.network()), stored_at: EpochTime::now().as_u64(), - delivery_confirmation_at: Some(EpochTime::now().as_u64()), ..message }; match self.db.save_message(our_message.clone()) { Ok(..) => { - let _msg = self.message_publisher.send(Arc::new(our_message.clone())); + let _msg = self + .message_publisher + .send(Arc::new(MessageDispatch::Message(our_message.clone()))); // Send a delivery notification self.create_and_send_delivery_confirmation_for_msg(&our_message).await?; @@ -569,17 +572,19 @@ where T: ContactsBackend + 'static message: &Message, ) -> Result<(), ContactsServiceError> { let address = &message.address; + let delivery_time = EpochTime::now().as_u64(); let confirmation = MessageDispatch::DeliveryConfirmation(Confirmation { message_id: message.message_id.clone(), - timestamp: message - .delivery_confirmation_at - .ok_or(ContactsServiceError::MessageParsingError( - "delivery_confirmation_at is malformed".to_string(), - ))?, + timestamp: delivery_time, }); let msg = OutboundDomainMessage::from(confirmation); - self.deliver_message(address.clone(), msg).await + self.deliver_message(address.clone(), msg).await?; + + self.db + .confirm_message(message.message_id.clone(), Some(delivery_time), None)?; + + Ok(()) } async fn handle_confirmation(&mut self, dispatch: MessageDispatch) -> Result<(), ContactsServiceError> { @@ -593,6 +598,7 @@ where T: ContactsBackend + 'static }, }; + trace!(target: LOG_TARGET, "Handling confirmation with details: message_id: {:?}, delivery: {:?}, read: {:?}", message_id, delivery, read); self.db.confirm_message(message_id, delivery, read)?; Ok(()) diff --git a/integration_tests/tests/features/Chat.feature b/integration_tests/tests/features/Chat.feature index 05b66be7ed..35323135d1 100644 --- a/integration_tests/tests/features/Chat.feature +++ b/integration_tests/tests/features/Chat.feature @@ -63,4 +63,4 @@ Feature: Chat messaging When I use CHAT_A to send a message 'Hey there' to CHAT_B When CHAT_B will have 1 message with CHAT_A When CHAT_B sends a read receipt to CHAT_A for message 'Hey there' - Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps \ No newline at end of file + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps From 449f003a1b5c9cbe0a43ddb63fccad40fb9d9477 Mon Sep 17 00:00:00 2001 From: brianp Date: Mon, 2 Oct 2023 14:57:15 +0200 Subject: [PATCH 09/12] Test new ffi callbacks --- .../contacts/src/contacts_service/service.rs | 3 +- integration_tests/src/chat_client.rs | 1 + integration_tests/src/chat_ffi.rs | 16 ++++++++ .../tests/features/ChatFFI.feature | 19 ++++++++++ .../tests/steps/chat_ffi_steps.rs | 38 +++++++++++++++++++ 5 files changed, 76 insertions(+), 1 deletion(-) diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index 3970c37528..424276d9f2 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -588,7 +588,7 @@ where T: ContactsBackend + 'static } async fn handle_confirmation(&mut self, dispatch: MessageDispatch) -> Result<(), ContactsServiceError> { - let (message_id, delivery, read) = match dispatch { + let (message_id, delivery, read) = match dispatch.clone() { MessageDispatch::DeliveryConfirmation(c) => (c.message_id, Some(c.timestamp), None), MessageDispatch::ReadConfirmation(c) => (c.message_id, None, Some(c.timestamp)), _ => { @@ -600,6 +600,7 @@ where T: ContactsBackend + 'static trace!(target: LOG_TARGET, "Handling confirmation with details: message_id: {:?}, delivery: {:?}, read: {:?}", message_id, delivery, read); self.db.confirm_message(message_id, delivery, read)?; + let _msg = self.message_publisher.send(Arc::new(dispatch)); Ok(()) } diff --git a/integration_tests/src/chat_client.rs b/integration_tests/src/chat_client.rs index 6b4516b074..1bf22cbaf5 100644 --- a/integration_tests/src/chat_client.rs +++ b/integration_tests/src/chat_client.rs @@ -73,6 +73,7 @@ pub fn test_config(address: Multiaddr) -> ApplicationConfig { chat_client_config.p2p.transport.tcp.listener_address = address.clone(); chat_client_config.p2p.public_addresses = MultiaddrList::from(vec![address]); chat_client_config.log_path = Some(PathBuf::from("log/chat_client/chat_client.log")); + chat_client_config.log_verbosity = Some(11); ApplicationConfig { chat_client: chat_client_config, diff --git a/integration_tests/src/chat_ffi.rs b/integration_tests/src/chat_ffi.rs index eeb8298afa..03ea45ff50 100644 --- a/integration_tests/src/chat_ffi.rs +++ b/integration_tests/src/chat_ffi.rs @@ -58,6 +58,16 @@ extern "C" fn callback_message_received(_state: *mut c_void) { *callback.message_received.lock().unwrap() += 1; } +extern "C" fn callback_delivery_confirmation_received(_state: *mut c_void) { + let callback = ChatCallback::instance(); + *callback.delivery_confirmation_received.lock().unwrap() += 1; +} + +extern "C" fn callback_read_confirmation_received(_state: *mut c_void) { + let callback = ChatCallback::instance(); + *callback.read_confirmation_received.lock().unwrap() += 1; +} + #[cfg_attr(windows, link(name = "minotari_chat_ffi.dll"))] #[cfg_attr(not(windows), link(name = "minotari_chat_ffi"))] extern "C" { @@ -66,6 +76,8 @@ extern "C" { error_out: *const c_int, callback_contact_status_change: unsafe extern "C" fn(*mut c_void), callback_message_received: unsafe extern "C" fn(*mut c_void), + callback_delivery_confirmation_received: unsafe extern "C" fn(*mut c_void), + callback_read_confirmation_received: unsafe extern "C" fn(*mut c_void), ) -> *mut ClientFFI; pub fn create_chat_message(receiver: *mut c_void, message: *const c_char, error_out: *const c_int) -> *mut c_void; pub fn send_chat_message(client: *mut ClientFFI, message: *mut c_void, error_out: *const c_int); @@ -252,6 +264,8 @@ pub async fn spawn_ffi_chat_client(name: &str, seed_peers: Vec, base_dir: error_out, callback_contact_status_change, callback_message_received, + callback_delivery_confirmation_received, + callback_read_confirmation_received, ); } @@ -268,6 +282,8 @@ static START: Once = Once::new(); pub struct ChatCallback { pub contact_status_change: Mutex, pub message_received: Mutex, + pub delivery_confirmation_received: Mutex, + pub read_confirmation_received: Mutex, } impl ChatCallback { diff --git a/integration_tests/tests/features/ChatFFI.feature b/integration_tests/tests/features/ChatFFI.feature index a8dc5a0c21..34cc37d58d 100644 --- a/integration_tests/tests/features/ChatFFI.feature +++ b/integration_tests/tests/features/ChatFFI.feature @@ -19,6 +19,25 @@ Feature: Chat FFI messaging Then there will be a MessageReceived callback of at least 1 Then CHAT_B will have 1 message with CHAT_A + Scenario: Callback for delivery confirmation received + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then there will be a DeliveryConfirmationReceived callback of at least 1 + Then CHAT_B will have 1 message with CHAT_A + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching delivery timestamps + + Scenario: Callback for read confirmation received + Given I have a seed node SEED_A + When I have a chat FFI client CHAT_A connected to seed node SEED_A + When I have a chat FFI client CHAT_B connected to seed node SEED_A + When I use CHAT_A to send a message 'Hey there' to CHAT_B + Then CHAT_B will have 1 message with CHAT_A + When CHAT_B sends a read receipt to CHAT_A for message 'Hey there' + Then there will be a ReadConfirmationReceived callback of at least 1 + Then CHAT_A and CHAT_B will have a message 'Hey there' with matching read timestamps + # Also flaky on CI. Seems liveness has issues on CI @broken Scenario: Callback for status change is received diff --git a/integration_tests/tests/steps/chat_ffi_steps.rs b/integration_tests/tests/steps/chat_ffi_steps.rs index ee43e8a1cf..b3148d817a 100644 --- a/integration_tests/tests/steps/chat_ffi_steps.rs +++ b/integration_tests/tests/steps/chat_ffi_steps.rs @@ -81,6 +81,44 @@ async fn message_reveived_callback(_world: &mut TariWorld, callback_count: usize ); } +#[then(expr = "there will be a DeliveryConfirmationReceived callback of at least {int}")] +async fn delivery_confirmation_reveived_callback(_world: &mut TariWorld, callback_count: usize) { + let mut count = 0; + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + count = *ChatCallback::instance().delivery_confirmation_received.lock().unwrap(); + + if count >= callback_count as u64 { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!( + "contact status update never received. Callbacks expected: {}, Callbacks received: {:?}", + callback_count, count + ); +} + +#[then(expr = "there will be a ReadConfirmationReceived callback of at least {int}")] +async fn read_confirmation_received_callback(_world: &mut TariWorld, callback_count: usize) { + let mut count = 0; + for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { + count = *ChatCallback::instance().read_confirmation_received.lock().unwrap(); + + if count >= callback_count as u64 { + return; + } + + tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + } + + panic!( + "contact status update never received. Callbacks expected: {}, Callbacks received: {:?}", + callback_count, count + ); +} + #[then(expr = "I can shutdown {word} without a problem")] async fn can_shutdown(world: &mut TariWorld, name: String) { let mut client = world.chat_clients.remove(&name).unwrap(); From a2c49b2c32b9f77e9fc8542fd2632042652051d1 Mon Sep 17 00:00:00 2001 From: brianp Date: Mon, 2 Oct 2023 15:01:33 +0200 Subject: [PATCH 10/12] Add missing file. Oops --- base_layer/chat_ffi/src/confirmation.rs | 147 ++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 base_layer/chat_ffi/src/confirmation.rs diff --git a/base_layer/chat_ffi/src/confirmation.rs b/base_layer/chat_ffi/src/confirmation.rs new file mode 100644 index 0000000000..93c1a2bdb4 --- /dev/null +++ b/base_layer/chat_ffi/src/confirmation.rs @@ -0,0 +1,147 @@ +// Copyright 2023, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{convert::TryFrom, ptr}; + +use libc::{c_int, c_uint}; +use tari_contacts::contacts_service::types::Confirmation; + +use crate::{ + error::{InterfaceError, LibChatError}, + types::{chat_byte_vector_create, ChatByteVector}, +}; + +/// Get a pointer to a ChatByteVector representation of a message id +/// +/// ## Arguments +/// `confirmation` - A pointer to the Confirmation +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `*mut ChatByteVector` - A ptr to a ChatByteVector +/// +/// # Safety +/// The ```confirmation``` When done with the confirmation it should be destroyed +/// The ```ChatByteVector``` When done with the returned ChatByteVector it should be destroyed +#[no_mangle] +pub unsafe extern "C" fn read_confirmation_message_id( + confirmation: *mut Confirmation, + error_out: *mut c_int, +) -> *mut ChatByteVector { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + + if confirmation.is_null() { + error = LibChatError::from(InterfaceError::NullError("client".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + let c = &(*confirmation); + let data_bytes = c.message_id.clone(); + let len = u32::try_from(data_bytes.len()).expect("Can't cast from usize"); + chat_byte_vector_create(data_bytes.as_ptr(), len as c_uint, error_out) +} + +/// Get a c_uint timestamp for the confirmation +/// +/// ## Arguments +/// `confirmation` - A pointer to the Confirmation +/// `error_out` - Pointer to an int which will be modified +/// +/// ## Returns +/// `c_uint` - A uint representation of time. May return 0 if casting fails +/// +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn read_confirmation_timestamp(confirmation: *mut Confirmation, error_out: *mut c_int) -> c_uint { + let mut error = 0; + ptr::swap(error_out, &mut error as *mut c_int); + + if confirmation.is_null() { + error = LibChatError::from(InterfaceError::NullError("client".to_string())).code; + ptr::swap(error_out, &mut error as *mut c_int); + } + + let c = &(*confirmation); + c_uint::try_from(c.timestamp).unwrap_or(0) +} + +/// Frees memory for a Confirmation +/// +/// ## Arguments +/// `address` - The pointer of a Confirmation +/// +/// ## Returns +/// `()` - Does not return a value, equivalent to void in C +/// +/// # Safety +/// None +#[no_mangle] +pub unsafe extern "C" fn destroy_confirmation(address: *mut Confirmation) { + if !address.is_null() { + drop(Box::from_raw(address)) + } +} + +#[cfg(test)] +mod test { + use tari_contacts::contacts_service::types::{Confirmation, MessageBuilder}; + use tari_utilities::epoch_time::EpochTime; + + use crate::{ + confirmation::{destroy_confirmation, read_confirmation_message_id, read_confirmation_timestamp}, + types::{chat_byte_vector_get_at, chat_byte_vector_get_length}, + }; + + #[test] + fn test_reading_from_confrimation() { + let message_id = MessageBuilder::new().build().message_id; + let timestamp = EpochTime::now().as_u64(); + let confirmation = Confirmation { + message_id: message_id.clone(), + timestamp, + }; + + let confirmation_ptr = Box::into_raw(Box::new(confirmation)); + let error_out = Box::into_raw(Box::new(0)); + + unsafe { + let id_byte_vec = read_confirmation_message_id(confirmation_ptr, error_out); + let len = chat_byte_vector_get_length(id_byte_vec, error_out); + + let mut read_id = vec![]; + for i in 0..len { + read_id.push(chat_byte_vector_get_at(id_byte_vec, i, error_out)); + } + + assert_eq!(message_id, read_id) + } + + unsafe { + let read_timestamp = read_confirmation_timestamp(confirmation_ptr, error_out); + assert_eq!(timestamp, u64::from(read_timestamp)) + } + + unsafe { destroy_confirmation(confirmation_ptr) } + } +} From 8ee409a08298e3f67ccc2a0823438f2df88e8447 Mon Sep 17 00:00:00 2001 From: brianp Date: Mon, 2 Oct 2023 15:31:20 +0200 Subject: [PATCH 11/12] Proto doesnt need to send these fields --- base_layer/contacts/proto/message.proto | 5 +---- base_layer/contacts/src/contacts_service/types/message.rs | 7 +------ 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/base_layer/contacts/proto/message.proto b/base_layer/contacts/proto/message.proto index 61a7768915..6770cb65b7 100644 --- a/base_layer/contacts/proto/message.proto +++ b/base_layer/contacts/proto/message.proto @@ -9,10 +9,7 @@ message Message { repeated MessageMetadata metadata = 2; bytes address = 3; DirectionEnum direction = 4; - uint64 stored_at = 5; - optional uint64 delivery_confirmation_at = 6; - optional uint64 read_confirmation_at = 7; - bytes message_id = 8; + bytes message_id = 5; } enum DirectionEnum { diff --git a/base_layer/contacts/src/contacts_service/types/message.rs b/base_layer/contacts/src/contacts_service/types/message.rs index d963567efb..9dac4657ad 100644 --- a/base_layer/contacts/src/contacts_service/types/message.rs +++ b/base_layer/contacts/src/contacts_service/types/message.rs @@ -110,10 +110,8 @@ impl TryFrom for Message { address: TariAddress::from_bytes(&message.address).map_err(|e| e.to_string())?, // A Message from a proto::Message will always be an inbound message direction: Direction::Inbound, - stored_at: message.stored_at, - delivery_confirmation_at: message.delivery_confirmation_at, - read_confirmation_at: message.read_confirmation_at, message_id: message.message_id, + ..Message::default() }) } } @@ -129,9 +127,6 @@ impl From for proto::Message { .collect(), address: message.address.to_bytes().to_vec(), direction: i32::from(message.direction.as_byte()), - stored_at: message.stored_at, - delivery_confirmation_at: message.delivery_confirmation_at, - read_confirmation_at: message.read_confirmation_at, message_id: message.message_id, } } From 61133ab940d6a44cffe22b1a741794207740c689 Mon Sep 17 00:00:00 2001 From: brianp Date: Tue, 3 Oct 2023 16:43:23 +0200 Subject: [PATCH 12/12] handle failed castings by erroring out --- .../src/contacts_service/storage/database.rs | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/base_layer/contacts/src/contacts_service/storage/database.rs b/base_layer/contacts/src/contacts_service/storage/database.rs index fe8e22532c..e7c8ae3caf 100644 --- a/base_layer/contacts/src/contacts_service/storage/database.rs +++ b/base_layer/contacts/src/contacts_service/storage/database.rs @@ -198,12 +198,27 @@ where T: ContactsBackend + 'static delivery_confirmation: Option, read_confirmation: Option, ) -> Result<(), ContactsServiceStorageError> { + let mut delivery = None; + if let Some(timestamp) = delivery_confirmation { + let secs = i64::try_from(timestamp).map_err(|_e| ContactsServiceStorageError::ConversionError)?; + delivery = Some( + NaiveDateTime::from_timestamp_opt(secs, 0) + .ok_or_else(|| ContactsServiceStorageError::ConversionError)?, + ) + }; + + let mut read = None; + if let Some(timestamp) = read_confirmation { + let secs = i64::try_from(timestamp).map_err(|_e| ContactsServiceStorageError::ConversionError)?; + read = Some( + NaiveDateTime::from_timestamp_opt(secs, 0) + .ok_or_else(|| ContactsServiceStorageError::ConversionError)?, + ) + }; + self.db .write(WriteOperation::Upsert(Box::new(DbKeyValuePair::MessageConfirmations( - message_id, - delivery_confirmation - .map(|d| NaiveDateTime::from_timestamp_opt(i64::try_from(d).unwrap_or(0), 0).unwrap()), - read_confirmation.map(|d| NaiveDateTime::from_timestamp_opt(i64::try_from(d).unwrap_or(0), 0).unwrap()), + message_id, delivery, read, ))))?; Ok(())