Skip to content

Commit

Permalink
feat!: add deleted height to unique_id index (#3506)
Browse files Browse the repository at this point in the history
* feat: add deleted height to unique_id index

- adds the spend/deleted height to the asset id index when a unique id
  UTXO is spent
- head unique_id (ie. <asset_pk, unique_id> in current UTXO set) has
  "special" key in unique_id_index
- handle reorg code to update unique_id_index
- replace hex string keys with byte keys in kernel and input dbs
- add test for fetch_utxo_by_unique_id with deleted height param

* fix lil clippy

* fix more clippy
  • Loading branch information
sdbondi authored Nov 3, 2021
1 parent b1ee1df commit 70df667
Show file tree
Hide file tree
Showing 18 changed files with 648 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ where T: BlockchainBackend + 'static
for id in unique_ids {
let output = self
.blockchain_db
.fetch_utxo_by_unique_id(Some(asset_public_key.clone()), id)
.fetch_utxo_by_unique_id(Some(asset_public_key.clone()), id, None)
.await?;
if let Some(out) = output {
match out.output {
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/src/blocks/block_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ pub(crate) mod hash_serializer {
mod test {
use crate::blocks::BlockHeader;
use tari_crypto::tari_utilities::Hashable;

#[test]
fn from_previous() {
let mut h1 = crate::proof_of_work::sha3_test::get_header();
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_utxos_by_mmr_position(start: u64, end: u64, deleted: Arc<Bitmap>) -> (Vec<PrunedOutput>, Bitmap), "fetch_utxos_by_mmr_position");

make_async_fn!(fetch_utxo_by_unique_id(parent_public_key: Option<PublicKey>,unique_id: HashOutput) -> Option<UtxoMinedInfo>, "fetch_utxo_by_unique_id");
make_async_fn!(fetch_utxo_by_unique_id(parent_public_key: Option<PublicKey>,unique_id: HashOutput, deleted_at: Option<u64>) -> Option<UtxoMinedInfo>, "fetch_utxo_by_unique_id");

make_async_fn!(fetch_all_unspent_by_parent_public_key(
parent_public_key: PublicKey,
Expand Down
5 changes: 3 additions & 2 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ pub trait BlockchainBackend: Send + Sync {
commitment: &Commitment,
) -> Result<Option<HashOutput>, ChainStorageError>;

/// Returns the unspent TransactionOutput output that matches the given unique_id if it exists in the current UTXO
/// set, otherwise None is returned.
/// Returns the unspent TransactionOutput output that matches the given unique_id if it exists, otherwise None is
/// returned.
fn fetch_utxo_by_unique_id(
&self,
parent_public_key: Option<&PublicKey>,
unique_id: &[u8],
deleted_at: Option<u64>,
) -> Result<Option<UtxoMinedInfo>, ChainStorageError>;

/// Returns all unspent outputs with a parent public key
Expand Down
3 changes: 2 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,10 @@ where B: BlockchainBackend
&self,
parent_public_key: Option<PublicKey>,
unique_id: HashOutput,
deleted_at: Option<u64>,
) -> Result<Option<UtxoMinedInfo>, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_utxo_by_unique_id(parent_public_key.as_ref(), &unique_id)
db.fetch_utxo_by_unique_id(parent_public_key.as_ref(), &unique_id, deleted_at)
}

pub fn fetch_all_unspent_by_parent_public_key(
Expand Down
49 changes: 49 additions & 0 deletions base_layer/core/src/chain_storage/lmdb_db/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::chain_storage::ChainStorageError;
use lmdb_zero::error;
use log::*;
use serde::{de::DeserializeOwned, Serialize};

pub const LOG_TARGET: &str = "c::cs::lmdb_db::lmdb";

pub fn serialize<T>(data: &T) -> Result<Vec<u8>, ChainStorageError>
where T: Serialize {
let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
let mut buf = Vec::with_capacity(size as usize);
bincode::serialize_into(&mut buf, data).map_err(|e| {
error!(target: LOG_TARGET, "Could not serialize lmdb: {:?}", e);
ChainStorageError::AccessError(e.to_string())
})?;
Ok(buf)
}

pub fn deserialize<T>(buf_bytes: &[u8]) -> Result<T, error::Error>
where T: DeserializeOwned {
bincode::deserialize(buf_bytes)
.map_err(|e| {
error!(target: LOG_TARGET, "Could not deserialize lmdb: {:?}", e);
e
})
.map_err(|e| error::Error::ValRejected(e.to_string()))
}
104 changes: 104 additions & 0 deletions base_layer/core/src/chain_storage/lmdb_db/key_prefix_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::chain_storage::{lmdb_db::helpers::deserialize, ChainStorageError};
use lmdb_zero::{ConstAccessor, Cursor, LmdbResultExt};
use serde::de::DeserializeOwned;
use std::marker::PhantomData;

pub struct KeyPrefixCursor<'a, V> {
cursor: Cursor<'a, 'a>,
value_type: PhantomData<V>,
prefix_key: &'a [u8],
access: ConstAccessor<'a>,
has_seeked: bool,
}

impl<'a, V> KeyPrefixCursor<'a, V>
where V: DeserializeOwned
{
pub(super) fn new(cursor: Cursor<'a, 'a>, access: ConstAccessor<'a>, prefix_key: &'a [u8]) -> Self {
Self {
cursor,
access,
prefix_key,
value_type: PhantomData,
has_seeked: false,
}
}

/// Returns the item on or after the key prefix, progressing forwards until the key prefix no longer matches
pub fn next(&mut self) -> Result<Option<(Vec<u8>, V)>, ChainStorageError> {
if !self.has_seeked {
if let Some((k, val)) = self.seek_gte(self.prefix_key)? {
return Ok(Some((k, val)));
}
}

match self.cursor.next(&self.access).to_opt()? {
Some((k, v)) => Self::deserialize_if_matches(self.prefix_key, k, v),
None => Ok(None),
}
}

/// Returns the item on or before the given seek key, progressing backwards until the key prefix no longer matches
pub fn prev(&mut self) -> Result<Option<(Vec<u8>, V)>, ChainStorageError> {
if !self.has_seeked {
let prefix_key = self.prefix_key;
if let Some((k, val)) = self.seek_gte(prefix_key)? {
// seek_range_k returns the greater key, so we only want to return the current value that was seeked to
// if it exactly matches the prefix_key
if k == prefix_key {
return Ok(Some((k, val)));
}
}
}

match self.cursor.prev(&self.access).to_opt()? {
Some((k, v)) => Self::deserialize_if_matches(self.prefix_key, k, v),
None => Ok(None),
}
}

pub fn seek_gte(&mut self, key: &[u8]) -> Result<Option<(Vec<u8>, V)>, ChainStorageError> {
self.has_seeked = true;
let seek_result = self.cursor.seek_range_k(&self.access, key).to_opt()?;
let (k, v) = match seek_result {
Some(r) => r,
None => return Ok(None),
};
Self::deserialize_if_matches(key, k, v)
}

fn deserialize_if_matches(
key_prefix: &[u8],
k: &[u8],
v: &[u8],
) -> Result<Option<(Vec<u8>, V)>, ChainStorageError> {
let prefix_len = key_prefix.len();
if k.len() < prefix_len || k[..prefix_len] != *key_prefix {
return Ok(None);
}
let val = deserialize::<V>(v)?;
Ok(Some((k.to_vec(), val)))
}
}
109 changes: 30 additions & 79 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::chain_storage::{error::ChainStorageError, OrNotFound};
use crate::chain_storage::{
error::ChainStorageError,
lmdb_db::{
helpers::{deserialize, serialize},
key_prefix_cursor::KeyPrefixCursor,
},
OrNotFound,
};
use lmdb_zero::{
del,
error::{self, LmdbResultExt},
Expand All @@ -37,34 +44,10 @@ use lmdb_zero::{
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
use tari_crypto::tari_utilities::hex::{to_hex, Hex};
use tari_crypto::tari_utilities::hex::to_hex;

pub const LOG_TARGET: &str = "c::cs::lmdb_db::lmdb";

// TODO: Calling `access` for every lmdb operation has some overhead (an atomic read and set). Check if is possible to
// pass an Accessor instead of the WriteTransaction?

pub fn serialize<T>(data: &T) -> Result<Vec<u8>, ChainStorageError>
where T: Serialize {
let size = bincode::serialized_size(&data).map_err(|e| ChainStorageError::AccessError(e.to_string()))?;
let mut buf = Vec::with_capacity(size as usize);
bincode::serialize_into(&mut buf, data).map_err(|e| {
error!(target: LOG_TARGET, "Could not serialize lmdb: {:?}", e);
ChainStorageError::AccessError(e.to_string())
})?;
Ok(buf)
}

pub fn deserialize<T>(buf_bytes: &[u8]) -> Result<T, error::Error>
where T: DeserializeOwned {
bincode::deserialize(buf_bytes)
.map_err(|e| {
error!(target: LOG_TARGET, "Could not deserialize lmdb: {:?}", e);
e
})
.map_err(|e| error::Error::ValRejected(e.to_string()))
}

pub fn lmdb_insert<K, V>(
txn: &WriteTransaction<'_>,
db: &Database,
Expand Down Expand Up @@ -128,7 +111,7 @@ where
})
}

/// Inserts or replaces the item at the given key
/// Inserts or replaces the item at the given key. If the key does not exist, a new entry is created
pub fn lmdb_replace<K, V>(txn: &WriteTransaction<'_>, db: &Database, key: &K, val: &V) -> Result<(), ChainStorageError>
where
K: AsLmdbBytes + ?Sized,
Expand Down Expand Up @@ -182,7 +165,7 @@ where
pub fn lmdb_delete_keys_starting_with<V>(
txn: &WriteTransaction<'_>,
db: &Database,
key: &str,
key: &[u8],
) -> Result<Vec<V>, ChainStorageError>
where
V: DeserializeOwned,
Expand All @@ -193,22 +176,20 @@ where
ChainStorageError::AccessError(e.to_string())
})?;

debug!(target: LOG_TARGET, "Deleting rows matching pattern: {}", key);

let mut row = match cursor.seek_range_k(&access, key) {
Ok(r) => r,
Err(_) => return Ok(vec![]),
};
trace!(target: LOG_TARGET, "Key: {}", row.0);
trace!(target: LOG_TARGET, "Key: {}", to_hex(row.0));
let mut result = vec![];
while row.0.starts_with(key) {
while row.0[..key.len()] == *key {
let val = deserialize::<V>(row.1)?;
result.push(val);
cursor.del(&mut access, del::NODUPDATA)?;
row = match cursor.next(&access) {
Ok(r) => r,
Err(_) => break,
}
row = match cursor.next(&access).to_opt()? {
Some(r) => r,
None => break,
};
}
Ok(result)
}
Expand Down Expand Up @@ -308,67 +289,37 @@ pub fn lmdb_len(txn: &ConstTransaction<'_>, db: &Database) -> Result<usize, Chai
Ok(stats.entries)
}

pub fn lmdb_fetch_keys_starting_with<V>(
key: &str,
txn: &ConstTransaction<'_>,
db: &Database,
) -> Result<Vec<V>, ChainStorageError>
/// Return a cursor that iterates, either backwards or forwards through keys matching the given prefix
pub fn lmdb_get_prefix_cursor<'a, V>(
txn: &'a ConstTransaction<'a>,
db: &'a Database,
prefix_key: &'a [u8],
) -> Result<KeyPrefixCursor<'a, V>, ChainStorageError>
where
V: DeserializeOwned,
{
let access = txn.access();
let mut cursor = txn.cursor(db).map_err(|e| {

let cursor = txn.cursor(&*db).map_err(|e| {
error!(target: LOG_TARGET, "Could not get read cursor from lmdb: {:?}", e);
ChainStorageError::AccessError(e.to_string())
})?;

trace!(target: LOG_TARGET, "Getting rows matching pattern: {}", key);

let mut row = match cursor.seek_range_k(&access, key) {
Ok(r) => r,
Err(_) => return Ok(vec![]),
};
trace!(target: LOG_TARGET, "Key: {}", row.0);
let mut result = vec![];
while row.0.starts_with(key) {
let val = deserialize::<V>(row.1)?;
result.push(val);
row = match cursor.next(&access) {
Ok(r) => r,
Err(_) => break,
}
}
Ok(result)
Ok(KeyPrefixCursor::new(cursor, access, prefix_key))
}

pub fn lmdb_fetch_keys_starting_with_bytes<V>(
key: &[u8],
pub fn lmdb_fetch_matching_after<V>(
txn: &ConstTransaction<'_>,
db: &Database,
key_prefix: &[u8],
) -> Result<Vec<V>, ChainStorageError>
where
V: DeserializeOwned,
{
let access = txn.access();
let mut cursor = txn.cursor(db).map_err(|e| {
error!(target: LOG_TARGET, "Could not get read cursor from lmdb: {:?}", e);
ChainStorageError::AccessError(e.to_string())
})?;

let mut row = match cursor.seek_range_k(&access, key) {
Ok(r) => r,
Err(_) => return Ok(vec![]),
};
debug!(target: LOG_TARGET, "Row found: {}", row.0.to_vec().to_hex());
let mut cursor = lmdb_get_prefix_cursor(txn, db, key_prefix)?;
let mut result = vec![];
while &row.0[0..key.len()] == key {
let val = deserialize::<V>(row.1)?;
while let Some((_, val)) = cursor.next()? {
result.push(val);
row = match cursor.next(&access) {
Ok(r) => r,
Err(_) => break,
};
debug!(target: LOG_TARGET, "Row found: {}", row.0.to_vec().to_hex());
}
Ok(result)
}
Expand Down
Loading

0 comments on commit 70df667

Please sign in to comment.