Skip to content

Commit

Permalink
Add a few counters for BlockSTM size
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Mar 19, 2024
1 parent 660c701 commit b9bf67b
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 15 deletions.
11 changes: 11 additions & 0 deletions aptos-move/aptos-aggregator/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ impl DelayedFieldValue {
},
})
}

/// Approximate memory consumption of current DelayedFieldValue
pub fn get_approximate_memory_size(&self) -> usize {
// 32 + len
std::mem::size_of::<DelayedFieldValue>()
+ match &self {
DelayedFieldValue::Aggregator(_) | DelayedFieldValue::Snapshot(_) => 0,
// additional allocated memory for the data:
DelayedFieldValue::Derived(v) => v.len(),
}
}
}

impl TryFromMoveValue for DelayedFieldValue {
Expand Down
50 changes: 48 additions & 2 deletions aptos-move/block-executor/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use aptos_metrics_core::{
exponential_buckets, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, Histogram, HistogramVec, IntCounter, IntCounterVec,
exponential_buckets, register_avg_counter_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_vec, Histogram, HistogramVec, IntCounter,
IntCounterVec,
};
use aptos_mvhashmap::BlockStateStats;
use aptos_types::fee_statement::FeeStatement;
use once_cell::sync::Lazy;

Expand Down Expand Up @@ -211,6 +213,22 @@ pub static BLOCK_COMMITTED_TXNS: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});

pub static BLOCK_VIEW_DISTINCT_KEYS: Lazy<HistogramVec> = Lazy::new(|| {
register_avg_counter_vec(
"aptos_execution_block_view_distinct_keys",
"Size (number of keys) ",
&["mode", "object_type"],
)
});

pub static BLOCK_VIEW_BASE_VALUES_MEMORY_USAGE: Lazy<HistogramVec> = Lazy::new(|| {
register_avg_counter_vec(
"aptos_execution_block_view_base_values_memory_usage",
"Memory usage (in bytes) for base values",
&["mode", "object_type"],
)
});

fn observe_gas(counter: &Lazy<HistogramVec>, mode_str: &str, fee_statement: &FeeStatement) {
counter
.with_label_values(&[mode_str, GasType::TOTAL_GAS])
Expand Down Expand Up @@ -275,3 +293,31 @@ pub(crate) fn update_txn_gas_counters(txn_fee_statements: &Vec<FeeStatement>, is
observe_gas(&TXN_GAS, mode_str, fee_statement);
}
}

pub(crate) fn update_state_counters(block_state_stats: BlockStateStats, is_parallel: bool) {
let mode_str = if is_parallel {
Mode::PARALLEL
} else {
Mode::SEQUENTIAL
};

BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "resource"])
.observe(block_state_stats.num_resources as f64);
BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "resource_group"])
.observe(block_state_stats.num_resource_groups as f64);
BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "delayed_field"])
.observe(block_state_stats.num_delayed_fields as f64);
BLOCK_VIEW_DISTINCT_KEYS
.with_label_values(&[mode_str, "module"])
.observe(block_state_stats.num_modules as f64);

BLOCK_VIEW_BASE_VALUES_MEMORY_USAGE
.with_label_values(&[mode_str, "resource"])
.observe(block_state_stats.base_resources_size as f64);
BLOCK_VIEW_BASE_VALUES_MEMORY_USAGE
.with_label_values(&[mode_str, "delayed_field"])
.observe(block_state_stats.base_delayed_fields_size as f64);
}
5 changes: 5 additions & 0 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,9 @@ where
}
});
drop(timer);

counters::update_state_counters(versioned_cache.stats(), true);

// Explicit async drops.
DEFAULT_DROPPER.schedule_drop((last_input_output, scheduler, versioned_cache));

Expand Down Expand Up @@ -1272,6 +1275,8 @@ where

ret.resize_with(num_txns, E::Output::skip_output);

counters::update_state_counters(unsync_map.stats(), false);

// TODO add block end info to output.
// block_limit_processor.is_block_limit_reached();

Expand Down
2 changes: 1 addition & 1 deletion aptos-move/block-executor/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ impl<'a, T: Transaction, X: Executable> SequentialState<'a, T, X> {
}

pub(crate) fn set_delayed_field_value(&self, id: T::Identifier, base_value: DelayedFieldValue) {
self.unsync_map.write_delayed_field(id, base_value)
self.unsync_map.set_base_delayed_field(id, base_value)
}

pub(crate) fn read_delayed_field(&self, id: T::Identifier) -> Option<DelayedFieldValue> {
Expand Down
21 changes: 21 additions & 0 deletions aptos-move/mvhashmap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ impl<
}
}

pub fn stats(&self) -> BlockStateStats {
BlockStateStats {
num_resources: self.data.num_keys(),
num_resource_groups: self.group_data.num_keys(),
num_delayed_fields: self.delayed_fields.num_keys(),
num_modules: self.modules.num_keys(),
base_resources_size: self.data.total_base_value_size(),
base_delayed_fields_size: self.delayed_fields.total_base_value_size(),
}
}

/// Contains 'simple' versioned data (nothing contained in groups).
pub fn data(&self) -> &VersionedData<K, V> {
&self.data
Expand Down Expand Up @@ -92,3 +103,13 @@ impl<
Self::new()
}
}

pub struct BlockStateStats {
pub num_resources: usize,
pub num_resource_groups: usize,
pub num_delayed_fields: usize,
pub num_modules: usize,

pub base_resources_size: u64,
pub base_delayed_fields_size: u64,
}
44 changes: 42 additions & 2 deletions aptos-move/mvhashmap/src/unsync_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::{
types::{GroupReadResult, MVModulesOutput, UnsyncGroupError, ValueWithLayout},
utils::module_hash,
BlockStateStats,
};
use aptos_aggregator::types::{code_invariant_error, DelayedFieldValue};
use aptos_crypto::hash::HashValue;
Expand All @@ -16,7 +17,16 @@ use aptos_vm_types::resource_group_adapter::group_size_as_sum;
use move_binary_format::errors::PartialVMResult;
use move_core_types::value::MoveTypeLayout;
use serde::Serialize;
use std::{cell::RefCell, collections::HashMap, fmt::Debug, hash::Hash, sync::Arc};
use std::{
cell::RefCell,
collections::HashMap,
fmt::Debug,
hash::Hash,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

/// UnsyncMap is designed to mimic the functionality of MVHashMap for sequential execution.
/// In this case only the latest recorded version is relevant, simplifying the implementation.
Expand All @@ -38,6 +48,9 @@ pub struct UnsyncMap<
executable_cache: RefCell<HashMap<HashValue, Arc<X>>>,
executable_bytes: RefCell<usize>,
delayed_field_map: RefCell<HashMap<I, DelayedFieldValue>>,

total_base_resource_size: AtomicU64,
total_base_delayed_field_size: AtomicU64,
}

impl<
Expand All @@ -56,6 +69,8 @@ impl<
executable_cache: RefCell::new(HashMap::new()),
executable_bytes: RefCell::new(0),
delayed_field_map: RefCell::new(HashMap::new()),
total_base_resource_size: AtomicU64::new(0),
total_base_delayed_field_size: AtomicU64::new(0),
}
}
}
Expand All @@ -72,6 +87,17 @@ impl<
Self::default()
}

pub fn stats(&self) -> BlockStateStats {
BlockStateStats {
num_resources: self.resource_map.borrow().len(),
num_resource_groups: self.group_cache.borrow().len(),
num_delayed_fields: self.delayed_field_map.borrow().len(),
num_modules: self.module_map.borrow().len(),
base_resources_size: self.total_base_resource_size.load(Ordering::Relaxed),
base_delayed_fields_size: self.total_base_delayed_field_size.load(Ordering::Relaxed),
}
}

pub fn set_group_base_values(
&self,
group_key: K,
Expand Down Expand Up @@ -245,7 +271,13 @@ impl<
}

pub fn set_base_value(&self, key: K, value: ValueWithLayout<V>) {
self.resource_map.borrow_mut().insert(key, value);
let cur_size = value.bytes_len();
if self.resource_map.borrow_mut().insert(key, value).is_none() {
if let Some(cur_size) = cur_size {
self.total_base_resource_size
.fetch_add(cur_size as u64, Ordering::Relaxed);
}
}
}

/// We return false if the executable was already stored, as this isn't supposed to happen
Expand Down Expand Up @@ -274,6 +306,14 @@ impl<
pub fn write_delayed_field(&self, id: I, value: DelayedFieldValue) {
self.delayed_field_map.borrow_mut().insert(id, value);
}

pub fn set_base_delayed_field(&self, id: I, value: DelayedFieldValue) {
self.total_base_delayed_field_size.fetch_add(
value.get_approximate_memory_size() as u64,
Ordering::Relaxed,
);
self.delayed_field_map.borrow_mut().insert(id, value);
}
}

#[cfg(test)]
Expand Down
19 changes: 18 additions & 1 deletion aptos-move/mvhashmap/src/versioned_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::{
collections::btree_map::{self, BTreeMap},
fmt::Debug,
hash::Hash,
sync::Arc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

/// Every entry in shared multi-version data-structure has an "estimate" flag
Expand Down Expand Up @@ -52,6 +55,7 @@ struct VersionedValue<V> {
/// Maps each key (access path) to an internal versioned value representation.
pub struct VersionedData<K, V> {
values: DashMap<K, VersionedValue<V>>,
total_base_value_size: AtomicU64,
}

impl<V> Entry<V> {
Expand Down Expand Up @@ -213,9 +217,18 @@ impl<K: Hash + Clone + Debug + Eq, V: TransactionWrite> VersionedData<K, V> {
pub(crate) fn new() -> Self {
Self {
values: DashMap::new(),
total_base_value_size: AtomicU64::new(0),
}
}

pub(crate) fn num_keys(&self) -> usize {
self.values.len()
}

pub(crate) fn total_base_value_size(&self) -> u64 {
self.total_base_value_size.load(Ordering::Relaxed)
}

pub fn add_delta(&self, key: K, txn_idx: TxnIndex, delta: DeltaOp) {
let mut v = self.values.entry(key).or_default();
v.versioned_map.insert(
Expand Down Expand Up @@ -278,6 +291,10 @@ impl<K: Hash + Clone + Debug + Eq, V: TransactionWrite> VersionedData<K, V> {
use ValueWithLayout::*;
match v.versioned_map.entry(ShiftedTxnIndex::zero_idx()) {
Vacant(v) => {
if let Some(base_size) = value.bytes_len() {
self.total_base_value_size
.fetch_add(base_size as u64, Ordering::Relaxed);
}
v.insert(CachePadded::new(Entry::new_write_from(0, value)));
},
Occupied(mut o) => {
Expand Down
27 changes: 21 additions & 6 deletions aptos-move/mvhashmap/src/versioned_delayed_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
fmt::Debug,
hash::Hash,
iter::DoubleEndedIterator,
sync::atomic::Ordering,
sync::atomic::{AtomicU64, Ordering},
};

pub enum CommitError {
Expand Down Expand Up @@ -378,7 +378,9 @@ pub struct VersionedDelayedFields<K: Clone> {

/// No deltas are allowed below next_idx_to_commit version, as all deltas (and snapshots)
/// must be materialized and converted to Values during commit.
next_idx_to_commit: AtomicTxnIndex,
next_idx_to_commit: CachePadded<AtomicTxnIndex>,

total_base_value_size: CachePadded<AtomicU64>,
}

impl<K: Eq + Hash + Clone + Debug + Copy> VersionedDelayedFields<K> {
Expand All @@ -388,20 +390,33 @@ impl<K: Eq + Hash + Clone + Debug + Copy> VersionedDelayedFields<K> {
pub(crate) fn new() -> Self {
Self {
values: DashMap::new(),
next_idx_to_commit: AtomicTxnIndex::new(0),
next_idx_to_commit: CachePadded::new(AtomicTxnIndex::new(0)),
total_base_value_size: CachePadded::new(AtomicU64::new(0)),
}
}

pub(crate) fn num_keys(&self) -> usize {
self.values.len()
}

pub(crate) fn total_base_value_size(&self) -> u64 {
self.total_base_value_size.load(Ordering::Relaxed)
}

/// Must be called when an delayed field from storage is resolved, with ID replacing the
/// base value. This ensures that VersionedValue exists for the delayed field before any
/// other uses (adding deltas, etc).
///
/// Setting base value multiple times, even concurrently, is okay for the same ID,
/// because the corresponding value prior to the block is fixed.
pub fn set_base_value(&self, id: K, base_value: DelayedFieldValue) {
self.values
.entry(id)
.or_insert(VersionedValue::new(Some(base_value)));
self.values.entry(id).or_insert_with(|| {
self.total_base_value_size.fetch_add(
base_value.get_approximate_memory_size() as u64,
Ordering::Relaxed,
);
VersionedValue::new(Some(base_value))
});
}

/// Must be called when an delayed field creation with a given ID and initial value is
Expand Down
4 changes: 4 additions & 0 deletions aptos-move/mvhashmap/src/versioned_group_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ impl<
}
}

pub(crate) fn num_keys(&self) -> usize {
self.group_values.len()
}

pub fn set_raw_base_values(&self, key: K, base_values: impl IntoIterator<Item = (T, V)>) {
// Incarnation is irrelevant for storage version, set to 0.
self.group_values
Expand Down
4 changes: 4 additions & 0 deletions aptos-move/mvhashmap/src/versioned_modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl<K: Hash + Clone + Eq, V: TransactionWrite, X: Executable> VersionedModules<
}
}

pub(crate) fn num_keys(&self) -> usize {
self.values.len()
}

/// Mark an entry from transaction 'txn_idx' at access path 'key' as an estimated write
/// (for future incarnation). Will panic if the entry is not in the data-structure.
pub fn mark_estimate(&self, key: &K, txn_idx: TxnIndex) {
Expand Down
15 changes: 13 additions & 2 deletions crates/aptos-metrics-core/src/avg_counter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
// Copyright © Aptos Foundation

use prometheus::{register_histogram, Histogram};
use prometheus::{register_histogram, register_histogram_vec, Histogram, HistogramVec};

// use histogram, instead of pair of sum/count counters, to guarantee
// atomicity of observing and fetching (which Histogram handles correctly)
pub fn register_avg_counter(name: &str, desc: &str) -> Histogram {
register_histogram!(
name,
desc,
// We need to have at least one bucket in histogram, otherwise default buckets are used
// We need to have at least one bucket in histogram, otherwise default buckets are used.
vec![0.5],
)
.unwrap()
}

pub fn register_avg_counter_vec(name: &str, desc: &str, labels: &[&str]) -> HistogramVec {
register_histogram_vec!(
name,
desc,
labels,
// We need to have at least one bucket in histogram, otherwise default buckets are used.
vec![0.5],
)
.unwrap()
Expand Down
Loading

0 comments on commit b9bf67b

Please sign in to comment.