Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove spawn blocking calls from wallet db (contacts service) #4575

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion base_layer/wallet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Default for WalletConfig {
base_node_service_config: Default::default(),
data_dir: PathBuf::from_str("data/wallet").unwrap(),
db_file: PathBuf::from_str("db/console_wallet.db").unwrap(),
db_connection_pool_size: 5, // TODO: get actual default
db_connection_pool_size: 16, // Note: Do not reduce this default number
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really high. I was expecting it to be only one or two connections at a time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the start we have always tested with 16; this number slipped in somewhere. Effectively it makes read operation asynchronous.

password: None,
contacts_auto_ping_interval: Duration::from_secs(30),
contacts_online_ping_window: 30,
Expand Down
23 changes: 11 additions & 12 deletions base_layer/wallet/src/contacts_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where T: ContactsBackend + 'static
pin_mut!(shutdown);

// Add all contacts as monitored peers to the liveness service
let result = self.db.get_contacts().await;
let result = self.db.get_contacts();
if let Ok(ref contacts) = result {
self.add_contacts_to_liveness_service(contacts).await?;
}
Expand Down Expand Up @@ -195,14 +195,14 @@ where T: ContactsBackend + 'static
) -> Result<ContactsServiceResponse, ContactsServiceError> {
match request {
ContactsServiceRequest::GetContact(pk) => {
let result = self.db.get_contact(pk.clone()).await;
let result = self.db.get_contact(pk.clone());
if let Ok(ref contact) = result {
self.liveness.check_add_monitored_peer(contact.node_id.clone()).await?;
};
Ok(result.map(ContactsServiceResponse::Contact)?)
},
ContactsServiceRequest::UpsertContact(c) => {
self.db.upsert_contact(c.clone()).await?;
self.db.upsert_contact(c.clone())?;
self.liveness.check_add_monitored_peer(c.node_id).await?;
info!(
target: LOG_TARGET,
Expand All @@ -211,7 +211,7 @@ where T: ContactsBackend + 'static
Ok(ContactsServiceResponse::ContactSaved)
},
ContactsServiceRequest::RemoveContact(pk) => {
let result = self.db.remove_contact(pk.clone()).await?;
let result = self.db.remove_contact(pk.clone())?;
self.liveness
.check_remove_monitored_peer(result.node_id.clone())
.await?;
Expand All @@ -222,7 +222,7 @@ where T: ContactsBackend + 'static
Ok(ContactsServiceResponse::ContactRemoved(result))
},
ContactsServiceRequest::GetContacts => {
let result = self.db.get_contacts().await;
let result = self.db.get_contacts();
if let Ok(ref contacts) = result {
self.add_contacts_to_liveness_service(contacts).await?;
}
Expand Down Expand Up @@ -254,11 +254,11 @@ where T: ContactsBackend + 'static
match event {
// Received a ping, check if it contains ContactsLiveness
LivenessEvent::ReceivedPing(event) => {
self.update_with_ping_pong(event, ContactMessageType::Ping).await?;
self.update_with_ping_pong(event, ContactMessageType::Ping)?;
},
// Received a pong, check if our neighbour sent it and it contains ContactsLiveness
LivenessEvent::ReceivedPong(event) => {
self.update_with_ping_pong(event, ContactMessageType::Pong).await?;
self.update_with_ping_pong(event, ContactMessageType::Pong)?;
},
// New ping round has begun
LivenessEvent::PingRoundBroadcast(num_peers) => {
Expand All @@ -277,7 +277,7 @@ where T: ContactsBackend + 'static
self.resize_contacts_liveness_data_buffer(*num_peers);

// Update offline status
if let Ok(contacts) = self.db.get_contacts().await {
if let Ok(contacts) = self.db.get_contacts() {
for contact in contacts {
let online_status = self.get_online_status(&contact).await?;
if online_status == ContactOnlineStatus::Online {
Expand Down Expand Up @@ -332,7 +332,7 @@ where T: ContactsBackend + 'static
Utc::now().naive_utc().sub(last_seen) <= ping_window
}

async fn update_with_ping_pong(
fn update_with_ping_pong(
&mut self,
event: &PingPongEvent,
message_type: ContactMessageType,
Expand All @@ -356,15 +356,14 @@ where T: ContactsBackend + 'static
}
let this_public_key = self
.db
.update_contact_last_seen(&event.node_id, last_seen.naive_utc(), latency)
.await?;
.update_contact_last_seen(&event.node_id, last_seen.naive_utc(), latency)?;

let data = ContactsLivenessData::new(
this_public_key,
event.node_id.clone(),
latency,
Some(last_seen.naive_utc()),
message_type.clone(),
message_type,
ContactOnlineStatus::Online,
);
self.liveness_data.push(data.clone());
Expand Down
68 changes: 22 additions & 46 deletions base_layer/wallet/src/contacts_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,65 +118,46 @@ where T: ContactsBackend + 'static
Self { db: Arc::new(db) }
}

pub async fn get_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
pub fn get_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
let db_clone = self.db.clone();
tokio::task::spawn_blocking(move || fetch!(db_clone, pub_key.clone(), Contact))
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)
fetch!(db_clone, pub_key, Contact)
}

pub async fn get_contacts(&self) -> Result<Vec<Contact>, ContactsServiceStorageError> {
pub fn get_contacts(&self) -> Result<Vec<Contact>, ContactsServiceStorageError> {
let db_clone = self.db.clone();

let c = tokio::task::spawn_blocking(move || match db_clone.fetch(&DbKey::Contacts) {
match db_clone.fetch(&DbKey::Contacts) {
Ok(None) => log_error(
DbKey::Contacts,
ContactsServiceStorageError::UnexpectedResult("Could not retrieve contacts".to_string()),
),
Ok(Some(DbValue::Contacts(c))) => Ok(c),
Ok(Some(other)) => unexpected_result(DbKey::Contacts, other),
Err(e) => log_error(DbKey::Contacts, e),
})
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))??;
Ok(c)
}
}

pub async fn upsert_contact(&self, contact: Contact) -> Result<(), ContactsServiceStorageError> {
let db_clone = self.db.clone();

tokio::task::spawn_blocking(move || {
db_clone.write(WriteOperation::Upsert(Box::new(DbKeyValuePair::Contact(
contact.public_key.clone(),
contact,
))))
})
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))??;
pub fn upsert_contact(&self, contact: Contact) -> Result<(), ContactsServiceStorageError> {
self.db.write(WriteOperation::Upsert(Box::new(DbKeyValuePair::Contact(
contact.public_key.clone(),
contact,
))))?;
Ok(())
}

pub async fn update_contact_last_seen(
pub fn update_contact_last_seen(
&self,
node_id: &NodeId,
last_seen: NaiveDateTime,
latency: Option<u32>,
) -> Result<CommsPublicKey, ContactsServiceStorageError> {
let db_clone = self.db.clone();
let node_id_clone = node_id.clone();

let result = tokio::task::spawn_blocking(move || {
db_clone.write(WriteOperation::UpdateLastSeen(Box::new(DbKeyValuePair::LastSeen(
node_id_clone,
let result = self
.db
.write(WriteOperation::UpdateLastSeen(Box::new(DbKeyValuePair::LastSeen(
node_id.clone(),
last_seen,
latency.map(|val| val as i32),
))))
})
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::ContactId(node_id.clone())))?;
))))?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::ContactId(node_id.clone())))?;
match result {
DbValue::PublicKey(k) => Ok(*k),
_ => Err(ContactsServiceStorageError::UnexpectedResult(
Expand All @@ -185,16 +166,11 @@ where T: ContactsBackend + 'static
}
}

pub async fn remove_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
let db_clone = self.db.clone();
let pub_key_clone = pub_key.clone();
let result =
tokio::task::spawn_blocking(move || db_clone.write(WriteOperation::Remove(DbKey::Contact(pub_key_clone))))
.await
.map_err(|err| ContactsServiceStorageError::BlockingTaskSpawnError(err.to_string()))
.and_then(|inner_result| inner_result)?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::Contact(pub_key.clone())))?;

pub fn remove_contact(&self, pub_key: CommsPublicKey) -> Result<Contact, ContactsServiceStorageError> {
let result = self
.db
.write(WriteOperation::Remove(DbKey::Contact(pub_key.clone())))?
.ok_or_else(|| ContactsServiceStorageError::ValueNotFound(DbKey::Contact(pub_key.clone())))?;
match result {
DbValue::Contact(c) => Ok(*c),
DbValue::Contacts(_) | DbValue::PublicKey(_) => Err(ContactsServiceStorageError::UnexpectedResult(
Expand Down
4 changes: 2 additions & 2 deletions common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
# DO NOT EVER DELETE THIS FILE unless you (a) have backed up your seed phrase and (b) know what you are doing!
#db_file = "db/console_wallet.db"

# The main wallet db sqlite database backend connection pool size for concurrent reads (default = 5)
#db_connection_pool_size = 5
# The main wallet db sqlite database backend connection pool size for concurrent reads (default = 16)
#db_connection_pool_size = 16

# Console wallet password. Should you wish to start your console wallet without typing in your password, the following
# options are available:
Expand Down