Skip to content

Commit

Permalink
feat: improve lmdb dynamic growth (#6242)
Browse files Browse the repository at this point in the history
Description
---
- Improved the LMDB resizing during block sync of many consecutive full
blocks; this is required due to how the SMT works currently as it is
replaced for every new block.
- Added SMT database write profiling measurements.

Motivation and Context
---
See above.

How Has This Been Tested?
---
- System-level testing in Windows and Ubuntu - archival node fresh block
sync of many full blocks as generated during a stress test on
`esmeralda`.
- `fn insert_tip_smt(&self, txn: &WriteTransaction<'_>, smt:
&OutputSmt)` always succeeded with these settings.


![image](https://github.com/tari-project/tari/assets/39146854/fadd5c51-f98b-43b3-8cae-1c4fcdf58850)


![image](https://github.com/tari-project/tari/assets/39146854/411a1ef2-663c-4c34-838b-fde74042da99)


What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal authored Apr 4, 2024
1 parent 70319cd commit b48a830
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 120 deletions.
1 change: 1 addition & 0 deletions applications/minotari_console_wallet/src/ui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ fn crossterm_loop(mut app: App<CrosstermBackend<Stdout>>) -> Result<(), ExitErro
error!(target: LOG_TARGET, "Error drawing interface. {}", e);
ExitCode::InterfaceError
})?;
#[allow(clippy::blocks_in_conditions)]
match events.next().map_err(|e| {
error!(target: LOG_TARGET, "Error reading input event: {}", e);
ExitCode::InterfaceError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ where B: BlockchainBackend + 'static
"A peer has requested a block with hash {}", block_hex
);

#[allow(clippy::blocks_in_conditions)]
let maybe_block = match self
.blockchain_db
.fetch_block_by_hash(hash, true)
Expand Down
7 changes: 7 additions & 0 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncRpcService<B> {
#[tari_comms::async_trait]
impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcService<B> {
#[instrument(level = "trace", name = "sync_rpc::sync_blocks", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_blocks(
&self,
request: Request<SyncBlocksRequest>,
Expand Down Expand Up @@ -273,6 +274,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", name = "sync_rpc::sync_headers", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_headers(
&self,
request: Request<SyncHeadersRequest>,
Expand Down Expand Up @@ -373,6 +375,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn get_header_by_height(
&self,
request: Request<u64>,
Expand All @@ -389,6 +392,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "debug", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn find_chain_split(
&self,
request: Request<FindChainSplitRequest>,
Expand Down Expand Up @@ -452,6 +456,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn get_chain_metadata(&self, _: Request<()>) -> Result<Response<proto::base_node::ChainMetadata>, RpcStatus> {
let chain_metadata = self
.db()
Expand All @@ -462,6 +467,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_kernels(
&self,
request: Request<SyncKernelsRequest>,
Expand Down Expand Up @@ -588,6 +594,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
}

#[instrument(level = "trace", skip(self), err)]
#[allow(clippy::blocks_in_conditions)]
async fn sync_utxos(&self, request: Request<SyncUtxosRequest>) -> Result<Streaming<SyncUtxosResponse>, RpcStatus> {
let req = request.message();
let peer_node_id = request.context().peer_node_id();
Expand Down
38 changes: 34 additions & 4 deletions base_layer/core/src/chain_storage/lmdb_db/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,53 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Instant;

use lmdb_zero::error;
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use tari_storage::lmdb_store::BYTES_PER_MB;

use crate::chain_storage::ChainStorageError;

pub const LOG_TARGET: &str = "c::cs::lmdb_db::lmdb";

pub fn serialize<T>(data: &T) -> Result<Vec<u8>, ChainStorageError>
/// Serialize the given data into a byte vector
/// Note:
/// `size_hint` is given as an option as checking what the serialized would be is expensive
/// for large data structures at ~30% overhead
pub fn serialize<T>(data: &T, size_hint: Option<usize>) -> Result<Vec<u8>, ChainStorageError>
where T: Serialize {
let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
#[allow(clippy::cast_possible_truncation)]
let mut buf = Vec::with_capacity(size as usize);
let start = Instant::now();
let mut buf = if let Some(size) = size_hint {
Vec::with_capacity(size)
} else {
let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
#[allow(clippy::cast_possible_truncation)]
Vec::with_capacity(size as usize)
};
let check_time = start.elapsed();
bincode::serialize_into(&mut buf, data).map_err(|e| {
error!(target: LOG_TARGET, "Could not serialize lmdb: {:?}", e);
ChainStorageError::AccessError(e.to_string())
})?;
if buf.len() >= BYTES_PER_MB {
let serialize_time = start.elapsed() - check_time;
trace!(
"lmdb_replace - {} MB, serialize check in {:.2?}, serialize in {:.2?}",
buf.len() / BYTES_PER_MB,
check_time,
serialize_time
);
}
if let Some(size) = size_hint {
if buf.len() > size {
warn!(
target: LOG_TARGET,
"lmdb_replace - Serialized size hint was too small. Expected {}, got {}", size, buf.len()
);
}
}
Ok(buf)
}

Expand Down
33 changes: 25 additions & 8 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::fmt::Debug;
use std::{fmt::Debug, time::Instant};

use lmdb_zero::{
del,
Expand All @@ -37,6 +37,7 @@ use lmdb_zero::{
};
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use tari_storage::lmdb_store::BYTES_PER_MB;
use tari_utilities::hex::to_hex;

use crate::chain_storage::{
Expand All @@ -62,7 +63,7 @@ where
K: AsLmdbBytes + ?Sized + Debug,
V: Serialize + Debug,
{
let val_buf = serialize(val)?;
let val_buf = serialize(val, None)?;
match txn.access().put(db, key, &val_buf, put::NOOVERWRITE) {
Ok(_) => {
trace!(
Expand Down Expand Up @@ -112,7 +113,7 @@ where
K: AsLmdbBytes + ?Sized,
V: Serialize,
{
let val_buf = serialize(val)?;
let val_buf = serialize(val, None)?;
txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
if let lmdb_zero::Error::Code(code) = &e {
if *code == lmdb_zero::error::MAP_FULL {
Expand All @@ -128,13 +129,20 @@ where
}

/// Inserts or replaces the item at the given key. If the key does not exist, a new entry is created
pub fn lmdb_replace<K, V>(txn: &WriteTransaction<'_>, db: &Database, key: &K, val: &V) -> Result<(), ChainStorageError>
pub fn lmdb_replace<K, V>(
txn: &WriteTransaction<'_>,
db: &Database,
key: &K,
val: &V,
size_hint: Option<usize>,
) -> Result<(), ChainStorageError>
where
K: AsLmdbBytes + ?Sized,
V: Serialize,
{
let val_buf = serialize(val)?;
txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
let val_buf = serialize(val, size_hint)?;
let start = Instant::now();
let res = txn.access().put(db, key, &val_buf, put::Flags::empty()).map_err(|e| {
if let lmdb_zero::Error::Code(code) = &e {
if *code == lmdb_zero::error::MAP_FULL {
return ChainStorageError::DbResizeRequired(Some(val_buf.len()));
Expand All @@ -145,7 +153,16 @@ where
"Could not replace value in lmdb transaction: {:?}", e
);
ChainStorageError::AccessError(e.to_string())
})
});
if val_buf.len() >= BYTES_PER_MB {
let write_time = start.elapsed();
trace!(
"lmdb_replace - {} MB, lmdb write in {:.2?}",
val_buf.len() / BYTES_PER_MB,
write_time
);
}
res
}

/// Deletes the given key. An error is returned if the key does not exist
Expand Down Expand Up @@ -175,7 +192,7 @@ where
K: AsLmdbBytes + ?Sized,
V: Serialize,
{
txn.access().del_item(db, key, &serialize(value)?)?;
txn.access().del_item(db, key, &serialize(value, None)?)?;
Ok(())
}

Expand Down
Loading

0 comments on commit b48a830

Please sign in to comment.