diff --git a/api/src/context.rs b/api/src/context.rs index 1a77f5d5ee1f4..7380be8873e6f 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -30,7 +30,7 @@ use aptos_types::{ account_address::AccountAddress, account_config::{AccountResource, NewBlockEvent}, chain_id::ChainId, - contract_event::EventWithVersion, + contract_event::{ContractEvent, ContractEventV1, EventWithVersion}, event::EventKey, indexer::indexer_db_reader::IndexerReader, ledger_info::LedgerInfoWithSignatures, @@ -818,12 +818,17 @@ impl Context { .into_iter() .zip(infos) .enumerate() - .map(|(i, ((txn, txn_output), info))| { - let version = start_version + i as u64; - let (write_set, events, _, _, _) = txn_output.unpack(); - self.get_accumulator_root_hash(version) - .map(|h| (version, txn, info, events, h, write_set).into()) - }) + .map( + |(i, ((txn, txn_output), info))| -> Result { + let version = start_version + i as u64; + let (write_set, mut events, _, _, _) = txn_output.unpack(); + if self.node_config.indexer_db_config.enable_event_translation { + let _ = self.translate_v2_to_v1_events_for_version(version, &mut events); + } + let h = self.get_accumulator_root_hash(version)?; + Ok((version, txn, info, events, h, write_set).into()) + }, + ) .collect() } @@ -878,7 +883,14 @@ impl Context { })?; txns.into_inner() .into_iter() - .map(|t| self.convert_into_transaction_on_chain_data(t)) + .map(|t| -> Result { + let mut txn = self.convert_into_transaction_on_chain_data(t)?; + if self.node_config.indexer_db_config.enable_event_translation { + let _ = + self.translate_v2_to_v1_events_for_version(txn.version, &mut txn.events); + } + Ok(txn) + }) .collect::>>() .context("Failed to parse account transactions") .map_err(|err| E::internal_with_code(err, AptosErrorCode::InternalError, ledger_info)) @@ -889,10 +901,18 @@ impl Context { hash: HashValue, ledger_version: u64, ) -> Result> { - self.db + if let Some(t) = self + .db .get_transaction_by_hash(hash, ledger_version, true)? - .map(|t| self.convert_into_transaction_on_chain_data(t)) - .transpose() + { + let mut txn: TransactionOnChainData = self.convert_into_transaction_on_chain_data(t)?; + if self.node_config.indexer_db_config.enable_event_translation { + let _ = self.translate_v2_to_v1_events_for_version(txn.version, &mut txn.events); + } + Ok(Some(txn)) + } else { + Ok(None) + } } pub async fn get_pending_transaction_by_hash( @@ -915,11 +935,60 @@ impl Context { version: u64, ledger_version: u64, ) -> Result { - self.convert_into_transaction_on_chain_data(self.db.get_transaction_by_version( - version, - ledger_version, - true, - )?) + let mut txn = self.convert_into_transaction_on_chain_data( + self.db + .get_transaction_by_version(version, ledger_version, true)?, + )?; + if self.node_config.indexer_db_config.enable_event_translation { + let _ = self.translate_v2_to_v1_events_for_version(version, &mut txn.events); + } + Ok(txn) + } + + fn translate_v2_to_v1_events_for_version( + &self, + version: u64, + events: &mut [ContractEvent], + ) -> Result<()> { + for (idx, event) in events.iter_mut().enumerate() { + let translated_event = self + .indexer_reader + .as_ref() + .ok_or(anyhow!("Internal indexer reader doesn't exist"))? + .get_translated_v1_event_by_version_and_index(version, idx as u64); + if let Ok(translated_event) = translated_event { + *event = ContractEvent::V1(translated_event); + } + } + Ok(()) + } + + pub fn translate_v2_to_v1_events_for_simulation( + &self, + events: &mut [ContractEvent], + ) -> Result<()> { + let mut count_map: HashMap = HashMap::new(); + for event in events.iter_mut() { + if let ContractEvent::V2(v2) = event { + let translated_event = self + .indexer_reader + .as_ref() + .ok_or(anyhow!("Internal indexer reader doesn't exist"))? + .translate_event_v2_to_v1(v2)?; + if let Some(v1) = translated_event { + let count = count_map.get(&v1.key()).unwrap_or(&0); + let v1_adjusted = ContractEventV1::new( + v1.key().clone(), + v1.sequence_number() + count, + v1.type_tag().clone(), + v1.event_data().to_vec(), + ); + *event = ContractEvent::V1(v1_adjusted); + count_map.insert(v1.key().clone(), count + 1); + } + } + } + Ok(()) } pub fn get_accumulator_root_hash(&self, version: u64) -> Result { diff --git a/api/src/tests/event_v2_translation_test.rs b/api/src/tests/event_v2_translation_test.rs new file mode 100644 index 0000000000000..e136d6641ffd0 --- /dev/null +++ b/api/src/tests/event_v2_translation_test.rs @@ -0,0 +1,73 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::{new_test_context, new_test_context_with_db_sharding_and_internal_indexer}; +use aptos_api_test_context::current_function_name; +use serde_json::json; +use std::time::Duration; +use tokio::time::sleep; + +static MODULE_EVENT_MIGRATION: u64 = 57; + +const SLEEP_DURATION: Duration = Duration::from_millis(250); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_feature_enable_disable() { + let mut context = new_test_context(current_function_name!()); + context.enable_feature(MODULE_EVENT_MIGRATION).await; + assert!(context.is_feature_enabled(MODULE_EVENT_MIGRATION).await); + context.disable_feature(MODULE_EVENT_MIGRATION).await; + assert!(!context.is_feature_enabled(MODULE_EVENT_MIGRATION).await); + context.enable_feature(MODULE_EVENT_MIGRATION).await; + assert!(context.is_feature_enabled(MODULE_EVENT_MIGRATION).await); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_event_v2_translation_simulation() { + // let context = &mut new_test_context(current_function_name!()); + let context = + &mut new_test_context_with_db_sharding_and_internal_indexer(current_function_name!()); + + let account1 = &mut context.api_create_account().await; + // let account1 = &mut context.create_account().await; + + sleep(SLEEP_DURATION).await; + + let account2 = &mut context.api_create_account().await; + // let account2 = &mut context.create_account().await; + + sleep(SLEEP_DURATION).await; + + context.enable_feature(MODULE_EVENT_MIGRATION).await; + + // sleep(SLEEP_DURATION).await; + + let payload = json!({ + "type": "entry_function_payload", + "function": "0x1::coin::transfer", + "type_arguments": ["0x1::aptos_coin::AptosCoin"], + "arguments": [ + account1.address().to_hex_literal(), "100" + ] + }); + let resp = context.simulate_transaction(account2, payload, 200).await; + + // sleep(SLEEP_DURATION).await; + + // The V2 event should not appear. + assert!(!resp[0]["events"] + .as_array() + .unwrap() + .iter() + .any(|x| x["type"] == "0x1::coin::CoinDeposit")); + + // The translated V1 event should appear. + assert!(resp[0]["events"] + .as_array() + .unwrap() + .iter() + .any(|x| x["type"] == "0x1::coin::DepositEvent" + && x["guid"]["creation_number"] == "2" + && x["guid"]["account_address"] == account1.address().to_hex_literal())); +} diff --git a/api/src/tests/mod.rs b/api/src/tests/mod.rs index 340721b1ce200..8c2bb45be9e8b 100644 --- a/api/src/tests/mod.rs +++ b/api/src/tests/mod.rs @@ -5,6 +5,7 @@ mod accounts_test; mod blocks_test; mod converter_test; +mod event_v2_translation_test; mod events_test; mod index_test; mod invalid_post_request_test; @@ -36,7 +37,7 @@ fn new_test_context_with_config(test_name: String, node_config: NodeConfig) -> T fn new_test_context_with_db_sharding_and_internal_indexer(test_name: String) -> TestContext { let mut node_config = NodeConfig::default(); node_config.storage.rocksdb_configs.enable_storage_sharding = true; - node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10); + node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, true, 10); let test_context = super_new_test_context(test_name, node_config, false, None); let _ = test_context .get_indexer_reader() @@ -51,6 +52,6 @@ fn new_test_context_with_sharding_and_delayed_internal_indexer( ) -> TestContext { let mut node_config = NodeConfig::default(); node_config.storage.rocksdb_configs.enable_storage_sharding = true; - node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 1); + node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, true, 1); super_new_test_context(test_name, node_config, false, end_version) } diff --git a/api/src/transactions.rs b/api/src/transactions.rs index 56495de0ad2c6..03f0f73e73ced 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -1439,11 +1439,16 @@ impl TransactionsApi { output.gas_used(), exe_status, ); + let mut events = output.events().to_vec(); + let _ = self + .context + .translate_v2_to_v1_events_for_simulation(&mut events); + let simulated_txn = TransactionOnChainData { version, transaction: txn, info, - events: output.events().to_vec(), + events, accumulator_root_hash: zero_hash, changes: output.write_set().clone(), }; diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index f18f382263187..75f79dbba53c2 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -367,6 +367,52 @@ impl TestContext { ) } + pub async fn enable_feature(&mut self, feature: u64) { + // script { + // fun main(root: &signer, feature: u64) { + // let aptos_framework = aptos_framework::aptos_governance::get_signer_testnet_only(root, @0x1); + // std::features::change_feature_flags_for_next_epoch(&aptos_framework, vector[feature], vector[]); + // aptos_framework::aptos_governance::reconfigure(&aptos_framework); + // std::features::on_new_epoch(&aptos_framework); + // } + // } + let mut root = self.root_account().await; + self.api_execute_script( + &mut root, + "a11ceb0b0700000a06010004030418051c1707336f08a2012006c201260000000100020301000101030502000100040602000101050602000102060c03010c0002060c05010303060c0a030a0301060c106170746f735f676f7665726e616e6365086665617475726573176765745f7369676e65725f746573746e65745f6f6e6c79236368616e67655f666561747572655f666c6167735f666f725f6e6578745f65706f63680b7265636f6e6669677572650c6f6e5f6e65775f65706f63680000000000000000000000000000000000000000000000000000000000000001052000000000000000000000000000000000000000000000000000000000000000010a0301000000010e0b00070011000c020e020b0140040100000000000000070111010e0211020e02110302", + json!([]), + json!([feature.to_string()]), + ).await; + } + + pub async fn disable_feature(&mut self, feature: u64) { + // script { + // fun main(root: &signer, feature: u64) { + // let aptos_framework = aptos_framework::aptos_governance::get_signer_testnet_only(root, @0x1); + // std::features::change_feature_flags_for_next_epoch(&aptos_framework, vector[], vector[feature]); + // aptos_framework::aptos_governance::reconfigure(&aptos_framework); + // std::features::on_new_epoch(&aptos_framework); + // } + // } + let mut root = self.root_account().await; + self.api_execute_script( + &mut root, + "a11ceb0b0700000a06010004030418051c1707336f08a2012006c201260000000100020301000101030502000100040602000101050602000102060c03010c0002060c05010303060c0a030a0301060c106170746f735f676f7665726e616e6365086665617475726573176765745f7369676e65725f746573746e65745f6f6e6c79236368616e67655f666561747572655f666c6167735f666f725f6e6578745f65706f63680b7265636f6e6669677572650c6f6e5f6e65775f65706f63680000000000000000000000000000000000000000000000000000000000000001052000000000000000000000000000000000000000000000000000000000000000010a0301000000010e0b00070011000c020e0207010b014004010000000000000011010e0211020e02110302", + json!([]), + json!([feature.to_string()]), + ).await; + } + + pub async fn is_feature_enabled(&self, feature: u64) -> bool { + let request = json!({ + "function":"0x1::features::is_enabled", + "arguments": vec![feature.to_string()], + "type_arguments": Vec::::new(), + }); + let resp = self.post("/view", request).await; + resp[0].as_bool().unwrap() + } + pub fn latest_state_view(&self) -> DbStateView { self.context .state_view_at_version(self.get_latest_ledger_info().version()) @@ -395,6 +441,29 @@ impl TestContext { account } + pub async fn api_create_account(&mut self) -> LocalAccount { + let root = &mut self.root_account().await; + let account = self.gen_account(); + self.api_execute_aptos_account_transfer(root, account.address(), TRANSFER_AMOUNT) + .await; + account + } + + pub async fn api_execute_aptos_account_transfer( + &mut self, + sender: &mut LocalAccount, + receiver: AccountAddress, + amount: u64, + ) { + self.api_execute_entry_function( + sender, + "0x1::aptos_account::transfer", + json!([]), + json!([receiver.to_hex_literal(), amount.to_string()]), + ) + .await; + } + pub async fn create_user_account(&self, account: &LocalAccount) -> SignedTransaction { let mut tc = self.root_account().await; self.create_user_account_by(&mut tc, account) @@ -861,6 +930,27 @@ impl TestContext { .await; } + pub async fn api_execute_script( + &mut self, + account: &mut LocalAccount, + bytecode: &str, + type_args: serde_json::Value, + args: serde_json::Value, + ) { + self.api_execute_txn( + account, + json!({ + "type": "script_payload", + "code": { + "bytecode": bytecode, + }, + "type_arguments": type_args, + "arguments": args + }), + ) + .await; + } + pub async fn api_execute_txn(&mut self, account: &mut LocalAccount, payload: Value) { self.api_execute_txn_expecting(account, payload, 202).await; } diff --git a/aptos-move/framework/aptos-framework/doc/coin.md b/aptos-move/framework/aptos-framework/doc/coin.md index f43569cd0787a..a0f3b2a75db6b 100644 --- a/aptos-move/framework/aptos-framework/doc/coin.md +++ b/aptos-move/framework/aptos-framework/doc/coin.md @@ -2815,11 +2815,12 @@ Deposit the coin balance into the recipient's account and emit an event. event::emit( CoinDeposit { coin_type: type_name<CoinType>(), account: account_addr, amount: coin.value } ); + } else { + event::emit_event<DepositEvent>( + &mut coin_store.deposit_events, + DepositEvent { amount: coin.value }, + ); }; - event::emit_event<DepositEvent>( - &mut coin_store.deposit_events, - DepositEvent { amount: coin.value }, - ); merge(&mut coin_store.coin, coin); } else { let metadata = paired_metadata<CoinType>(); diff --git a/aptos-move/framework/aptos-framework/sources/coin.move b/aptos-move/framework/aptos-framework/sources/coin.move index 5f6598024d492..3675141f400a0 100644 --- a/aptos-move/framework/aptos-framework/sources/coin.move +++ b/aptos-move/framework/aptos-framework/sources/coin.move @@ -900,11 +900,12 @@ module aptos_framework::coin { event::emit( CoinDeposit { coin_type: type_name(), account: account_addr, amount: coin.value } ); + } else { + event::emit_event( + &mut coin_store.deposit_events, + DepositEvent { amount: coin.value }, + ); }; - event::emit_event( - &mut coin_store.deposit_events, - DepositEvent { amount: coin.value }, - ); merge(&mut coin_store.coin, coin); } else { let metadata = paired_metadata(); diff --git a/config/src/config/internal_indexer_db_config.rs b/config/src/config/internal_indexer_db_config.rs index 323d02c090b45..59a887d4d1c51 100644 --- a/config/src/config/internal_indexer_db_config.rs +++ b/config/src/config/internal_indexer_db_config.rs @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize}; pub struct InternalIndexerDBConfig { pub enable_transaction: bool, pub enable_event: bool, + pub enable_event_translation: bool, pub enable_statekeys: bool, pub batch_size: usize, } @@ -20,12 +21,14 @@ impl InternalIndexerDBConfig { pub fn new( enable_transaction: bool, enable_event: bool, + enable_event_translation: bool, enable_statekeys: bool, batch_size: usize, ) -> Self { Self { enable_transaction, enable_event, + enable_event_translation, enable_statekeys, batch_size, } @@ -39,6 +42,10 @@ impl InternalIndexerDBConfig { self.enable_event } + pub fn enable_event_translation(&self) -> bool { + self.enable_event_translation + } + pub fn enable_statekeys(&self) -> bool { self.enable_statekeys } @@ -57,6 +64,7 @@ impl Default for InternalIndexerDBConfig { Self { enable_transaction: false, enable_event: false, + enable_event_translation: false, enable_statekeys: false, batch_size: 10_000, } diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs index 1806cf6231e7c..e76069b04b4f0 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -51,7 +51,8 @@ impl InternalIndexerDBService { .expect("Failed to open internal indexer db"), ); - let internal_indexer_db_config = InternalIndexerDBConfig::new(false, false, true, 10_000); + let internal_indexer_db_config = + InternalIndexerDBConfig::new(false, false, false, true, 10_000); Some(InternalIndexerDB::new(arc_db, internal_indexer_db_config)) } diff --git a/storage/aptosdb/src/event_store/mod.rs b/storage/aptosdb/src/event_store/mod.rs index 1ff4d31c330d4..e909ab782a4f7 100644 --- a/storage/aptosdb/src/event_store/mod.rs +++ b/storage/aptosdb/src/event_store/mod.rs @@ -19,13 +19,14 @@ use aptos_crypto::{ }; use aptos_db_indexer_schemas::schema::{ event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, + translated_v1_event::TranslatedV1EventSchema, }; use aptos_schemadb::{iterator::SchemaIterator, schema::ValueCodec, ReadOptions, SchemaBatch, DB}; use aptos_storage_interface::{db_ensure as ensure, db_other_bail, AptosDbError, Result}; use aptos_types::{ account_address::AccountAddress, account_config::{new_block_event_key, NewBlockEvent}, - contract_event::ContractEvent, + contract_event::{ContractEvent, ContractEventV1}, event::EventKey, proof::position::Position, transaction::Version, diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 4288fcc8bebb6..899c392d598cb 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -107,7 +107,7 @@ pub(crate) struct StateStore { buffered_state: Mutex, buffered_state_target_items: usize, smt_ancestors: Mutex>, - internal_indexer_db: Option, + pub internal_indexer_db: Option, } impl Deref for StateStore { diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index ac3d18709a068..7e054ff4abca2 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -9,6 +9,7 @@ use aptos_db_indexer_schemas::{ event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema, transaction_by_account::TransactionByAccountSchema, + translated_v1_event::TranslatedV1EventSchema, }, utils::{ error_if_too_many_requested, get_first_seq_num_and_limit, AccountTransactionVersionIter, @@ -17,25 +18,33 @@ use aptos_db_indexer_schemas::{ }; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{ - db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Result, + db_ensure as ensure, db_other_bail as bail, state_view::LatestDbStateCheckpointView, + AptosDbError, DbReader, Result, }; use aptos_types::{ account_address::AccountAddress, - contract_event::{ContractEvent, EventWithVersion}, + account_config::{CoinStoreResource, DepositEvent, DEPOSIT_EVENT_TYPE}, + coin_deposit::{CoinDeposit, COIN_DEPOSIT_TYPE_STR}, + contract_event::{ContractEvent, ContractEventV1, ContractEventV2, EventWithVersion}, event::EventKey, indexer::indexer_db_reader::Order, state_store::{ state_key::{prefix::StateKeyPrefix, StateKey}, state_value::StateValue, + TStateView, }, transaction::{AccountTransactionsWithProof, Transaction, Version}, write_set::{TransactionWrite, WriteSet}, + DummyCoinType, }; +use move_core_types::language_storage::StructTag; use std::{ cmp::min, + collections::HashMap, + str::FromStr, sync::{ mpsc::{self, Receiver, Sender}, - Arc, + Arc, Mutex, }, thread, }; @@ -118,6 +127,10 @@ impl InternalIndexerDB { self.config.enable_event } + pub fn event_translation_enabled(&self) -> bool { + self.config.enable_event_translation + } + pub fn transaction_enabled(&self) -> bool { self.config.enable_transaction } @@ -273,6 +286,16 @@ impl InternalIndexerDB { .get::(key)? .map(|v| v.expect_version())) } + + pub fn get_translated_v1_event_by_version_and_index( + &self, + version: Version, + index: u64, + ) -> Result { + self.db + .get::(&(version, index))? + .ok_or_else(|| AptosDbError::NotFound(format!("Event {} of Txn {}", index, version))) + } } pub struct DBIndexer { @@ -280,6 +303,7 @@ pub struct DBIndexer { pub main_db_reader: Arc, sender: Sender>, committer_handle: Option>, + event_sequence_number_cache: Mutex>, } impl Drop for DBIndexer { @@ -310,6 +334,7 @@ impl DBIndexer { main_db_reader: db_reader, sender, committer_handle: Some(committer_handle), + event_sequence_number_cache: Mutex::new(HashMap::new()), } } @@ -379,7 +404,7 @@ impl DBIndexer { } if self.indexer_db.event_enabled() { - events.iter().enumerate().for_each(|(idx, event)| { + events.iter().enumerate().try_for_each(|(idx, event)| { if let ContractEvent::V1(v1) = event { batch .put::( @@ -394,7 +419,43 @@ impl DBIndexer { ) .expect("Failed to put events by version to a batch"); } - }); + if self.indexer_db.event_translation_enabled() { + if let ContractEvent::V2(v2) = event { + if let Some(translated_v1_event) = + self.translate_event_v2_to_v1(v2).map_err(|e| { + anyhow::anyhow!( + "Failed to translate event: {:?}. Error: {}", + v2, + e + ) + })? + { + let key = *translated_v1_event.key(); + let sequence_number = translated_v1_event.sequence_number(); + self.cache_sequence_number(&key, sequence_number); + batch + .put::( + &(key, sequence_number), + &(version, idx as u64), + ) + .expect("Failed to put events by key to a batch"); + batch + .put::( + &(key, version, sequence_number), + &(idx as u64), + ) + .expect("Failed to put events by version to a batch"); + batch + .put::( + &(version, idx as u64), + &translated_v1_event, + ) + .expect("Failed to put translated v1 events to a batch"); + } + } + } + Ok::<(), AptosDbError>(()) + })?; } if self.indexer_db.statekeys_enabled() { @@ -441,6 +502,76 @@ impl DBIndexer { Ok(version) } + fn get_resource( + &self, + address: &AccountAddress, + struct_tag_str: &str, + ) -> Result> { + let state_view = self + .main_db_reader + .latest_state_checkpoint_view() + .expect("Failed to get state view"); + + let struct_tag = StructTag::from_str(struct_tag_str)?; + let state_key = StateKey::resource(address, &struct_tag)?; + let maybe_state_value = state_view.get_state_value(&state_key)?; + Ok(maybe_state_value) + } + + pub fn translate_event_v2_to_v1( + &self, + v2: &ContractEventV2, + ) -> Result> { + match v2.type_tag().to_canonical_string().as_str() { + COIN_DEPOSIT_TYPE_STR => { + let coin_deposit = CoinDeposit::try_from_bytes(v2.event_data())?; + let struct_tag_str = format!("0x1::coin::CoinStore<{}>", coin_deposit.coin_type()); + // We can use `DummyCoinType` as it does not affect the correctness of deserialization. + let state_value = self + .get_resource(coin_deposit.account(), &struct_tag_str)? + .expect("Event handle resource not found"); + let coin_store_resource: CoinStoreResource = + bcs::from_bytes(state_value.bytes())?; + + let key = *coin_store_resource.deposit_events().key(); + let sequence_number = self + .get_next_sequence_number(&key, coin_store_resource.deposit_events().count())?; + + let deposit_event = DepositEvent::new(coin_deposit.amount()); + Ok(Some(ContractEventV1::new( + key, + sequence_number, + DEPOSIT_EVENT_TYPE.clone(), + bcs::to_bytes(&deposit_event)?, + ))) + }, + _ => Ok(None), + } + } + + fn cache_sequence_number(&self, event_key: &EventKey, sequence_number: u64) { + let mut cache = self.event_sequence_number_cache.lock().unwrap(); + cache.insert(*event_key, sequence_number); + } + + pub fn get_next_sequence_number(&self, event_key: &EventKey, default: u64) -> Result { + let mut cache = self.event_sequence_number_cache.lock().unwrap(); + if let Some(seq) = cache.get_mut(event_key) { + Ok(*seq + 1) + } else { + let mut iter = self.indexer_db.db.iter::()?; + iter.seek_for_prev(&(*event_key, u64::max_value()))?; + let seq = iter.next().transpose()?.map_or(default, |((key, seq), _)| { + if &key == event_key { + seq + 1 + } else { + default + } + }); + Ok(seq) + } + } + pub fn get_account_transactions( &self, address: AccountAddress, @@ -550,9 +681,16 @@ impl DBIndexer { let mut events_with_version = event_indices .into_iter() .map(|(seq, ver, idx)| { - let event = self + let event = match self .main_db_reader - .get_event_by_version_and_index(ver, idx)?; + .get_event_by_version_and_index(ver, idx)? + { + event @ ContractEvent::V1(_) => event, + ContractEvent::V2(_) => ContractEvent::V1( + self.indexer_db + .get_translated_v1_event_by_version_and_index(ver, idx)?, + ), + }; let v0 = match &event { ContractEvent::V1(event) => event, ContractEvent::V2(_) => bail!("Unexpected module event"), @@ -563,6 +701,7 @@ impl DBIndexer { seq, v0.sequence_number() ); + Ok(EventWithVersion::new(ver, event)) }) .collect::>>()?; diff --git a/storage/indexer/src/indexer_reader.rs b/storage/indexer/src/indexer_reader.rs index 535d043d50119..a6b79b0a22af6 100644 --- a/storage/indexer/src/indexer_reader.rs +++ b/storage/indexer/src/indexer_reader.rs @@ -5,7 +5,7 @@ use crate::{db_indexer::DBIndexer, db_v2::IndexerAsyncV2}; use anyhow::anyhow; use aptos_types::{ account_address::AccountAddress, - contract_event::EventWithVersion, + contract_event::{ContractEventV1, ContractEventV2, EventWithVersion}, event::EventKey, indexer::indexer_db_reader::{IndexerReader, Order}, state_store::{ @@ -157,4 +157,35 @@ impl IndexerReader for IndexerReaders { } anyhow::bail!("DB indexer reader is not available") } + + fn get_translated_v1_event_by_version_and_index( + &self, + version: Version, + index: u64, + ) -> anyhow::Result { + if let Some(db_indexer_reader) = &self.db_indexer_reader { + if db_indexer_reader.indexer_db.event_translation_enabled() { + return Ok(db_indexer_reader + .indexer_db + .get_translated_v1_event_by_version_and_index(version, index)?); + } else { + anyhow::bail!("Event translation is not enabled") + } + } + anyhow::bail!("DB indexer reader is not available") + } + + fn translate_event_v2_to_v1( + &self, + v2: &ContractEventV2, + ) -> anyhow::Result> { + if let Some(db_indexer_reader) = &self.db_indexer_reader { + if db_indexer_reader.indexer_db.event_translation_enabled() { + return Ok(db_indexer_reader.translate_event_v2_to_v1(v2)?); + } else { + anyhow::bail!("Event translation is not enabled") + } + } + anyhow::bail!("DB indexer reader is not available") + } } diff --git a/storage/indexer_schemas/src/schema/mod.rs b/storage/indexer_schemas/src/schema/mod.rs index 0f4dfd4c7bcd6..2d436012cbd65 100644 --- a/storage/indexer_schemas/src/schema/mod.rs +++ b/storage/indexer_schemas/src/schema/mod.rs @@ -12,6 +12,9 @@ pub mod indexer_metadata; pub mod state_keys; pub mod table_info; pub mod transaction_by_account; +pub mod translated_v1_event; + +use anyhow::ensure; use aptos_schemadb::ColumnFamilyName; pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default"; @@ -22,6 +25,7 @@ pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key"; pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version"; pub const TRANSACTION_BY_ACCOUNT_CF_NAME: ColumnFamilyName = "transaction_by_account"; pub const STATE_KEYS_CF_NAME: ColumnFamilyName = "state_keys"; +pub const TRANSLATED_V1_EVENT_CF_NAME: ColumnFamilyName = "translated_v1_event"; pub fn column_families() -> Vec { vec![ @@ -39,5 +43,16 @@ pub fn internal_indexer_column_families() -> Vec { EVENT_BY_VERSION_CF_NAME, TRANSACTION_BY_ACCOUNT_CF_NAME, STATE_KEYS_CF_NAME, + TRANSLATED_V1_EVENT_CF_NAME, ] } + +fn ensure_slice_len_eq(data: &[u8], len: usize) -> anyhow::Result<()> { + ensure!( + data.len() == len, + "Unexpected data len {}, expected {}.", + data.len(), + len, + ); + Ok(()) +} diff --git a/storage/indexer_schemas/src/schema/translated_v1_event/mod.rs b/storage/indexer_schemas/src/schema/translated_v1_event/mod.rs new file mode 100644 index 0000000000000..9f196482389d8 --- /dev/null +++ b/storage/indexer_schemas/src/schema/translated_v1_event/mod.rs @@ -0,0 +1,66 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! This module defines physical storage schema for the contract events. +//! +//! A translated v1 event is keyed by the version of the transaction it belongs to and the index of +//! the original v2 event among all events yielded by the same transaction. +//! ```text +//! |<-------key----->|<---value--->| +//! | version | index | event bytes | +//! ``` + +use crate::schema::{ensure_slice_len_eq, TRANSLATED_V1_EVENT_CF_NAME}; +use anyhow::Result; +use aptos_schemadb::{ + define_pub_schema, + schema::{KeyCodec, ValueCodec}, +}; +use aptos_types::{contract_event::ContractEventV1, transaction::Version}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::mem::size_of; + +define_pub_schema!( + TranslatedV1EventSchema, + Key, + ContractEventV1, + TRANSLATED_V1_EVENT_CF_NAME +); + +type Index = u64; +type Key = (Version, Index); + +impl KeyCodec for Key { + fn encode_key(&self) -> Result> { + let (version, index) = *self; + + let mut encoded_key = Vec::with_capacity(size_of::() + size_of::()); + encoded_key.write_u64::(version)?; + encoded_key.write_u64::(index)?; + Ok(encoded_key) + } + + fn decode_key(data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + let version_size = size_of::(); + + let version = (&data[..version_size]).read_u64::()?; + let index = (&data[version_size..]).read_u64::()?; + Ok((version, index)) + } +} + +impl ValueCodec for ContractEventV1 { + fn encode_value(&self) -> Result> { + bcs::to_bytes(self).map_err(Into::into) + } + + fn decode_value(data: &[u8]) -> Result { + bcs::from_bytes(data).map_err(Into::into) + } +} + +#[cfg(test)] +mod test; diff --git a/storage/indexer_schemas/src/schema/translated_v1_event/test.rs b/storage/indexer_schemas/src/schema/translated_v1_event/test.rs new file mode 100644 index 0000000000000..d28c191121c04 --- /dev/null +++ b/storage/indexer_schemas/src/schema/translated_v1_event/test.rs @@ -0,0 +1,20 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding}; +use proptest::prelude::*; + +proptest! { + #[test] + fn test_encode_decode( + version in any::(), + index in any::(), + event in any::(), + ) { + assert_encode_decode::(&(version, index), &event); + } +} + +test_no_panic_decoding!(TranslatedV1EventSchema); diff --git a/types/src/account_config/events/deposit.rs b/types/src/account_config/events/deposit.rs index 2c59bc4fc336f..54269f54cfe22 100644 --- a/types/src/account_config/events/deposit.rs +++ b/types/src/account_config/events/deposit.rs @@ -2,7 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use move_core_types::{ident_str, identifier::IdentStr, move_resource::MoveStructType}; +use move_core_types::{ + ident_str, + identifier::IdentStr, + language_storage::{StructTag, TypeTag, CORE_CODE_ADDRESS}, + move_resource::MoveStructType, +}; +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; /// Struct that represents a DepositPaymentEvent. @@ -12,6 +18,10 @@ pub struct DepositEvent { } impl DepositEvent { + pub fn new(amount: u64) -> Self { + Self { amount } + } + pub fn try_from_bytes(bytes: &[u8]) -> Result { bcs::from_bytes(bytes).map_err(Into::into) } @@ -26,3 +36,12 @@ impl MoveStructType for DepositEvent { const MODULE_NAME: &'static IdentStr = ident_str!("coin"); const STRUCT_NAME: &'static IdentStr = ident_str!("DepositEvent"); } + +pub static DEPOSIT_EVENT_TYPE: Lazy = Lazy::new(|| { + TypeTag::Struct(Box::new(StructTag { + address: CORE_CODE_ADDRESS, + module: ident_str!("coin").to_owned(), + name: ident_str!("DepositEvent").to_owned(), + type_args: vec![], + })) +}); diff --git a/types/src/coin_deposit.rs b/types/src/coin_deposit.rs new file mode 100644 index 0000000000000..46d4bf79b648a --- /dev/null +++ b/types/src/coin_deposit.rs @@ -0,0 +1,55 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use move_core_types::{ + account_address::AccountAddress, + ident_str, + language_storage::{StructTag, TypeTag, CORE_CODE_ADDRESS}, +}; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct CoinDeposit { + coin_type: String, + account: AccountAddress, + amount: u64, +} + +impl CoinDeposit { + pub fn new(coin_type: String, account: AccountAddress, amount: u64) -> Self { + Self { + coin_type, + account, + amount, + } + } + + pub fn try_from_bytes(bytes: &[u8]) -> anyhow::Result { + bcs::from_bytes(bytes).map_err(Into::into) + } + + pub fn coin_type(&self) -> &str { + &self.coin_type + } + + pub fn account(&self) -> &AccountAddress { + &self.account + } + + pub fn amount(&self) -> u64 { + self.amount + } +} + +pub const COIN_DEPOSIT_TYPE_STR: &str = + "0000000000000000000000000000000000000000000000000000000000000001::coin::CoinDeposit"; + +pub static COIN_DEPOSIT_TYPE: Lazy = Lazy::new(|| { + TypeTag::Struct(Box::new(StructTag { + address: CORE_CODE_ADDRESS, + module: ident_str!("coin").to_owned(), + name: ident_str!("CoinDeposit").to_owned(), + type_args: vec![], + })) +}); diff --git a/types/src/contract_event.rs b/types/src/contract_event.rs index 59d9a27fae406..277c068ca57c0 100644 --- a/types/src/contract_event.rs +++ b/types/src/contract_event.rs @@ -169,6 +169,7 @@ impl ContractEvent { /// Entry produced via a call to the `emit_event` builtin. #[derive(Hash, Clone, Eq, PartialEq, Serialize, Deserialize, CryptoHasher)] +#[cfg_attr(any(test, feature = "fuzzing"), derive(Arbitrary))] pub struct ContractEventV1 { /// The unique key that the event was emitted to key: EventKey, diff --git a/types/src/indexer/indexer_db_reader.rs b/types/src/indexer/indexer_db_reader.rs index 164b927de37dc..ad84c58c8f762 100644 --- a/types/src/indexer/indexer_db_reader.rs +++ b/types/src/indexer/indexer_db_reader.rs @@ -3,7 +3,7 @@ use crate::{ account_address::AccountAddress, - contract_event::EventWithVersion, + contract_event::{ContractEventV1, ContractEventV2, EventWithVersion}, event::EventKey, state_store::{ state_key::{prefix::StateKeyPrefix, StateKey}, @@ -72,4 +72,11 @@ pub trait IndexerReader: Send + Sync { Ok(()) } + fn get_translated_v1_event_by_version_and_index( + &self, + version: Version, + index: u64, + ) -> Result; + + fn translate_event_v2_to_v1(&self, v2: &ContractEventV2) -> Result>; } diff --git a/types/src/lib.rs b/types/src/lib.rs index 9081b6c0f0a4d..c76d7d7507f12 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -11,6 +11,7 @@ pub mod block_info; pub mod block_metadata; pub mod block_metadata_ext; pub mod chain_id; +pub mod coin_deposit; pub mod contract_event; pub mod dkg; pub mod epoch_change; diff --git a/types/src/utility_coin.rs b/types/src/utility_coin.rs index 0074ac9fb7478..cdd3f4448c2ff 100644 --- a/types/src/utility_coin.rs +++ b/types/src/utility_coin.rs @@ -34,3 +34,23 @@ impl CoinType for AptosCoinType { AccountAddress::ONE } } + +pub static DUMMY_COIN_TYPE: Lazy = Lazy::new(|| { + TypeTag::Struct(Box::new(StructTag { + address: AccountAddress::ONE, + module: ident_str!("dummy_coin").to_owned(), + name: ident_str!("DummyCoin").to_owned(), + type_args: vec![], + })) +}); + +pub struct DummyCoinType; +impl CoinType for DummyCoinType { + fn type_tag() -> TypeTag { + DUMMY_COIN_TYPE.clone() + } + + fn coin_info_address() -> AccountAddress { + AccountAddress::ONE + } +}