From 90717ae57886337d77563bb89afdf736d27021f0 Mon Sep 17 00:00:00 2001 From: Rati Gelashvili Date: Wed, 20 Sep 2023 15:31:13 -0400 Subject: [PATCH] [Executor] Metadata and exists support (in Block-STM/executor) --- Cargo.lock | 1 + aptos-move/aptos-vm-types/src/resolver.rs | 36 +- aptos-move/block-executor/Cargo.toml | 1 + .../block-executor/src/captured_reads.rs | 474 ++++++++++++++++++ aptos-move/block-executor/src/executor.rs | 39 +- aptos-move/block-executor/src/lib.rs | 3 +- .../src/proptest_types/types.rs | 28 +- aptos-move/block-executor/src/task.rs | 6 +- .../src/txn_last_input_output.rs | 193 ++----- aptos-move/block-executor/src/view.rs | 281 +++++++---- aptos-move/mvhashmap/src/lib.rs | 5 +- .../src/unit_tests/proptest_types.rs | 4 +- .../mvhashmap/src/versioned_group_data.rs | 60 +-- types/src/write_set.rs | 17 +- 14 files changed, 811 insertions(+), 337 deletions(-) create mode 100644 aptos-move/block-executor/src/captured_reads.rs diff --git a/Cargo.lock b/Cargo.lock index 111b8dbce5ef68..173e7457cd89d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -583,6 +583,7 @@ dependencies = [ "criterion", "crossbeam", "dashmap", + "derivative", "itertools", "move-binary-format", "move-core-types", diff --git a/aptos-move/aptos-vm-types/src/resolver.rs b/aptos-move/aptos-vm-types/src/resolver.rs index 6dbaee6152eff7..dcc44eaa1853b5 100644 --- a/aptos-move/aptos-vm-types/src/resolver.rs +++ b/aptos-move/aptos-vm-types/src/resolver.rs @@ -22,31 +22,31 @@ pub trait TResourceView { /// - Err(...) otherwise (e.g. storage error). fn get_resource_state_value( &self, - key: &Self::Key, + state_key: &Self::Key, maybe_layout: Option<&Self::Layout>, ) -> anyhow::Result>; fn get_resource_bytes( &self, - key: &Self::Key, + state_key: &Self::Key, maybe_layout: Option<&Self::Layout>, ) -> anyhow::Result> { - let maybe_state_value = self.get_resource_state_value(key, maybe_layout)?; + let maybe_state_value = self.get_resource_state_value(state_key, maybe_layout)?; Ok(maybe_state_value.map(|state_value| state_value.bytes().clone())) } fn get_resource_state_value_metadata( &self, - key: &Self::Key, + state_key: &Self::Key, ) -> anyhow::Result> { // For metadata, layouts are not important. - let maybe_state_value = self.get_resource_state_value(key, None)?; + let maybe_state_value = self.get_resource_state_value(state_key, None)?; Ok(maybe_state_value.map(StateValue::into_metadata)) } - fn resource_exists(&self, key: &Self::Key) -> anyhow::Result { + fn resource_exists(&self, state_key: &Self::Key) -> anyhow::Result { // For existence, layouts are not important. - self.get_resource_state_value(key, None) + self.get_resource_state_value(state_key, None) .map(|maybe_state_value| maybe_state_value.is_some()) } } @@ -57,7 +57,7 @@ pub trait TResourceGroupView { fn get_resource_from_group( &self, - _key: &Self::Key, + _state_key: &Self::Key, _resource_tag: &Self::Tag, ) -> anyhow::Result> { unimplemented!("TResourceGroupView not yet implemented"); @@ -73,7 +73,7 @@ pub trait TResourceGroupView { unimplemented!("TResourceGroupView not yet implemented"); } - fn resource_group_exists(&self, _key: &Self::Key) -> anyhow::Result { + fn resource_group_exists(&self, _state_key: &Self::Key) -> anyhow::Result { unimplemented!("TResourceGroupView not yet implemented"); } @@ -90,7 +90,7 @@ pub trait TResourceGroupView { /// the parallel execution setting, as a wrong value will be (later) caught by validation. /// Thus, R/W conflicts are avoided, as long as the estimates are correct (e.g. updating /// struct members of a fixed size). - fn resource_group_size(&self, _key: &Self::Key) -> anyhow::Result { + fn resource_group_size(&self, _state_key: &Self::Key) -> anyhow::Result { unimplemented!("TResourceGroupView not yet implemented"); } @@ -106,7 +106,7 @@ pub trait TResourceGroupView { /// which in the context of parallel execution does not cause a full R/W conflict. fn resource_exists_in_group( &self, - _key: &Self::Key, + _state_key: &Self::Key, _resource_tag: &Self::Tag, ) -> anyhow::Result { unimplemented!("TResourceGroupView not yet implemented"); @@ -121,23 +121,23 @@ pub trait TModuleView { /// - Ok(None) if the module is not in storage, /// - Ok(Some(...)) if the module exists in storage, /// - Err(...) otherwise (e.g. storage error). - fn get_module_state_value(&self, key: &Self::Key) -> anyhow::Result>; + fn get_module_state_value(&self, state_key: &Self::Key) -> anyhow::Result>; - fn get_module_bytes(&self, key: &Self::Key) -> anyhow::Result> { - let maybe_state_value = self.get_module_state_value(key)?; + fn get_module_bytes(&self, state_key: &Self::Key) -> anyhow::Result> { + let maybe_state_value = self.get_module_state_value(state_key)?; Ok(maybe_state_value.map(|state_value| state_value.bytes().clone())) } fn get_module_state_value_metadata( &self, - key: &Self::Key, + state_key: &Self::Key, ) -> anyhow::Result> { - let maybe_state_value = self.get_module_state_value(key)?; + let maybe_state_value = self.get_module_state_value(state_key)?; Ok(maybe_state_value.map(StateValue::into_metadata)) } - fn module_exists(&self, key: &Self::Key) -> anyhow::Result { - self.get_module_state_value(key) + fn module_exists(&self, state_key: &Self::Key) -> anyhow::Result { + self.get_module_state_value(state_key) .map(|maybe_state_value| maybe_state_value.is_some()) } } diff --git a/aptos-move/block-executor/Cargo.toml b/aptos-move/block-executor/Cargo.toml index 5b292ab8566302..d87ac8e34e749a 100644 --- a/aptos-move/block-executor/Cargo.toml +++ b/aptos-move/block-executor/Cargo.toml @@ -30,6 +30,7 @@ claims = { workspace = true } criterion = { workspace = true, optional = true } crossbeam = { workspace = true } dashmap = { workspace = true } +derivative = { workspace = true } move-binary-format = { workspace = true } move-core-types = { workspace = true } num_cpus = { workspace = true } diff --git a/aptos-move/block-executor/src/captured_reads.rs b/aptos-move/block-executor/src/captured_reads.rs new file mode 100644 index 00000000000000..1dad22b62a6700 --- /dev/null +++ b/aptos-move/block-executor/src/captured_reads.rs @@ -0,0 +1,474 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::task::Transaction; +use anyhow::bail; +use aptos_mvhashmap::{ + types::{MVDataError, MVDataOutput, TxnIndex, Version}, + versioned_data::VersionedData, + versioned_group_data::VersionedGroupData, +}; +use aptos_types::{state_store::state_value::StateValueMetadataKind, write_set::TransactionWrite}; +use derivative::Derivative; +use std::{ + collections::{ + hash_map::{ + Entry, + Entry::{Occupied, Vacant}, + }, + HashMap, + }, + sync::Arc, +}; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) enum ReadKind { + Exists, + Metadata, + Value, +} + +/// The enum captures the state that the transaction execution extracted from +/// a read callback to block executor, in order to be validated by Block-STM. +/// The captured state is fine-grained, e.g. it distinguishes between reading +/// a full value, which is compared conservatively based on the version (i.e. +/// if value got replaced by same value upon re-execution, validation fails). +/// Other kinds of reads can access metadata information only, or check whether +/// data exists at a given key. +#[derive(Debug, Derivative)] +#[derivative(Clone(bound = ""), PartialEq(bound = ""))] +pub(crate) enum DataRead { + // Version supercedes V comparison. + Versioned(Version, #[derivative(PartialEq = "ignore")] Arc), + Metadata(Option), + Exists(bool), + /// Read resolved an aggregatorV1 delta to a value. TODO: deprecate. + Resolved(u128), +} + +// Represents the result of comparing DataReads ('self' and 'other'). +#[derive(Debug)] +enum DataReadComparison { + // Information in 'self' DataRead contains information about the kind of the + // 'other' DataRead, and is consistent with 'other'. + Contains, + // Information in 'self' DataRead contains information about the kind of the + // 'other' DataRead, but is inconsistent with 'other'. + Inconsistent, + // All information about the kind of 'other' is not contained in 'self' kind. + // For example, exists does not provide enough information about metadata. + Insufficient, +} + +impl DataRead { + // Assigns highest rank to Versioned / Resolved, then Metadata, then Exists. + // (e.g. versioned read implies metadata and existence information, and + // metadata information implies existence information). + fn get_kind(&self) -> ReadKind { + use DataRead::*; + match self { + Versioned(_, _) | Resolved(_) => ReadKind::Value, + Metadata(_) => ReadKind::Metadata, + Exists(_) => ReadKind::Exists, + } + } + + // A convenience method, since the same key can be read in different modes, producing + // different DataRead / ReadKinds. Returns true if self has >= kind than other, i.e. + // contains more or equal information, and is consistent with the information in other. + fn contains(&self, other: &DataRead) -> DataReadComparison { + let self_kind = self.get_kind(); + let other_kind = other.get_kind(); + + if self_kind < other_kind { + DataReadComparison::Insufficient + } else { + let downcast_eq = if self_kind == other_kind { + // Optimization to avoid unnecessary clones (e.g. during validation). + self == other + } else { + self.as_kind(other_kind) + .expect("Downcast to lower kind must succeed") + == *other + }; + + if downcast_eq { + DataReadComparison::Contains + } else { + DataReadComparison::Inconsistent + } + } + } + + /// If the reads contains sufficient information, extract this information and generate + /// a new DataRead of the desired kind (e.g. Metadata kind from Value). + pub(crate) fn as_kind(&self, kind: ReadKind) -> Option> { + let self_kind = self.get_kind(); + if self_kind == kind { + return Some(self.clone()); + } + + (self_kind > kind).then(|| match (self, &kind) { + (DataRead::Versioned(_, v), ReadKind::Metadata) => { + DataRead::Metadata(v.as_state_value_metadata()) + }, + (DataRead::Versioned(_, v), ReadKind::Exists) => DataRead::Exists(!v.is_deletion()), + (DataRead::Resolved(_), ReadKind::Metadata) => DataRead::Metadata(Some(None)), + (DataRead::Resolved(_), ReadKind::Exists) => DataRead::Exists(true), + (DataRead::Metadata(maybe_metadata), ReadKind::Exists) => { + DataRead::Exists(maybe_metadata.is_some()) + }, + (_, _) => unreachable!("{:?}, {:?} must be covered", self_kind, kind), + }) + } +} + +/// Given a hashmap entry for a key, incorporate a new DataRead. This checks +/// consistency and ensures that the most comprehensive read is recorded. +fn update_entry( + entry: Entry>, + read: DataRead, +) -> anyhow::Result<()> { + match entry { + Vacant(e) => { + e.insert(read); + }, + Occupied(mut e) => { + let existing_read = e.get_mut(); + match read.contains(existing_read) { + DataReadComparison::Contains => { + *existing_read = read; + }, + DataReadComparison::Inconsistent => { + bail!("Inconsistent read results (due to speculation)") + }, + DataReadComparison::Insufficient => { + // New read is of a lower kind than existing read, but while + // reading it must have been derived from the existing read. + debug_assert!( + matches!(existing_read.contains(&read), DataReadComparison::Contains), + "New read information must be contained in the existing read" + ); + }, + } + }, + } + Ok(()) +} + +/// Additional state regarding resource groups that may be provided to the VM +/// during transaction execution. This includes Data +#[derive(Derivative)] +#[derivative(Default(bound = ""))] +struct GroupRead { + /// The size of the resource group can be read (used for gas charging). + speculative_size: Option, + /// Reads to individual resources in the group, keyed by a tag. + inner_reads: HashMap>, +} + +/// Serves as a "read-set" of a transaction execution, and +#[derive(Derivative)] +#[derivative(Default(bound = "", new = "true"))] +pub(crate) struct CapturedReads { + data_reads: HashMap>, + group_reads: HashMap>, + // Currently, we record paths for triggering module R/W fallback. + // TODO: implement a general functionality once the fallback is removed. + pub(crate) module_reads: Vec, + + /// If there is a speculative failure (e.g. delta application failure, or an + /// observed inconsistency), the transaction output is irrelevant (must be + /// discarded and transaction re-executed). We have a global flag, as which + /// read observed the inconsistency is irrelevant (moreover, typically, + /// an error is returned to the VM to wrap up the ongoing execution). + speculative_failure: bool, +} + +impl CapturedReads { + pub(crate) fn validate_data_reads( + &self, + data_map: &VersionedData, + idx_to_validate: TxnIndex, + ) -> bool { + if self.speculative_failure { + return false; + } + + use MVDataError::*; + use MVDataOutput::*; + self.data_reads.iter().all(|(k, r)| { + match data_map.fetch_data(k, idx_to_validate) { + Ok(Versioned(version, v)) => { + matches!( + DataRead::Versioned(version, v).contains(r), + DataReadComparison::Contains + ) + }, + Ok(Resolved(value)) => matches!( + DataRead::Resolved(value).contains(r), + DataReadComparison::Contains + ), + // Dependency implies a validation failure, and if the original read were to + // observe an unresolved delta, it would set the aggregator base value in the + // multi-versioned data-structure, resolve, and record the resolved value. + Err(Dependency(_)) + | Err(Unresolved(_)) + | Err(DeltaApplicationFailure) + | Err(Uninitialized) => false, + } + }) + } + + pub(crate) fn validate_group_reads( + &self, + group_map: &VersionedGroupData, + idx_to_validate: TxnIndex, + ) -> bool { + if self.speculative_failure { + return false; + } + + self.group_reads.iter().all(|(key, group)| { + let mut ret = true; + if let Some(size) = group.speculative_size { + ret &= Ok(size) == group_map.get_group_size(key, idx_to_validate); + } + + ret && group.inner_reads.iter().all(|(tag, r)| { + group_map + .read_from_group(key, tag, idx_to_validate) + .is_ok_and(|(version, v)| { + matches!( + DataRead::Versioned(version, v).contains(r), + DataReadComparison::Contains + ) + }) + }) + }) + } + + // Error means there was a inconsistency in information read (must be due to the + // speculative nature of reads). + pub(crate) fn capture_read( + &mut self, + state_key: T::Key, + maybe_tag: Option, + read: DataRead, + ) -> anyhow::Result<()> { + let ret = match maybe_tag { + Some(tag) => { + let group = self.group_reads.entry(state_key).or_default(); + update_entry(group.inner_reads.entry(tag), read) + }, + None => update_entry(self.data_reads.entry(state_key), read), + }; + + if ret.is_err() { + // Record speculative failure. + self.speculative_failure = true; + } + + ret + } + + // If maybe_tag is provided, then we check the group, otherwise, normal reads. + pub(crate) fn get_by_kind( + &self, + state_key: &T::Key, + maybe_tag: Option<&T::Tag>, + kind: ReadKind, + ) -> Option> { + assert!( + kind != ReadKind::Metadata || maybe_tag.is_none(), + "May not request metadata of a group member" + ); + + match maybe_tag { + Some(tag) => self + .group_reads + .get(state_key) + .and_then(|group| group.inner_reads.get(tag).and_then(|r| r.as_kind(kind))), + None => self.data_reads.get(state_key).and_then(|r| r.as_kind(kind)), + } + } + + #[allow(dead_code)] + pub(crate) fn group_size(&self, state_key: &T::Key) -> Option { + self.group_reads + .get(state_key) + .and_then(|group| group.speculative_size) + } + + pub(crate) fn mark_failure(&mut self) { + self.speculative_failure = true; + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::proptest_types::types::ValueType; + use aptos_mvhashmap::types::StorageVersion; + use aptos_types::{ + on_chain_config::CurrentTimeMicroseconds, state_store::state_value::StateValueMetadata, + }; + use claims::{assert_gt, assert_matches, assert_none, assert_some_eq}; + + #[test] + fn data_read_kind() { + // Test the strict ordering of enum variants for the read kinds. + assert_gt!(ReadKind::Value, ReadKind::Metadata); + assert_gt!(ReadKind::Metadata, ReadKind::Exists); + + // Test that get_kind returns the proper kind for data read instances. + + assert_eq!( + DataRead::Versioned( + Err(StorageVersion), + Arc::new(ValueType::with_len_and_metadata(1, None)) + ) + .get_kind(), + ReadKind::Value + ); + assert_eq!( + DataRead::Resolved::(200).get_kind(), + ReadKind::Value + ); + assert_eq!( + DataRead::Metadata::(Some(None)).get_kind(), + ReadKind::Metadata + ); + assert_eq!( + DataRead::Metadata::(None).get_kind(), + ReadKind::Metadata + ); + assert_eq!( + DataRead::Exists::(true).get_kind(), + ReadKind::Exists + ); + assert_eq!( + DataRead::Exists::(false).get_kind(), + ReadKind::Exists + ); + } + + macro_rules! assert_inconsistent_same_kind { + ($x:expr, $y:expr) => {{ + assert_ne!($x, $y); + assert_ne!($y, $x); + assert_matches!($x.contains(&$y), DataReadComparison::Inconsistent); + assert_matches!($y.contains(&$x), DataReadComparison::Inconsistent); + }}; + } + + macro_rules! assert_inconsistent_downcast { + ($x:expr, $y:expr) => {{ + assert_ne!($x, $y); + assert_ne!($y, $x); + assert_matches!($x.contains(&$y), DataReadComparison::Inconsistent); + assert_matches!($y.contains(&$x), DataReadComparison::Insufficient); + }}; + } + + macro_rules! assert_contains { + ($x:expr, $y:expr) => {{ + assert_some_eq!($x.as_kind($y.get_kind()), $y); + assert_matches!($x.contains(&$y), DataReadComparison::Contains); + }}; + } + + macro_rules! assert_insufficient { + ($x:expr, $y:expr) => {{ + assert_none!($x.as_kind($y.get_kind())); + assert_matches!($x.contains(&$y), DataReadComparison::Insufficient); + }}; + } + + #[test] + fn as_contained_kind() { + // Legacy state values do not have metadata. + let raw_metadata = Some(StateValueMetadata::new(5, &CurrentTimeMicroseconds { + microseconds: 7, + })); + let versioned_legacy = DataRead::Versioned( + Err(StorageVersion), + Arc::new(ValueType::with_len_and_metadata(1, None)), + ); + let versioned_deletion = DataRead::Versioned( + Ok((5, 1)), + Arc::new(ValueType::with_len_and_metadata(0, None)), + ); + let versioned_with_metadata = DataRead::Versioned( + Ok((7, 0)), + Arc::new(ValueType::with_len_and_metadata(2, raw_metadata.clone())), + ); + + let resolved = DataRead::Resolved::(200); + let deletion_metadata = DataRead::Metadata(None); + let legacy_metadata = DataRead::Metadata(Some(None)); + let metadata = DataRead::Metadata(Some(raw_metadata)); + let exists = DataRead::Exists(true); + let not_exists = DataRead::Exists(false); + + // Test contains & downcast. + assert_contains!(versioned_legacy, legacy_metadata); + assert_contains!(resolved, legacy_metadata); + assert_contains!(versioned_legacy, exists); + assert_contains!(resolved, exists); + assert_contains!(legacy_metadata, exists); + // Same checks for deletion (Resolved cannot be a deletion). + assert_contains!(versioned_deletion, deletion_metadata); + assert_contains!(versioned_deletion, not_exists); + assert_contains!(deletion_metadata, not_exists); + // Same checks with real metadata. + assert_contains!(versioned_with_metadata, metadata); + assert_contains!(versioned_with_metadata, exists); + assert_contains!(metadata, exists); + + // Test upcast. + assert_insufficient!(legacy_metadata, versioned_legacy); + assert_insufficient!(deletion_metadata, versioned_legacy); + assert_insufficient!(exists, versioned_legacy); + assert_insufficient!(not_exists, versioned_legacy); + assert_insufficient!(exists, legacy_metadata); + assert_insufficient!(not_exists, legacy_metadata); + + // Test inconsistency at the same kind. + assert_inconsistent_same_kind!(exists, not_exists); + assert_inconsistent_same_kind!(deletion_metadata, legacy_metadata); + assert_inconsistent_same_kind!(deletion_metadata, metadata); + assert_inconsistent_same_kind!(legacy_metadata, metadata); + assert_inconsistent_same_kind!(versioned_legacy, versioned_with_metadata); + assert_inconsistent_same_kind!(versioned_legacy, versioned_deletion); + assert_inconsistent_same_kind!(versioned_legacy, resolved); + assert_inconsistent_same_kind!(versioned_with_metadata, versioned_deletion); + assert_inconsistent_same_kind!(versioned_with_metadata, resolved); + assert_inconsistent_same_kind!(versioned_deletion, resolved); + // Test inconsistency with downcast. + assert_inconsistent_downcast!(versioned_legacy, metadata); + assert_inconsistent_downcast!(versioned_legacy, deletion_metadata); + assert_inconsistent_downcast!(versioned_legacy, not_exists); + assert_inconsistent_downcast!(resolved, deletion_metadata); + assert_inconsistent_downcast!(resolved, metadata); + assert_inconsistent_downcast!(resolved, not_exists); + assert_inconsistent_downcast!(versioned_with_metadata, legacy_metadata); + assert_inconsistent_downcast!(versioned_with_metadata, deletion_metadata); + assert_inconsistent_downcast!(versioned_with_metadata, not_exists); + assert_inconsistent_downcast!(versioned_deletion, legacy_metadata); + assert_inconsistent_downcast!(versioned_deletion, metadata); + assert_inconsistent_downcast!(versioned_deletion, exists); + assert_inconsistent_downcast!(metadata, not_exists); + assert_inconsistent_downcast!(legacy_metadata, not_exists); + assert_inconsistent_downcast!(deletion_metadata, exists); + + // Test that V is getting ignored in the comparison. + assert_eq!( + versioned_legacy, + DataRead::Versioned( + Err(StorageVersion), + Arc::new(ValueType::with_len_and_metadata(10, None)) + ) + ); + } +} diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index f2f35d7da9ab23..e9b976fb24cef0 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -18,7 +18,7 @@ use crate::{ use aptos_aggregator::delta_change_set::serialize; use aptos_logger::{debug, info}; use aptos_mvhashmap::{ - types::{Incarnation, MVDataError, MVDataOutput, TxnIndex}, + types::{Incarnation, TxnIndex}, unsync_map::UnsyncMap, MVHashMap, }; @@ -118,7 +118,7 @@ where idx_to_execute: TxnIndex, incarnation: Incarnation, signature_verified_block: &[T], - last_input_output: &TxnLastInputOutput, + last_input_output: &TxnLastInputOutput, versioned_cache: &MVHashMap, scheduler: &Scheduler, executor: &E, @@ -215,39 +215,18 @@ where idx_to_validate: TxnIndex, incarnation: Incarnation, validation_wave: Wave, - last_input_output: &TxnLastInputOutput, + last_input_output: &TxnLastInputOutput, versioned_cache: &MVHashMap, scheduler: &Scheduler, ) -> SchedulerTask { - use MVDataError::*; - use MVDataOutput::*; - let _timer = TASK_VALIDATE_SECONDS.start_timer(); let read_set = last_input_output .read_set(idx_to_validate) .expect("[BlockSTM]: Prior read-set must be recorded"); - let valid = read_set.iter().all(|r| { - if r.is_speculative_failure() { - return false; - } - - match versioned_cache.data().fetch_data(r.path(), idx_to_validate) { - Ok(Versioned(version, _)) => r.validate_versioned(version), - Ok(Resolved(value)) => r.validate_resolved(value), - Err(Uninitialized) => { - // Can match the current behavior for modules: the path would be considered - // 'Uninitialized' for data() hashmap, as the output is stored in the modules - // MVHashMap. We validate all module reads successfully, as reading any - // module that is also published triggeres ModulePathReadWrite fallback. - r.validate_module() - }, - // Dependency implies a validation failure, and if the original read were to - // observe an unresolved delta, it would set the aggregator base value in the - // multi-versioned data-structure, resolve, and record the resolved value. - Err(Dependency(_)) | Err(Unresolved(_)) | Err(DeltaApplicationFailure) => false, - } - }); + // TODO: validate modules when there is no r/w fallback. + let valid = read_set.validate_data_reads(versioned_cache.data(), idx_to_validate) + && read_set.validate_group_reads(versioned_cache.group_data(), idx_to_validate); let aborted = !valid && scheduler.try_abort(idx_to_validate, incarnation); @@ -282,7 +261,7 @@ where post_commit_txs: &Vec>, worker_idx: &mut usize, scheduler_task: &mut SchedulerTask, - last_input_output: &TxnLastInputOutput, + last_input_output: &TxnLastInputOutput, accumulated_fee_statement: &mut FeeStatement, txn_fee_statements: &mut Vec, ) { @@ -371,7 +350,7 @@ where &self, txn_idx: TxnIndex, versioned_cache: &MVHashMap, - last_input_output: &TxnLastInputOutput, + last_input_output: &TxnLastInputOutput, base_view: &S, ) { let delta_keys = last_input_output.delta_keys(txn_idx); @@ -432,7 +411,7 @@ where &self, executor_arguments: &E::Argument, block: &[T], - last_input_output: &TxnLastInputOutput, + last_input_output: &TxnLastInputOutput, versioned_cache: &MVHashMap, scheduler: &Scheduler, // TODO: should not need to pass base view. diff --git a/aptos-move/block-executor/src/lib.rs b/aptos-move/block-executor/src/lib.rs index 321ec25d812783..c3208c61ce098f 100644 --- a/aptos-move/block-executor/src/lib.rs +++ b/aptos-move/block-executor/src/lib.rs @@ -135,7 +135,8 @@ optimistically creating validation tasks for higher transactions in 2(b), and threads that perform these tasks can already detect validation failures due to the ESTIMATE markers on memory locations, instead of waiting for a subsequent incarnation to finish. -**/ + **/ +mod captured_reads; pub mod counters; pub mod errors; pub mod executor; diff --git a/aptos-move/block-executor/src/proptest_types/types.rs b/aptos-move/block-executor/src/proptest_types/types.rs index 2fc4784bba1197..be0bc08a16b133 100644 --- a/aptos-move/block-executor/src/proptest_types/types.rs +++ b/aptos-move/block-executor/src/proptest_types/types.rs @@ -13,7 +13,10 @@ use aptos_types::{ event::EventKey, executable::ModulePath, fee_statement::FeeStatement, - state_store::{state_storage_usage::StateStorageUsage, state_value::StateValue}, + state_store::{ + state_storage_usage::StateStorageUsage, + state_value::{StateValue, StateValueMetadataKind}, + }, write_set::{TransactionWrite, WriteOp}, }; use aptos_vm_types::resolver::TExecutorView; @@ -129,6 +132,7 @@ impl ModulePath for KeyType pub(crate) struct ValueType { /// Wrapping the types used for testing to add TransactionWrite trait implementation (below). bytes: Option, + metadata: StateValueMetadataKind, } impl Arbitrary for ValueType { @@ -159,6 +163,14 @@ impl ValueType { v.resize(16, 1); v.into() }), + metadata: None, + } + } + + pub(crate) fn with_len_and_metadata(len: usize, metadata: StateValueMetadataKind) -> Self { + Self { + bytes: (len > 0).then_some(vec![100_u8; len].into()), + metadata, } } } @@ -169,13 +181,23 @@ impl TransactionWrite for ValueType { } fn from_state_value(maybe_state_value: Option) -> Self { + let (maybe_metadata, maybe_bytes) = + match maybe_state_value.map(|state_value| state_value.into()) { + Some((maybe_metadata, bytes)) => (maybe_metadata, Some(bytes)), + None => (None, None), + }; + Self { - bytes: maybe_state_value.map(|state_value| state_value.bytes().clone()), + bytes: maybe_bytes, + metadata: maybe_metadata, } } fn as_state_value(&self) -> Option { - self.extract_raw_bytes().map(StateValue::new_legacy) + self.extract_raw_bytes().map(|bytes| match &self.metadata { + Some(metadata) => StateValue::new_with_metadata(bytes, metadata.clone()), + None => StateValue::new_legacy(bytes), + }) } } diff --git a/aptos-move/block-executor/src/task.rs b/aptos-move/block-executor/src/task.rs index a5b910cccdc042..456ba80a2693e5 100644 --- a/aptos-move/block-executor/src/task.rs +++ b/aptos-move/block-executor/src/task.rs @@ -17,15 +17,15 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash}; /// The execution result of a transaction #[derive(Debug)] -pub enum ExecutionStatus { +pub enum ExecutionStatus { /// Transaction was executed successfully. - Success(T), + Success(O), /// Transaction hit a none recoverable error during execution, halt the execution and propagate /// the error back to the caller. Abort(E), /// Transaction was executed successfully, but will skip the execution of the trailing /// transactions in the list - SkipRest(T), + SkipRest(O), } /// Trait that defines a transaction type that can be executed by the block executor. A transaction diff --git a/aptos-move/block-executor/src/txn_last_input_output.rs b/aptos-move/block-executor/src/txn_last_input_output.rs index d6cc643b22fbaa..c45d116dfb99a4 100644 --- a/aptos-move/block-executor/src/txn_last_input_output.rs +++ b/aptos-move/block-executor/src/txn_last_input_output.rs @@ -2,19 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + captured_reads::CapturedReads, errors::Error, task::{ExecutionStatus, Transaction, TransactionOutput}, }; use anyhow::anyhow; -use aptos_mvhashmap::types::{TxnIndex, Version}; -use aptos_types::{ - access_path::AccessPath, executable::ModulePath, fee_statement::FeeStatement, - write_set::WriteOp, -}; +use aptos_mvhashmap::types::TxnIndex; +use aptos_types::{fee_statement::FeeStatement, write_set::WriteOp}; use arc_swap::ArcSwapOption; use crossbeam::utils::CachePadded; use dashmap::DashSet; use std::{ + collections::HashMap, fmt::Debug, iter::{empty, Iterator}, sync::{ @@ -23,119 +22,41 @@ use std::{ }, }; -type TxnInput = Vec>; +type TxnInput = CapturedReads; + // When a transaction is committed, the output delta writes must be populated by // the WriteOps corresponding to the deltas in the corresponding outputs. #[derive(Debug)] -pub(crate) struct TxnOutput { - output_status: ExecutionStatus>, +pub(crate) struct TxnOutput { + output_status: ExecutionStatus>, } -impl TxnOutput { - pub fn from_output_status(output_status: ExecutionStatus>) -> Self { +impl TxnOutput { + pub fn from_output_status(output_status: ExecutionStatus>) -> Self { Self { output_status } } - pub fn output_status(&self) -> &ExecutionStatus> { + pub fn output_status(&self) -> &ExecutionStatus> { &self.output_status } } -/// Information about the read which is used by validation. -#[derive(Debug, Clone, PartialEq)] -enum ReadKind { - /// Read returned a value from the multi-version data-structure, with index - /// and incarnation number of the execution associated with the write of - /// that entry. - Versioned(Version), - /// Read resolved a delta. - Resolved(u128), - /// Speculative inconsistency failure. - SpeculativeFailure, - /// Module read. TODO: Design a better representation once more meaningfully separated. - Module, -} - -#[derive(Debug, Clone)] -pub struct ReadDescriptor { - access_path: K, - kind: ReadKind, -} - -impl ReadDescriptor { - pub fn from_versioned(access_path: K, version: Version) -> Self { - Self { - access_path, - kind: ReadKind::Versioned(version), - } - } - - pub fn from_resolved(access_path: K, value: u128) -> Self { - Self { - access_path, - kind: ReadKind::Resolved(value), - } - } - - pub fn from_module(access_path: K) -> Self { - Self { - access_path, - kind: ReadKind::Module, - } - } - - pub fn from_speculative_failure(access_path: K) -> Self { - Self { - access_path, - kind: ReadKind::SpeculativeFailure, - } - } - - fn module_path(&self) -> Option { - self.access_path.module_path() - } - - pub fn path(&self) -> &K { - &self.access_path - } - - // Does the read descriptor describe a read from MVHashMap w. a specified version. - pub fn validate_versioned(&self, version: Version) -> bool { - self.kind == ReadKind::Versioned(version) - } - - // Does the read descriptor describe a read from MVHashMap w. a resolved delta. - pub fn validate_resolved(&self, value: u128) -> bool { - self.kind == ReadKind::Resolved(value) - } - - // Does the read descriptor describe a read from MVHashMap w. a resolved delta. - pub fn validate_module(&self) -> bool { - self.kind == ReadKind::Module - } - - // Does the read descriptor describe to a read with a delta application failure. - pub fn is_speculative_failure(&self) -> bool { - self.kind == ReadKind::SpeculativeFailure - } -} - -pub struct TxnLastInputOutput { - inputs: Vec>>>, // txn_idx -> input. +pub struct TxnLastInputOutput, E: Debug> { + inputs: Vec>>>, // txn_idx -> input. - outputs: Vec>>>, // txn_idx -> output. + outputs: Vec>>>, // txn_idx -> output. // Record all writes and reads to access paths corresponding to modules (code) in any // (speculative) executions. Used to avoid a potential race with module publishing and // Move-VM loader cache - see 'record' function comment for more information. - module_writes: DashSet, - module_reads: DashSet, + module_writes: DashSet, + module_reads: DashSet, module_read_write_intersection: AtomicBool, } -impl - TxnLastInputOutput +impl, E: Debug + Send + Clone> + TxnLastInputOutput { pub fn new(num_txns: TxnIndex) -> Self { Self { @@ -151,16 +72,16 @@ impl } } - fn append_and_check( - paths: Vec, - set_to_append: &DashSet, - set_to_check: &DashSet, + fn append_and_check<'a>( + paths: impl Iterator, + set_to_append: &DashSet, + set_to_check: &DashSet, ) -> bool { for path in paths { // Standard flags, first show, then look. set_to_append.insert(path.clone()); - if set_to_check.contains(&path) { + if set_to_check.contains(path) { return true; } } @@ -182,36 +103,27 @@ impl pub(crate) fn record( &self, txn_idx: TxnIndex, - input: Vec>, - output: ExecutionStatus>, + input: CapturedReads, + output: ExecutionStatus>, ) -> anyhow::Result<()> { - let read_modules: Vec = input - .iter() - .filter_map(|desc| { - matches!(desc.kind, ReadKind::Module).then(|| { - desc.module_path() - .unwrap_or_else(|| panic!("Module path guaranteed to exist {:?}", desc)) - }) - }) - .collect(); - let written_modules: Vec = match &output { - ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => output - .module_write_set() - .keys() - .map(|k| { - k.module_path().unwrap_or_else(|| { - panic!("Unexpected non-module key found in putput: {:?}", k) - }) - }) - .collect(), - ExecutionStatus::Abort(_) => Vec::new(), + let written_modules = match &output { + ExecutionStatus::Success(output) | ExecutionStatus::SkipRest(output) => { + output.module_write_set() + }, + ExecutionStatus::Abort(_) => HashMap::new(), }; if !self.module_read_write_intersection.load(Ordering::Relaxed) { // Check if adding new read & write modules leads to intersections. - if Self::append_and_check(read_modules, &self.module_reads, &self.module_writes) - || Self::append_and_check(written_modules, &self.module_writes, &self.module_reads) - { + if Self::append_and_check( + input.module_reads.iter(), + &self.module_reads, + &self.module_writes, + ) || Self::append_and_check( + written_modules.keys(), + &self.module_writes, + &self.module_reads, + ) { self.module_read_write_intersection .store(true, Ordering::Release); return Err(anyhow!( @@ -230,7 +142,7 @@ impl self.module_read_write_intersection.load(Ordering::Acquire) } - pub(crate) fn read_set(&self, txn_idx: TxnIndex) -> Option>>> { + pub(crate) fn read_set(&self, txn_idx: TxnIndex) -> Option>> { self.inputs[txn_idx as usize].load_full() } @@ -269,7 +181,7 @@ impl } } - pub(crate) fn txn_output(&self, txn_idx: TxnIndex) -> Option>> { + pub(crate) fn txn_output(&self, txn_idx: TxnIndex) -> Option>> { self.outputs[txn_idx as usize].load_full() } @@ -278,8 +190,7 @@ impl pub(crate) fn modified_keys( &self, txn_idx: TxnIndex, - ) -> Option::Txn as Transaction>::Key, bool)>> - { + ) -> Option> { self.outputs[txn_idx as usize] .load_full() .and_then(|txn_output| match &txn_output.output_status { @@ -295,10 +206,7 @@ impl }) } - pub(crate) fn delta_keys( - &self, - txn_idx: TxnIndex, - ) -> Vec<<::Txn as Transaction>::Key> { + pub(crate) fn delta_keys(&self, txn_idx: TxnIndex) -> Vec { self.outputs[txn_idx as usize].load().as_ref().map_or( vec![], |txn_output| match &txn_output.output_status { @@ -310,20 +218,15 @@ impl ) } - pub(crate) fn events( - &self, - txn_idx: TxnIndex, - ) -> Box::Txn as Transaction>::Event>> { + pub(crate) fn events(&self, txn_idx: TxnIndex) -> Box> { self.outputs[txn_idx as usize].load().as_ref().map_or( - Box::new(empty::<<::Txn as Transaction>::Event>()), + Box::new(empty::()), |txn_output| match &txn_output.output_status { ExecutionStatus::Success(t) | ExecutionStatus::SkipRest(t) => { let events = t.get_events(); Box::new(events.into_iter()) }, - ExecutionStatus::Abort(_) => { - Box::new(empty::<<::Txn as Transaction>::Event>()) - }, + ExecutionStatus::Abort(_) => Box::new(empty::()), }, ) } @@ -333,7 +236,7 @@ impl pub(crate) fn record_delta_writes( &self, txn_idx: TxnIndex, - delta_writes: Vec<(<::Txn as Transaction>::Key, WriteOp)>, + delta_writes: Vec<(T::Key, WriteOp)>, ) { match &self.outputs[txn_idx as usize] .load_full() @@ -349,7 +252,7 @@ impl // Must be executed after parallel execution is done, grabs outputs. Will panic if // other outstanding references to the recorded outputs exist. - pub(crate) fn take_output(&self, txn_idx: TxnIndex) -> ExecutionStatus> { + pub(crate) fn take_output(&self, txn_idx: TxnIndex) -> ExecutionStatus> { let owning_ptr = self.outputs[txn_idx as usize] .swap(None) .expect("[BlockSTM]: Output must be recorded after execution"); diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index 19795b51d33dd3..ade19e0bd66a54 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -2,10 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + captured_reads::{CapturedReads, DataRead, ReadKind}, counters, scheduler::{DependencyResult, DependencyStatus, Scheduler}, task::Transaction, - txn_last_input_output::ReadDescriptor, }; use aptos_aggregator::{ delta_change_set::serialize, @@ -20,7 +20,10 @@ use aptos_mvhashmap::{ use aptos_state_view::{StateViewId, TStateView}; use aptos_types::{ executable::{Executable, ModulePath}, - state_store::{state_storage_usage::StateStorageUsage, state_value::StateValue}, + state_store::{ + state_storage_usage::StateStorageUsage, + state_value::{StateValue, StateValueMetadataKind}, + }, write_set::TransactionWrite, }; use aptos_vm_logging::{log_schema::AdapterLogSchema, prelude::*}; @@ -29,34 +32,41 @@ use move_core_types::{ value::MoveTypeLayout, vm_status::{StatusCode, VMStatus}, }; -use std::{ - cell::RefCell, - fmt::Debug, - sync::{atomic::AtomicU32, Arc}, -}; +use std::{cell::RefCell, fmt::Debug, sync::atomic::AtomicU32}; /// A struct which describes the result of the read from the proxy. The client /// can interpret these types to further resolve the reads. #[derive(Debug)] -pub(crate) enum ReadResult { - // Successful read of a value. - Value(Arc), - // Similar to above, but the value was aggregated and is an integer. - U128(u128), - // Read did not return anything. +pub(crate) enum ReadResult { + Value(Option), + Metadata(Option), + Exists(bool), Uninitialized, - // Must half the execution of the calling transaction. This might be because + // Must halt the execution of the calling transaction. This might be because // there was an inconsistency in observed speculative state, or dependency // waiting indicated that the parallel execution had been halted. The String // parameter provides more context (error description / message). HaltSpeculativeExecution(String), } +impl ReadResult { + fn from_data_read(data: DataRead) -> Self { + match data { + DataRead::Versioned(_, v) => ReadResult::Value(v.as_state_value()), + DataRead::Resolved(v) => { + ReadResult::Value(Some(StateValue::new_legacy(serialize(&v).into()))) + }, + DataRead::Metadata(maybe_metadata) => ReadResult::Metadata(maybe_metadata), + DataRead::Exists(exists) => ReadResult::Exists(exists), + } + } +} + pub(crate) struct ParallelState<'a, T: Transaction, X: Executable> { versioned_map: &'a MVHashMap, scheduler: &'a Scheduler, _counter: &'a AtomicU32, - captured_reads: RefCell>>, + captured_reads: RefCell>, } impl<'a, T: Transaction, X: Executable> ParallelState<'a, T, X> { @@ -69,7 +79,7 @@ impl<'a, T: Transaction, X: Executable> ParallelState<'a, T, X> { versioned_map: shared_map, scheduler: shared_scheduler, _counter: shared_counter, - captured_reads: RefCell::new(Vec::new()), + captured_reads: RefCell::new(CapturedReads::new()), } } @@ -79,33 +89,98 @@ impl<'a, T: Transaction, X: Executable> ParallelState<'a, T, X> { key: &T::Key, txn_idx: TxnIndex, ) -> anyhow::Result, MVModulesError> { - // Register a fake read for the read / write path intersection fallback for modules. + // Record for the R/W path intersection fallback for modules. self.captured_reads .borrow_mut() - .push(ReadDescriptor::from_module(key.clone())); + .module_reads + .push(key.clone()); self.versioned_map.modules().fetch_module(key, txn_idx) } + // txn_idx is estimated to have a r/w dependency on dep_idx. + // Returns after the dependency has been resolved, the returned indicator is true if + // it is safe to continue, and false if the execution has been halted. + fn wait_for_dependency(&self, txn_idx: TxnIndex, dep_idx: TxnIndex) -> bool { + match self.scheduler.wait_for_dependency(txn_idx, dep_idx) { + DependencyResult::Dependency(dep_condition) => { + let _timer = counters::DEPENDENCY_WAIT_SECONDS.start_timer(); + // Wait on a condition variable corresponding to the encountered + // read dependency. Once the dep_idx finishes re-execution, scheduler + // will mark the dependency as resolved, and then the txn_idx will be + // scheduled for re-execution, which will re-awaken cvar here. + // A deadlock is not possible due to these condition variables: + // suppose all threads are waiting on read dependency, and consider + // one with lowest txn_idx. It observed a dependency, so some thread + // aborted dep_idx. If that abort returned execution task, by + // minimality (lower transactions aren't waiting), that thread would + // finish execution unblock txn_idx, contradiction. Otherwise, + // execution_idx in scheduler was lower at a time when at least the + // thread that aborted dep_idx was alive, and again, since lower txns + // than txn_idx are not blocked, so the execution of dep_idx will + // eventually finish and lead to unblocking txn_idx, contradiction. + let (lock, cvar) = &*dep_condition; + let mut dep_resolved = lock.lock(); + while let DependencyStatus::Unresolved = *dep_resolved { + dep_resolved = cvar.wait(dep_resolved).unwrap(); + } + // dep resolved status is either resolved or execution halted. + matches!(*dep_resolved, DependencyStatus::Resolved) + }, + DependencyResult::ExecutionHalted => false, + DependencyResult::Resolved => true, + } + } + /// Captures a read from the VM execution, but not unresolved deltas, as in this case it is the /// callers responsibility to set the aggregator's base value and call fetch_data again. - fn fetch_data(&self, key: &T::Key, txn_idx: TxnIndex) -> ReadResult { + fn read_data_by_kind(&self, key: &T::Key, txn_idx: TxnIndex, kind: ReadKind) -> ReadResult { use MVDataError::*; use MVDataOutput::*; + if let Some(data) = self + .captured_reads + .borrow() + .get_by_kind(key, None, kind.clone()) + { + return ReadResult::from_data_read(data); + } + loop { match self.versioned_map.data().fetch_data(key, txn_idx) { Ok(Versioned(version, v)) => { - self.captured_reads + let data_read = DataRead::Versioned(version, v.clone()) + .as_kind(kind) + .expect("Conversion of kind from Versioned must succeed"); + + if self + .captured_reads .borrow_mut() - .push(ReadDescriptor::from_versioned(key.clone(), version)); - return ReadResult::Value(v); + .capture_read(key.clone(), None, data_read.clone()) + .is_err() + { + // Inconsistency in recorded reads. + break; + } + + return ReadResult::from_data_read(data_read); }, Ok(Resolved(value)) => { - self.captured_reads + let data_read = DataRead::Resolved(value) + .as_kind(kind) + .expect("Conversion of kind from Resolved must succeed"); + + if self + .captured_reads .borrow_mut() - .push(ReadDescriptor::from_resolved(key.clone(), value)); - return ReadResult::U128(value); + .capture_read(key.clone(), None, data_read.clone()) + .is_err() + { + // Inconsistency in recorded reads. + break; + } + + return ReadResult::from_data_read(data_read); }, Err(Uninitialized) | Err(Unresolved(_)) => { // The underlying assumption here for not recording anything about the read is @@ -115,58 +190,20 @@ impl<'a, T: Transaction, X: Executable> ParallelState<'a, T, X> { return ReadResult::Uninitialized; }, Err(Dependency(dep_idx)) => { - // `self.txn_idx` estimated to depend on a write from `dep_idx`. - match self.scheduler.wait_for_dependency(txn_idx, dep_idx) { - DependencyResult::Dependency(dep_condition) => { - let _timer = counters::DEPENDENCY_WAIT_SECONDS.start_timer(); - // Wait on a condition variable corresponding to the encountered - // read dependency. Once the dep_idx finishes re-execution, scheduler - // will mark the dependency as resolved, and then the txn_idx will be - // scheduled for re-execution, which will re-awaken cvar here. - // A deadlock is not possible due to these condition variables: - // suppose all threads are waiting on read dependency, and consider - // one with lowest txn_idx. It observed a dependency, so some thread - // aborted dep_idx. If that abort returned execution task, by - // minimality (lower transactions aren't waiting), that thread would - // finish execution unblock txn_idx, contradiction. Otherwise, - // execution_idx in scheduler was lower at a time when at least the - // thread that aborted dep_idx was alive, and again, since lower txns - // than txn_idx are not blocked, so the execution of dep_idx will - // eventually finish and lead to unblocking txn_idx, contradiction. - let (lock, cvar) = &*dep_condition; - let mut dep_resolved = lock.lock(); - while let DependencyStatus::Unresolved = *dep_resolved { - dep_resolved = cvar.wait(dep_resolved).unwrap(); - } - if let DependencyStatus::ExecutionHalted = *dep_resolved { - return ReadResult::HaltSpeculativeExecution( - "Speculative error to halt BlockSTM early.".to_string(), - ); - } - }, - DependencyResult::ExecutionHalted => { - return ReadResult::HaltSpeculativeExecution( - "Speculative error to halt BlockSTM early.".to_string(), - ); - }, - DependencyResult::Resolved => continue, + if !self.wait_for_dependency(txn_idx, dep_idx) { + break; } }, Err(DeltaApplicationFailure) => { - // Delta application failure currently should never happen. Here, we assume it - // happened because of speculation and return 0 to the Move-VM. Validation will - // ensure the transaction re-executes if 0 wasn't the right number. - - self.captured_reads - .borrow_mut() - .push(ReadDescriptor::from_speculative_failure(key.clone())); - - return ReadResult::HaltSpeculativeExecution( - "Delta application failure (must be speculative)".to_string(), - ); + self.captured_reads.borrow_mut().mark_failure(); + break; }, }; } + + ReadResult::HaltSpeculativeExecution( + "Delta application failure (must be speculative)".to_string(), + ) } } @@ -205,7 +242,7 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< } /// Drains the captured reads. - pub(crate) fn take_reads(&self) -> Vec> { + pub(crate) fn take_reads(&self) -> CapturedReads { match &self.latest_view { ViewState::Sync(state) => state.captured_reads.take(), ViewState::Unsync(_) => { @@ -227,21 +264,16 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> LatestView< state_key ); } + + // TODO: AggregatorID in V2 can be replaced here. ret } -} - -impl<'a, T: Transaction, S: TStateView, X: Executable> TResourceView - for LatestView<'a, T, S, X> -{ - type Key = T::Key; - type Layout = MoveTypeLayout; - fn get_resource_state_value( + fn get_resource_state_value_impl( &self, - state_key: &Self::Key, - _maybe_layout: Option<&Self::Layout>, - ) -> anyhow::Result> { + state_key: &T::Key, + kind: ReadKind, + ) -> anyhow::Result { debug_assert!( state_key.module_path().is_none(), "Reading a module {:?} using ResourceView", @@ -250,24 +282,19 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> TResourceVi match &self.latest_view { ViewState::Sync(state) => { - let mut mv_value = state.fetch_data(state_key, self.txn_idx); - - if matches!(mv_value, ReadResult::Uninitialized) { - let from_storage = self.base_view.get_state_value(state_key)?; + let mut ret = state.read_data_by_kind(state_key, self.txn_idx, kind.clone()); - // This base value can also be used to resolve AggregatorV1 directly from - // the versioned data-structure (without more storage calls). + if matches!(ret, ReadResult::Uninitialized) { + let from_storage = self.get_base_value(state_key)?; state.versioned_map.data().provide_base_value( state_key.clone(), TransactionWrite::from_state_value(from_storage), ); - mv_value = state.fetch_data(state_key, self.txn_idx); + ret = state.read_data_by_kind(state_key, self.txn_idx, kind); } - match mv_value { - ReadResult::Value(v) => Ok(v.as_state_value()), - ReadResult::U128(v) => Ok(Some(StateValue::new_legacy(serialize(&v).into()))), + match ret { // ExecutionHalted indicates that the parallel execution is halted. // The read should return immediately and log the error. // For now we use STORAGE_ERROR as the VM will not log the speculative eror, @@ -279,19 +306,71 @@ impl<'a, T: Transaction, S: TStateView, X: Executable> TResourceVi ReadResult::Uninitialized => { unreachable!("base value must already be recorded in the MV data structure") }, + _ => Ok(ret), } }, - ViewState::Unsync(state) => state.unsync_map.fetch_data(state_key).map_or_else( - || { - // TODO: AggregatorV2 ID for sequential must be replaced in this flow. - self.get_base_value(state_key) - }, - |v| Ok(v.as_state_value()), - ), + ViewState::Unsync(state) => { + let ret = state.unsync_map.fetch_data(state_key).map_or_else( + || self.get_base_value(state_key), + |v| Ok(v.as_state_value()), + ); + ret.map(|maybe_state_value| match kind { + ReadKind::Value => ReadResult::Value(maybe_state_value), + ReadKind::Metadata => { + ReadResult::Metadata(maybe_state_value.map(StateValue::into_metadata)) + }, + ReadKind::Exists => ReadResult::Exists(maybe_state_value.is_some()), + }) + }, } } +} + +impl<'a, T: Transaction, S: TStateView, X: Executable> TResourceView + for LatestView<'a, T, S, X> +{ + type Key = T::Key; + type Layout = MoveTypeLayout; + + fn get_resource_state_value( + &self, + state_key: &Self::Key, + _maybe_layout: Option<&Self::Layout>, + ) -> anyhow::Result> { + self.get_resource_state_value_impl(state_key, ReadKind::Value) + .map(|res| { + if let ReadResult::Value(v) = res { + v + } else { + unreachable!("Read result must be Value kind") + } + }) + } + + fn get_resource_state_value_metadata( + &self, + state_key: &Self::Key, + ) -> anyhow::Result> { + self.get_resource_state_value_impl(state_key, ReadKind::Metadata) + .map(|res| { + if let ReadResult::Metadata(v) = res { + v + } else { + unreachable!("Read result must be Value kind") + } + }) + } - // TODO: implement here fn get_resource_state_value_metadata & resource_exists. + fn resource_exists(&self, state_key: &Self::Key) -> anyhow::Result { + self.get_resource_state_value_impl(state_key, ReadKind::Exists) + .map(|res| { + if let ReadResult::Exists(v) = res { + v + } else { + unreachable!("Read result must be Value kind") + } + }) + } } impl<'a, T: Transaction, S: TStateView, X: Executable> TModuleView diff --git a/aptos-move/mvhashmap/src/lib.rs b/aptos-move/mvhashmap/src/lib.rs index 8d55e324640d93..0f6b4803ddc474 100644 --- a/aptos-move/mvhashmap/src/lib.rs +++ b/aptos-move/mvhashmap/src/lib.rs @@ -14,15 +14,14 @@ use serde::Serialize; use std::{fmt::Debug, hash::Hash}; pub mod types; +#[cfg(test)] +mod unit_tests; pub mod unsync_map; mod utils; pub mod versioned_data; pub mod versioned_group_data; pub mod versioned_modules; -#[cfg(test)] -mod unit_tests; - /// Main multi-version data-structure used by threads to read/write during parallel /// execution. /// diff --git a/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs b/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs index 9807fb9947d78d..e22660b92ab395 100644 --- a/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs +++ b/aptos-move/mvhashmap/src/unit_tests/proptest_types.rs @@ -271,10 +271,10 @@ where if test_group { match map.group_data().read_from_group( &KeyType(key.clone()), - idx as TxnIndex, &5, + idx as TxnIndex, ) { - Ok((v, _)) => { + Ok((_, v)) => { assert_value(v); break; }, diff --git a/aptos-move/mvhashmap/src/versioned_group_data.rs b/aptos-move/mvhashmap/src/versioned_group_data.rs index 8e30bc3ffd7484..b083a941044d3a 100644 --- a/aptos-move/mvhashmap/src/versioned_group_data.rs +++ b/aptos-move/mvhashmap/src/versioned_group_data.rs @@ -165,9 +165,9 @@ impl VersionedGro fn get_latest_tagged_value( &self, - txn_idx: TxnIndex, tag: &T, - ) -> Result<(Arc, Version), MVGroupError> { + txn_idx: TxnIndex, + ) -> Result<(Version, Arc), MVGroupError> { let common_error = || -> MVGroupError { if self.idx_to_update.contains_key(&ShiftedTxnIndex::zero()) { MVGroupError::TagNotFound @@ -192,8 +192,8 @@ impl VersionedGro )) } else { Ok(( - entry.value.clone(), idx.idx().map(|idx| (idx, entry.incarnation)), + entry.value.clone(), )) } }, @@ -291,11 +291,11 @@ impl< pub fn read_from_group( &self, key: &K, - txn_idx: TxnIndex, tag: &T, - ) -> anyhow::Result<(Arc, Version), MVGroupError> { + txn_idx: TxnIndex, + ) -> anyhow::Result<(Version, Arc), MVGroupError> { match self.group_values.get(key) { - Some(g) => g.get_latest_tagged_value(txn_idx, tag), + Some(g) => g.get_latest_tagged_value(tag, txn_idx), None => Err(MVGroupError::Uninitialized), } } @@ -392,18 +392,18 @@ mod test { ); // for reading a tag at ap_1, w.o. returning size, idx = 3 is Uninitialized. assert_matches!( - map.read_from_group(&ap_1, 3, &1), + map.read_from_group(&ap_1, &1, 3), Err(MVGroupError::Uninitialized) ); // ... but idx = 4 should find the previously stored value. assert_eq!( - map.read_from_group(&ap_1, 4, &1).unwrap(), + map.read_from_group(&ap_1, &1, 4).unwrap(), // Arc compares by value, no return size, incarnation. - (Arc::new(TestValue::with_len(1)), Ok((3, 1))) + (Ok((3, 1)), Arc::new(TestValue::with_len(1))) ); // ap_0 should still be uninitialized. assert_matches!( - map.read_from_group(&ap_0, 3, &1), + map.read_from_group(&ap_0, &1, 3), Err(MVGroupError::Uninitialized) ); @@ -415,7 +415,7 @@ mod test { (1..3).map(|i| (i, TestValue::with_len(4))), ); assert_matches!( - map.read_from_group(&ap_2, 4, &2), + map.read_from_group(&ap_2, &2, 4), Err(MVGroupError::Uninitialized) ); map.provide_base_values( @@ -426,21 +426,21 @@ mod test { // Tag not found vs not initialized, assert_matches!( - map.read_from_group(&ap_2, 4, &2), + map.read_from_group(&ap_2, &2, 4), Err(MVGroupError::TagNotFound) ); assert_matches!( - map.read_from_group(&ap_2, 5, &4), + map.read_from_group(&ap_2, &4, 5), Err(MVGroupError::TagNotFound) ); // vs finding a versioned entry from txn 4, vs from storage. assert_eq!( - map.read_from_group(&ap_2, 5, &2).unwrap(), - (Arc::new(TestValue::with_len(4)), Ok((4, 0))) + map.read_from_group(&ap_2, &2, 5).unwrap(), + (Ok((4, 0)), Arc::new(TestValue::with_len(4))) ); assert_eq!( - map.read_from_group(&ap_2, 5, &0).unwrap(), - (Arc::new(TestValue::with_len(2)), Err(StorageVersion)) + map.read_from_group(&ap_2, &0, 5).unwrap(), + (Err(StorageVersion), Arc::new(TestValue::with_len(2))) ); } @@ -458,8 +458,8 @@ mod test { (0..2).map(|i| (i, TestValue::new(vec![5, 3]))), ); assert_eq!( - map.read_from_group(&ap, 12, &1).unwrap(), - (Arc::new(TestValue::new(vec![5, 3])), Ok((5, 3))) + map.read_from_group(&ap, &1, 12).unwrap(), + (Ok((5, 3)), Arc::new(TestValue::new(vec![5, 3]))) ); map.write( ap.clone(), @@ -469,27 +469,27 @@ mod test { (1..3).map(|i| (i, TestValue::new(vec![10, 1]))), ); assert_eq!( - map.read_from_group(&ap, 12, &1).unwrap(), - (Arc::new(TestValue::new(vec![10, 1])), Ok((10, 1))) + map.read_from_group(&ap, &1, 12).unwrap(), + (Ok((10, 1)), Arc::new(TestValue::new(vec![10, 1]))) ); map.mark_estimate(&ap, 10); - assert_matches!(map.read_from_group(&ap, 12, &1), Err(Dependency(10))); - assert_matches!(map.read_from_group(&ap, 12, &2), Err(Dependency(10))); - assert_matches!(map.read_from_group(&ap, 12, &3), Err(Uninitialized)); + assert_matches!(map.read_from_group(&ap, &1, 12), Err(Dependency(10))); + assert_matches!(map.read_from_group(&ap, &2, 12), Err(Dependency(10))); + assert_matches!(map.read_from_group(&ap, &3, 12), Err(Uninitialized)); assert_eq!( - map.read_from_group(&ap, 12, &0).unwrap(), - (Arc::new(TestValue::new(vec![5, 3])), Ok((5, 3))) + map.read_from_group(&ap, &0, 12).unwrap(), + (Ok((5, 3)), Arc::new(TestValue::new(vec![5, 3]))) ); map.delete(&ap, 10); assert_eq!( - map.read_from_group(&ap, 12, &0).unwrap(), - (Arc::new(TestValue::new(vec![5, 3])), Ok((5, 3))) + map.read_from_group(&ap, &0, 12).unwrap(), + (Ok((5, 3)), Arc::new(TestValue::new(vec![5, 3]))) ); assert_eq!( - map.read_from_group(&ap, 12, &1).unwrap(), - (Arc::new(TestValue::new(vec![5, 3])), Ok((5, 3))) + map.read_from_group(&ap, &1, 12).unwrap(), + (Ok((5, 3)), Arc::new(TestValue::new(vec![5, 3]))) ); } diff --git a/types/src/write_set.rs b/types/src/write_set.rs index 9eca13dd046ecb..0ca831e929cc4a 100644 --- a/types/src/write_set.rs +++ b/types/src/write_set.rs @@ -7,7 +7,7 @@ use crate::state_store::{ state_key::StateKey, - state_value::{StateValue, StateValueMetadata}, + state_value::{StateValue, StateValueMetadata, StateValueMetadataKind}, }; use anyhow::{bail, Result}; use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher}; @@ -170,8 +170,17 @@ impl WriteOp { pub trait TransactionWrite { fn bytes(&self) -> Option<&Bytes>; + // Returns state value that would be observed by a read following the 'self' write. fn as_state_value(&self) -> Option; + // Returns metadata that would be observed by a read following the 'self' write. + // Provided as a separate method to avoid the clone in as_state_value method + // (although default implementation below does just that). + fn as_state_value_metadata(&self) -> Option { + self.as_state_value() + .map(|state_value| state_value.into_metadata()) + } + // Often, the contents of W:TransactionWrite are converted to Option, e.g. // to emulate reading from storage after W has been applied. However, in some contexts, // it is also helpful to convert a StateValue to a potential instance of W that would @@ -212,6 +221,12 @@ impl TransactionWrite for WriteOp { }) } + // Note that even if WriteOp is DeletionWithMetadata, the method returns None, as a later + // read would not read the metadata of the deletion op. + fn as_state_value_metadata(&self) -> Option { + self.bytes().map(|_| self.metadata().cloned()) + } + fn from_state_value(maybe_state_value: Option) -> Self { match maybe_state_value.map(|state_value| state_value.into()) { None => WriteOp::Deletion,