Skip to content

Commit

Permalink
Allow using skip_index_and_usage on state sync code path. (aptos-labs…
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 authored and Zekun Wang committed Oct 13, 2023
1 parent 80ac3b2 commit 378e1b1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 28 deletions.
2 changes: 1 addition & 1 deletion storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2119,7 +2119,7 @@ impl DbWriter for AptosDB {
first_version,
latest_in_memory_state.current.usage(),
None,
/*skip_index_and_usage=*/ false,
self.skip_index_and_usage,
)?;

{
Expand Down
59 changes: 32 additions & 27 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,20 @@ impl DbReader for StateDb {
}

fn get_state_storage_usage(&self, version: Option<Version>) -> Result<StateStorageUsage> {
if self.skip_usage {
return Ok(StateStorageUsage::new_untracked());
}
version.map_or(Ok(StateStorageUsage::zero()), |version| {
Ok(self
.ledger_db
.metadata_db()
.get::<VersionDataSchema>(&version)?
.ok_or_else(|| AptosDbError::NotFound(format!("VersionData at {}", version)))?
.get_state_storage_usage())
Ok(
match self
.ledger_db
.metadata_db()
.get::<VersionDataSchema>(&version)?
{
Some(data) => data.get_state_storage_usage(),
None => {
ensure!(self.skip_usage, "VersionData at {version} is missing.");
StateStorageUsage::new_untracked()
},
},
)
})
}
}
Expand Down Expand Up @@ -887,26 +891,28 @@ impl StateStore {
})
.collect();

if !skip_usage {
for i in 0..num_versions {
let mut items_delta = 0;
let mut bytes_delta = 0;
for usage_delta in usage_deltas.iter() {
items_delta += usage_delta[i].0;
bytes_delta += usage_delta[i].1;
}
usage = StateStorageUsage::new(
(usage.items() as i64 + items_delta) as usize,
(usage.bytes() as i64 + bytes_delta) as usize,
);
for i in 0..num_versions {
let mut items_delta = 0;
let mut bytes_delta = 0;
for usage_delta in usage_deltas.iter() {
items_delta += usage_delta[i].0;
bytes_delta += usage_delta[i].1;
}
usage = StateStorageUsage::new(
(usage.items() as i64 + items_delta) as usize,
(usage.bytes() as i64 + bytes_delta) as usize,
);
if !skip_usage || i == num_versions - 1 {
let version = first_version + i as u64;
info!("Write usage at version {version}, {usage:?}.");
batch
.put::<VersionDataSchema>(&version, &usage.into())
.unwrap();
}
}

if !expected_usage.is_untracked() {
ensure!(
if !expected_usage.is_untracked() {
ensure!(
expected_usage == usage,
"Calculated state db usage at version {} not expected. expected: {:?}, calculated: {:?}, base version: {:?}, base version usage: {:?}",
first_version + value_state_sets.len() as u64 - 1,
Expand All @@ -915,12 +921,11 @@ impl StateStore {
base_version,
base_version_usage,
);
}

STATE_ITEMS.set(usage.items() as i64);
TOTAL_STATE_BYTES.set(usage.bytes() as i64);
}

STATE_ITEMS.set(usage.items() as i64);
TOTAL_STATE_BYTES.set(usage.bytes() as i64);

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions storage/aptosdb/src/utils/truncation_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub(crate) fn get_state_merkle_commit_progress(
)
}

// TODO(grao): Not all target version is valid. e.g. If we turn on skip_index_and_usage, we can
// only truncate to a point with usage persisted (end of chunk or block).
pub(crate) fn truncate_ledger_db(ledger_db: Arc<LedgerDb>, target_version: Version) -> Result<()> {
let event_store = EventStore::new(ledger_db.event_db_arc());
let transaction_store = TransactionStore::new(Arc::clone(&ledger_db));
Expand Down

0 comments on commit 378e1b1

Please sign in to comment.