From 625118a03f8f49e36c97ed709080a7ddccbe7484 Mon Sep 17 00:00:00 2001 From: jill <121921928+jillxuu@users.noreply.github.com> Date: Mon, 12 Feb 2024 12:07:13 -0800 Subject: [PATCH] [table info][1/4] separate indexer async v2 db from aptosdb (#11799) * separate indexer async v2 db from aptosdb * address comments * remove unrelated changes --- Cargo.lock | 27 ++------- api/Cargo.toml | 1 + .../src/fake_context.rs | 1 + api/src/accounts.rs | 10 +++- api/src/context.rs | 8 ++- api/src/events.rs | 5 +- api/src/runtime.rs | 5 +- api/src/state.rs | 10 +++- api/src/tests/converter_test.rs | 2 +- api/src/transactions.rs | 24 ++++++-- api/src/view_function.rs | 6 +- api/test-context/src/test_context.rs | 2 +- api/types/Cargo.toml | 2 + api/types/src/convert.rs | 57 +++++++++++++------ .../src/aptos_test_harness.rs | 2 +- .../src/storage_interface.rs | 1 - aptos-node/Cargo.toml | 1 + aptos-node/src/services.rs | 16 +++++- crates/aptos-genesis/src/lib.rs | 1 - crates/aptos-genesis/src/mainnet.rs | 1 - crates/indexer/src/indexer/fetcher.rs | 2 +- crates/indexer/src/runtime.rs | 8 ++- .../indexer-grpc-fullnode/Cargo.toml | 1 + .../indexer-grpc-fullnode/src/runtime.rs | 10 +++- .../src/stream_coordinator.rs | 3 +- .../indexer-grpc-table-info/Cargo.toml | 31 +--------- .../indexer-grpc-table-info/src/runtime.rs | 45 ++++++++++----- .../src/table_info_service.rs | 42 ++++++-------- .../executor-benchmark/src/db_generator.rs | 1 - execution/executor-benchmark/src/lib.rs | 1 - .../src/tests/driver_factory.rs | 1 - storage/aptosdb/src/db/aptosdb_test.rs | 1 - .../src/db/include/aptosdb_internal.rs | 21 ------- .../aptosdb/src/db/include/aptosdb_reader.rs | 47 +-------------- .../src/db/include/aptosdb_testonly.rs | 7 +-- .../aptosdb/src/db/include/aptosdb_writer.rs | 41 ------------- storage/aptosdb/src/db/mod.rs | 8 +-- .../aptosdb/src/fast_sync_storage_wrapper.rs | 2 - storage/backup/backup-cli/src/utils/mod.rs | 1 - storage/db-tool/src/bootstrap.rs | 1 - storage/db-tool/src/replay_verify.rs | 1 - storage/indexer/src/db_ops.rs | 22 +++++++ storage/indexer/src/db_v2.rs | 43 ++++++-------- storage/indexer/src/lib.rs | 2 + storage/indexer/src/table_info_reader.rs | 19 +++++++ storage/storage-interface/src/lib.rs | 32 ----------- 46 files changed, 251 insertions(+), 324 deletions(-) create mode 100644 storage/indexer/src/db_ops.rs create mode 100644 storage/indexer/src/table_info_reader.rs diff --git a/Cargo.lock b/Cargo.lock index a793e23e9e595..2be34d423d123 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -399,6 +399,7 @@ dependencies = [ "aptos-cached-packages", "aptos-config", "aptos-crypto", + "aptos-db-indexer", "aptos-framework", "aptos-gas-meter", "aptos-gas-schedule", @@ -517,6 +518,7 @@ dependencies = [ "anyhow", "aptos-config", "aptos-crypto", + "aptos-db-indexer", "aptos-framework", "aptos-logger", "aptos-openapi", @@ -2183,6 +2185,7 @@ dependencies = [ "aptos-config", "aptos-crypto", "aptos-db", + "aptos-db-indexer", "aptos-executor", "aptos-executor-types", "aptos-framework", @@ -2299,50 +2302,31 @@ version = "1.0.0" dependencies = [ "anyhow", "aptos-api", - "aptos-api-test-context", "aptos-api-types", "aptos-bitvec", "aptos-config", - "aptos-crypto", "aptos-db", - "aptos-executor", - "aptos-executor-types", - "aptos-framework", - "aptos-genesis", - "aptos-global-constants", + "aptos-db-indexer", "aptos-indexer-grpc-fullnode", "aptos-indexer-grpc-utils", "aptos-logger", "aptos-mempool", - "aptos-mempool-notifications", "aptos-metrics-core", - "aptos-moving-average 0.1.0 
(git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf)", - "aptos-proptest-helpers", "aptos-protos 1.1.2", "aptos-rocksdb-options", "aptos-runtimes", - "aptos-sdk", - "aptos-secure-storage", + "aptos-schemadb", "aptos-storage-interface", - "aptos-temppath", "aptos-types", - "aptos-vm", - "aptos-vm-validator", "base64 0.13.1", "bytes", "chrono", "fail 0.5.1", "futures", - "goldenfile", "hex", "hyper", - "move-binary-format", - "move-core-types", - "move-package", "move-resource-viewer", "once_cell", - "rand 0.7.3", - "regex", "serde", "serde_json", "tokio", @@ -3027,6 +3011,7 @@ dependencies = [ "aptos-data-client", "aptos-data-streaming-service", "aptos-db", + "aptos-db-indexer", "aptos-dkg-runtime", "aptos-event-notifications", "aptos-executor", diff --git a/api/Cargo.toml b/api/Cargo.toml index 7a0aa061fa505..04cd8ec534cc6 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -19,6 +19,7 @@ aptos-bcs-utils = { workspace = true } aptos-build-info = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-framework = { workspace = true } aptos-gas-schedule = { workspace = true } aptos-global-constants = { workspace = true } diff --git a/api/openapi-spec-generator/src/fake_context.rs b/api/openapi-spec-generator/src/fake_context.rs index 5a87200f855ac..98dee1782b641 100644 --- a/api/openapi-spec-generator/src/fake_context.rs +++ b/api/openapi-spec-generator/src/fake_context.rs @@ -16,5 +16,6 @@ pub fn get_fake_context() -> Context { Arc::new(MockDbReaderWriter), mempool.ac_client, NodeConfig::default(), + None, /* table info reader */ ) } diff --git a/api/src/accounts.rs b/api/src/accounts.rs index 0ae87d4f634b9..4cfa4bc398de8 100644 --- a/api/src/accounts.rs +++ b/api/src/accounts.rs @@ -351,7 +351,10 @@ impl Account { .latest_state_view_poem(&self.latest_ledger_info)?; let converted_resources = state_view .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_resources(resources.iter().map(|(k, v)| (k.clone(), v.as_slice()))) .context("Failed to build move resource response from data in DB") .map_err(|err| { @@ -545,7 +548,10 @@ impl Account { })?; resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .move_struct_fields(resource_type, &bytes) .context("Failed to convert move structs from storage") .map_err(|err| { diff --git a/api/src/context.rs b/api/src/context.rs index cf30dd40b3972..b8a84503ffe18 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -18,6 +18,7 @@ use aptos_api_types::{ }; use aptos_config::config::{NodeConfig, RoleType}; use aptos_crypto::HashValue; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule}; use aptos_logger::{error, info, warn, Schema}; use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; @@ -76,6 +77,7 @@ pub struct Context { gas_limit_cache: Arc>, view_function_stats: Arc, simulate_txn_stats: Arc, + pub table_info_reader: Option>, } impl std::fmt::Debug for Context { @@ -90,6 +92,7 @@ impl Context { db: Arc, mp_sender: MempoolClientSender, node_config: NodeConfig, + table_info_reader: Option>, ) -> Self { let (view_function_stats, simulate_txn_stats) = { let log_per_call_stats = 
node_config.api.periodic_function_stats_sec.is_some(); @@ -126,6 +129,7 @@ impl Context { })), view_function_stats, simulate_txn_stats, + table_info_reader, } } @@ -661,7 +665,7 @@ impl Context { let state_view = self.latest_state_view_poem(ledger_info)?; let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(self.db.clone()); + let converter = resolver.as_converter(self.db.clone(), self.table_info_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { @@ -692,7 +696,7 @@ impl Context { let state_view = self.latest_state_view_poem(ledger_info)?; let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(self.db.clone()); + let converter = resolver.as_converter(self.db.clone(), self.table_info_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { diff --git a/api/src/events.rs b/api/src/events.rs index f5232bbca725c..53a2f32db04ae 100644 --- a/api/src/events.rs +++ b/api/src/events.rs @@ -186,7 +186,10 @@ impl EventsApi { .context .latest_state_view_poem(&latest_ledger_info)? .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_versioned_events(&events) .context("Failed to convert events from storage into response") .map_err(|err| { diff --git a/api/src/runtime.rs b/api/src/runtime.rs index 0fc82a98614fe..aacab86b4024b 100644 --- a/api/src/runtime.rs +++ b/api/src/runtime.rs @@ -10,6 +10,7 @@ use crate::{ }; use anyhow::Context as AnyhowContext; use aptos_config::config::{ApiConfig, NodeConfig}; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReader; @@ -34,11 +35,12 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, + table_info_reader: Option>, ) -> anyhow::Result { let max_runtime_workers = get_max_runtime_workers(&config.api); let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers)); - let context = Context::new(chain_id, db, mp_sender, config.clone()); + let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader); attach_poem_to_runtime(runtime.handle(), context.clone(), config, false) .context("Failed to attach poem to runtime")?; @@ -339,6 +341,7 @@ mod tests { ChainId::test(), context.db.clone(), context.mempool.ac_client.clone(), + None, ); assert!(ret.is_ok()); diff --git a/api/src/state.rs b/api/src/state.rs index 75621f679baa0..34c74b44d4a58 100644 --- a/api/src/state.rs +++ b/api/src/state.rs @@ -315,7 +315,10 @@ impl StateApi { AcceptType::Json => { let resource = state_view .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_resource(&resource_type, &bytes) .context("Failed to deserialize resource data retrieved from DB") .map_err(|err| { @@ -421,7 +424,10 @@ impl StateApi { .state_view(ledger_version.map(|inner| inner.0))?; let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(self.context.db.clone()); + let converter = resolver.as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ); // Convert key to lookup version for DB let vm_key = converter diff --git a/api/src/tests/converter_test.rs b/api/src/tests/converter_test.rs index b72703ca21f2f..05be4aafbb11e 100644 --- a/api/src/tests/converter_test.rs +++ 
b/api/src/tests/converter_test.rs @@ -21,7 +21,7 @@ async fn test_value_conversion() { let state_view = context.latest_state_view(); let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(context.db); + let converter = resolver.as_converter(context.db, None); assert_value_conversion(&converter, "u8", 1i32, VmMoveValue::U8(1)); assert_value_conversion(&converter, "u64", "1", VmMoveValue::U64(1)); diff --git a/api/src/transactions.rs b/api/src/transactions.rs index 2b095dd7016bc..5726064112feb 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -762,7 +762,10 @@ impl TransactionsApi { let timestamp = self.context.get_block_timestamp(ledger_info, txn.version)?; resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_onchain_transaction(timestamp, txn) .context("Failed to convert on chain transaction to Transaction") .map_err(|err| { @@ -774,7 +777,10 @@ impl TransactionsApi { })? }, TransactionData::Pending(txn) => resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_pending_transaction(*txn) .context("Failed to convert on pending transaction to Transaction") .map_err(|err| { @@ -946,7 +952,10 @@ impl TransactionsApi { .context .latest_state_view_poem(ledger_info)? .as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_signed_transaction_poem(data.0, self.context.chain_id()) .context("Failed to create SignedTransaction from SubmitTransactionRequest") .map_err(|err| { @@ -1025,7 +1034,7 @@ impl TransactionsApi { .map(|(index, txn)| { self.context .latest_state_view_poem(ledger_info)?.as_move_resolver() - .as_converter(self.context.db.clone()) + .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) .try_into_signed_transaction_poem(txn, self.context.chain_id()) .context(format!("Failed to create SignedTransaction from SubmitTransactionRequest at position {}", index)) .map_err(|err| { @@ -1117,7 +1126,7 @@ impl TransactionsApi { // We provide the pending transaction so that users have the hash associated let pending_txn = resolver - .as_converter(self.context.db.clone()) + .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) .try_into_pending_transaction_poem(txn) .context("Failed to build PendingTransaction from mempool response, even though it said the request was accepted") .map_err(|err| SubmitTransactionError::internal_with_code( @@ -1350,7 +1359,10 @@ impl TransactionsApi { let state_view = self.context.latest_state_view_poem(&ledger_info)?; let resolver = state_view.as_move_resolver(); let raw_txn: RawTransaction = resolver - .as_converter(self.context.db.clone()) + .as_converter( + self.context.db.clone(), + self.context.table_info_reader.clone(), + ) .try_into_raw_transaction_poem(request.transaction, self.context.chain_id()) .context("The given transaction is invalid") .map_err(|err| { diff --git a/api/src/view_function.rs b/api/src/view_function.rs index 26767cb5ab286..f87503d40eaf8 100644 --- a/api/src/view_function.rs +++ b/api/src/view_function.rs @@ -97,7 +97,7 @@ fn view_request( ViewFunctionRequest::Json(data) => { let resolver = state_view.as_move_resolver(); resolver - .as_converter(context.db.clone()) + .as_converter(context.db.clone(), context.table_info_reader.clone()) 
.convert_view_function(data.0) .map_err(|err| { BasicErrorWith404::bad_request_with_code( @@ -171,7 +171,7 @@ fn view_request( AcceptType::Json => { let resolver = state_view.as_move_resolver(); let return_types = resolver - .as_converter(context.db.clone()) + .as_converter(context.db.clone(), context.table_info_reader.clone()) .function_return_types(&view_function) .and_then(|tys| { tys.into_iter() @@ -191,7 +191,7 @@ fn view_request( .zip(return_types.into_iter()) .map(|(v, ty)| { resolver - .as_converter(context.db.clone()) + .as_converter(context.db.clone(), context.table_info_reader.clone()) .try_into_move_value(&ty, &v) }) .collect::>>() diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index f93400202f75b..800d8bacee042 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -130,7 +130,6 @@ pub fn new_test_context( false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .unwrap(), ) @@ -146,6 +145,7 @@ pub fn new_test_context( db.clone(), mempool.ac_client.clone(), node_config.clone(), + None, /* table info reader */ ); // Configure the testing depending on which API version we're testing. diff --git a/api/types/Cargo.toml b/api/types/Cargo.toml index 7154f0cee7f58..b36f5698cda21 100644 --- a/api/types/Cargo.toml +++ b/api/types/Cargo.toml @@ -16,6 +16,7 @@ rust-version = { workspace = true } anyhow = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-framework = { workspace = true } aptos-logger = { workspace = true } aptos-openapi = { workspace = true } @@ -35,3 +36,4 @@ poem-openapi = { workspace = true } poem-openapi-derive = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } + diff --git a/api/types/src/convert.rs b/api/types/src/convert.rs index d69c5f7a869de..b9f3a45f78712 100644 --- a/api/types/src/convert.rs +++ b/api/types/src/convert.rs @@ -18,6 +18,7 @@ use crate::{ }; use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_storage_interface::DbReader; use aptos_types::{ access_path::{AccessPath, Path}, @@ -25,7 +26,7 @@ use aptos_types::{ contract_event::{ContractEvent, EventWithVersion}, state_store::{ state_key::{StateKey, StateKeyInner}, - table::TableHandle, + table::{TableHandle, TableInfo}, }, transaction::{ EntryFunction, ExecutionStatus, ModuleBundle, Multisig, RawTransaction, Script, @@ -62,13 +63,19 @@ const OBJECT_STRUCT: &IdentStr = ident_str!("Object"); pub struct MoveConverter<'a, R: ?Sized> { inner: MoveValueAnnotator<'a, R>, db: Arc, + table_info_reader: Option>, } impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { - pub fn new(inner: &'a R, db: Arc) -> Self { + pub fn new( + inner: &'a R, + db: Arc, + table_info_reader: Option>, + ) -> Self { Self { inner: MoveValueAnnotator::new(inner), db, + table_info_reader, } } @@ -422,12 +429,9 @@ impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { key: &[u8], value: &[u8], ) -> Result> { - if !self.db.indexer_enabled() && !self.db.indexer_async_v2_enabled() { - return Ok(None); - } - let table_info = match self.db.get_table_info(handle) { - Ok(ti) => ti, - Err(_) => { + let table_info = match self.get_table_info(handle)? 
{ + Some(ti) => ti, + None => { aptos_logger::warn!( "Table info not found for handle {:?}, can't decode table item. OK for simulation", handle @@ -435,6 +439,7 @@ impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { return Ok(None); // if table item not found return None anyway to avoid crash }, }; + let key = self.try_into_move_value(&table_info.key_type, key)?; let value = self.try_into_move_value(&table_info.value_type, value)?; @@ -451,12 +456,9 @@ impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { handle: TableHandle, key: &[u8], ) -> Result> { - if !self.db.indexer_enabled() && !self.db.indexer_async_v2_enabled() { - return Ok(None); - } - let table_info = match self.db.get_table_info(handle) { - Ok(ti) => ti, - Err(_) => { + let table_info = match self.get_table_info(handle)? { + Some(ti) => ti, + None => { aptos_logger::warn!( "Table info not found for handle {:?}, can't decode table item. OK for simulation", handle @@ -464,6 +466,7 @@ impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { return Ok(None); // if table item not found return None anyway to avoid crash }, }; + let key = self.try_into_move_value(&table_info.key_type, key)?; Ok(Some(DeletedTableData { @@ -924,6 +927,18 @@ impl<'a, R: ModuleResolver + ?Sized> MoveConverter<'a, R> { args, }) } + + fn get_table_info(&self, handle: TableHandle) -> Result> { + if let Some(table_info_reader) = self.table_info_reader.as_ref() { + // Attempt to get table_info from the table_info_reader if it exists + Ok(table_info_reader.get_table_info(handle)?) + } else if self.db.indexer_enabled() { + // Attempt to get table_info from the db if indexer is enabled + Ok(Some(self.db.get_table_info(handle)?)) + } else { + Ok(None) + } + } } impl<'a, R: ModuleResolver + ?Sized> ExplainVMStatus for MoveConverter<'a, R> { @@ -934,12 +949,20 @@ impl<'a, R: ModuleResolver + ?Sized> ExplainVMStatus for MoveConverter<'a, R> { } } pub trait AsConverter { - fn as_converter(&self, db: Arc) -> MoveConverter; + fn as_converter( + &self, + db: Arc, + table_info_reader: Option>, + ) -> MoveConverter; } impl AsConverter for R { - fn as_converter(&self, db: Arc) -> MoveConverter { - MoveConverter::new(self, db) + fn as_converter( + &self, + db: Arc, + table_info_reader: Option>, + ) -> MoveConverter { + MoveConverter::new(self, db, table_info_reader) } } diff --git a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs index effee015f0f4e..9a25a4cabf246 100644 --- a/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs +++ b/aptos-move/aptos-transactional-test-harness/src/aptos_test_harness.rs @@ -919,7 +919,7 @@ impl<'a> MoveTestAdapter<'a> for AptosTestAdapter<'a> { }, AptosSubCommand::ViewTableCommand(view_table_cmd) => { let resolver = self.storage.as_move_resolver(); - let converter = resolver.as_converter(Arc::new(FakeDbReader {})); + let converter = resolver.as_converter(Arc::new(FakeDbReader {}), None); let vm_key = converter .try_into_vm_value(&view_table_cmd.key_type, view_table_cmd.key_value) diff --git a/aptos-move/aptos-validator-interface/src/storage_interface.rs b/aptos-move/aptos-validator-interface/src/storage_interface.rs index ca914522f8328..f9ed2da2a90a4 100644 --- a/aptos-move/aptos-validator-interface/src/storage_interface.rs +++ b/aptos-move/aptos-validator-interface/src/storage_interface.rs @@ -32,7 +32,6 @@ impl DBDebuggerInterface { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, 
DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .map_err(anyhow::Error::from)?, ))) diff --git a/aptos-node/Cargo.toml b/aptos-node/Cargo.toml index 9e87b7bc9cb7d..fa55dbeb1ff31 100644 --- a/aptos-node/Cargo.toml +++ b/aptos-node/Cargo.toml @@ -28,6 +28,7 @@ aptos-crypto = { workspace = true } aptos-data-client = { workspace = true } aptos-data-streaming-service = { workspace = true } aptos-db = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-dkg-runtime = { workspace = true } aptos-event-notifications = { workspace = true } aptos-executor = { workspace = true } diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index 2562a6d848086..ad2c37a914c0f 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -11,6 +11,7 @@ use aptos_consensus::{ }; use aptos_consensus_notifications::ConsensusNotifier; use aptos_data_client::client::AptosDataClient; +use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; use aptos_indexer_grpc_table_info::runtime::bootstrap as bootstrap_indexer_table_info; @@ -52,20 +53,28 @@ pub fn bootstrap_api_and_indexer( let (mempool_client_sender, mempool_client_receiver) = mpsc::channel(AC_SMP_CHANNEL_BUFFER_SIZE); - let indexer_table_info = bootstrap_indexer_table_info( + let (indexer_table_info_runtime, indexer_async_v2) = match bootstrap_indexer_table_info( node_config, chain_id, db_rw.clone(), mempool_client_sender.clone(), - ); + ) { + Some((runtime, indexer_v2)) => (Some(runtime), Some(indexer_v2)), + None => (None, None), + }; // Create the API runtime + let table_info_reader: Option> = indexer_async_v2.map(|arc| { + let trait_object: Arc = arc; + trait_object + }); let api_runtime = if node_config.api.enabled { Some(bootstrap_api( node_config, chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), + table_info_reader.clone(), )?) 
} else { None @@ -77,6 +86,7 @@ pub fn bootstrap_api_and_indexer( chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), + table_info_reader, ); // Create the indexer runtime @@ -90,7 +100,7 @@ pub fn bootstrap_api_and_indexer( Ok(( mempool_client_receiver, api_runtime, - indexer_table_info, + indexer_table_info_runtime, indexer_runtime, indexer_grpc, )) diff --git a/crates/aptos-genesis/src/lib.rs b/crates/aptos-genesis/src/lib.rs index 1eeeea0f8e178..73633e2d2c73c 100644 --- a/crates/aptos-genesis/src/lib.rs +++ b/crates/aptos-genesis/src/lib.rs @@ -156,7 +156,6 @@ impl GenesisInfo { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?; let db_rw = DbReaderWriter::new(aptosdb); aptos_executor::db_bootstrapper::generate_waypoint::(&db_rw, genesis) diff --git a/crates/aptos-genesis/src/mainnet.rs b/crates/aptos-genesis/src/mainnet.rs index d3cbb9c15cce7..7e1aa674dd72d 100644 --- a/crates/aptos-genesis/src/mainnet.rs +++ b/crates/aptos-genesis/src/mainnet.rs @@ -141,7 +141,6 @@ impl MainnetGenesisInfo { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?; let db_rw = DbReaderWriter::new(aptosdb); aptos_executor::db_bootstrapper::generate_waypoint::(&db_rw, genesis) diff --git a/crates/indexer/src/indexer/fetcher.rs b/crates/indexer/src/indexer/fetcher.rs index 42f775256b236..c565ddf516eab 100644 --- a/crates/indexer/src/indexer/fetcher.rs +++ b/crates/indexer/src/indexer/fetcher.rs @@ -244,7 +244,7 @@ async fn fetch_nexts( let state_view = context.latest_state_view().unwrap(); let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(context.db.clone()); + let converter = resolver.as_converter(context.db.clone(), context.table_info_reader.clone()); let mut transactions = vec![]; for (ind, raw_txn) in raw_txns.into_iter().enumerate() { diff --git a/crates/indexer/src/runtime.rs b/crates/indexer/src/runtime.rs index 14067021107f7..c3bf5a0516c0d 100644 --- a/crates/indexer/src/runtime.rs +++ b/crates/indexer/src/runtime.rs @@ -90,7 +90,13 @@ pub fn bootstrap( let node_config = config.clone(); runtime.spawn(async move { - let context = Arc::new(Context::new(chain_id, db, mp_sender, node_config)); + let context = Arc::new(Context::new( + chain_id, + db, + mp_sender, + node_config, + None, /* table info reader */ + )); run_forever(indexer_config, context).await; }); diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml index e687cf779d6c1..03a78588be33e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/Cargo.toml @@ -35,6 +35,7 @@ aptos-api = { workspace = true } aptos-api-types = { workspace = true } aptos-bitvec = { workspace = true } aptos-config = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-indexer-grpc-utils = { workspace = true } aptos-logger = { workspace = true } aptos-mempool = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs index 79a92569572cb..3b687b5c8ed2b 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs @@ -7,6 +7,7 @@ use crate::{ }; use aptos_api::context::Context; use aptos_config::config::NodeConfig; +use 
aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_protos::{ @@ -34,6 +35,7 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, + table_info_reader: Option>, ) -> Option { if !config.indexer_grpc.enabled { return None; @@ -50,7 +52,13 @@ pub fn bootstrap( let output_batch_size = node_config.indexer_grpc.output_batch_size; runtime.spawn(async move { - let context = Arc::new(Context::new(chain_id, db, mp_sender, node_config)); + let context = Arc::new(Context::new( + chain_id, + db, + mp_sender, + node_config, + table_info_reader, + )); let service_context = ServiceContext { context: context.clone(), processor_task_count, diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs index bc8f68da5119c..dc710b437b232 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs @@ -338,7 +338,8 @@ impl IndexerStreamCoordinator { let first_version = raw_txns.first().map(|txn| txn.version).unwrap(); let state_view = context.latest_state_view().unwrap(); let resolver = state_view.as_move_resolver(); - let converter = resolver.as_converter(context.db.clone()); + let converter = + resolver.as_converter(context.db.clone(), context.table_info_reader.clone()); // Enrich data with block metadata let (_, _, block_event) = context diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml index b7f4496ec9f64..d739b3b042c48 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml @@ -34,43 +34,18 @@ aptos-api = { workspace = true } aptos-api-types = { workspace = true } aptos-bitvec = { workspace = true } aptos-config = { workspace = true } +aptos-db = { workspace = true } +aptos-db-indexer = { workspace = true } aptos-indexer-grpc-fullnode = { workspace = true } aptos-indexer-grpc-utils = { workspace = true } aptos-logger = { workspace = true } aptos-mempool = { workspace = true } aptos-metrics-core = { workspace = true } -aptos-moving-average = { workspace = true } aptos-protos = { workspace = true } aptos-runtimes = { workspace = true } -aptos-sdk = { workspace = true } +aptos-schemadb = { workspace = true } aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } -aptos-vm = { workspace = true } - -move-binary-format = { workspace = true } -move-core-types = { workspace = true } -move-package = { workspace = true } - -[dev-dependencies] -goldenfile = { workspace = true } -rand = { workspace = true } -regex = { workspace = true } - -aptos-api-test-context = { workspace = true } -aptos-crypto = { workspace = true } -aptos-db = { workspace = true } -aptos-executor = { workspace = true } -aptos-executor-types = { workspace = true } -aptos-framework = { workspace = true } -aptos-genesis = { workspace = true } -aptos-global-constants = { workspace = true } -aptos-mempool = { workspace = true } -aptos-mempool-notifications = { workspace = true } -aptos-proptest-helpers = { workspace = true } -aptos-secure-storage = { workspace = true } -aptos-temppath = { workspace = true } -aptos-vm = { workspace = true } -aptos-vm-validator = { workspace = true } [features] failpoints = ["fail/failpoints"] diff --git 
a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index 969f9757f563d..cb5aa9531e880 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -4,48 +4,65 @@ use crate::table_info_service::TableInfoService; use aptos_api::context::Context; use aptos_config::config::NodeConfig; +use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2}; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; use aptos_types::chain_id::ChainId; use std::sync::Arc; use tokio::runtime::Runtime; +const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; + /// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service /// Returns corresponding Tokio runtime pub fn bootstrap( config: &NodeConfig, chain_id: ChainId, - db: DbReaderWriter, + db_rw: DbReaderWriter, mp_sender: MempoolClientSender, -) -> Option { +) -> Option<(Runtime, Arc)> { if !config.indexer_table_info.enabled { return None; } let runtime = aptos_runtimes::spawn_named_runtime("table-info".to_string(), None); + // Set up db config and open up the db initially to read metadata let node_config = config.clone(); - let parser_task_count = node_config.indexer_table_info.parser_task_count; - let parser_batch_size = node_config.indexer_table_info.parser_batch_size; - let enable_expensive_logging = node_config.indexer_table_info.enable_expensive_logging; - let next_version = db.reader.get_indexer_async_v2_next_version().unwrap(); + let db_path = node_config + .storage + .get_dir_paths() + .default_root_path() + .join(INDEX_ASYNC_V2_DB_NAME); + let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config; + let db = + open_db(db_path, &rocksdb_config).expect("Failed to open up indexer async v2 db initially"); + + let indexer_async_v2 = + Arc::new(IndexerAsyncV2::new(db).expect("Failed to initialize indexer async v2")); + let indexer_async_v2_clone = Arc::clone(&indexer_async_v2); // Spawn the runtime for table info parsing runtime.spawn(async move { let context = Arc::new(Context::new( chain_id, - db.reader.clone(), + db_rw.reader.clone(), mp_sender, - node_config, + node_config.clone(), + None, )); + let mut parser = TableInfoService::new( context, - next_version, - parser_task_count, - parser_batch_size, - enable_expensive_logging, + indexer_async_v2_clone.next_version(), + node_config.indexer_table_info.parser_task_count, + node_config.indexer_table_info.parser_batch_size, + node_config.indexer_table_info.enable_expensive_logging, + indexer_async_v2_clone, ); - parser.run(db.clone()).await + + parser.run().await; }); - Some(runtime) + + Some((runtime, indexer_async_v2)) } diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs index 4d68ba98a9342..af965fac7b993 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs @@ -4,12 +4,12 @@ use anyhow::Error; use aptos_api::context::Context; use aptos_api_types::TransactionOnChainData; +use aptos_db_indexer::db_v2::IndexerAsyncV2; use aptos_indexer_grpc_fullnode::stream_coordinator::{ IndexerStreamCoordinator, TransactionBatchInfo, }; use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep}; use aptos_logger::{debug, error, info, 
sample, sample::SampleRate}; -use aptos_storage_interface::{DbReaderWriter, DbWriter}; use aptos_types::write_set::WriteSet; use std::{sync::Arc, time::Duration}; use tonic::Status; @@ -24,6 +24,7 @@ pub struct TableInfoService { pub parser_batch_size: u16, pub context: Arc, pub enable_expensive_logging: bool, + pub indexer_async_v2: Arc, } impl TableInfoService { @@ -33,6 +34,7 @@ impl TableInfoService { parser_task_count: u16, parser_batch_size: u16, enable_expensive_logging: bool, + indexer_async_v2: Arc, ) -> Self { Self { current_version: request_start_version, @@ -40,6 +42,7 @@ impl TableInfoService { parser_batch_size, context, enable_expensive_logging, + indexer_async_v2, } } @@ -49,13 +52,13 @@ impl TableInfoService { /// 4. write parsed table info to rocksdb /// 5. after all batches from the loop complete, if pending on items not empty, move on to 6, otherwise, start from 1 again /// 6. retry all the txns in the loop sequentially to clean up the pending on items - pub async fn run(&mut self, db: DbReaderWriter) { + pub async fn run(&mut self) { loop { let start_time = std::time::Instant::now(); let ledger_version = self.get_highest_known_version().await.unwrap_or_default(); let batches = self.get_batches(ledger_version).await; let results = self - .process_multiple_batches(db.clone(), batches, ledger_version) + .process_multiple_batches(self.indexer_async_v2.clone(), batches, ledger_version) .await; let max_version = self.get_max_batch_version(results).unwrap_or_default(); let versions_processed = max_version - self.current_version + 1; @@ -85,18 +88,17 @@ impl TableInfoService { /// 2. Get write sets from transactions and parse write sets to get handle -> key,value type mapping, write the mapping to the rocksdb async fn process_multiple_batches( &self, - db: DbReaderWriter, + indexer_async_v2: Arc, batches: Vec, ledger_version: u64, ) -> Vec> { let mut tasks = vec![]; - let db_writer = db.writer.clone(); let context = self.context.clone(); for batch in batches.iter().cloned() { let task = tokio::spawn(Self::process_single_batch( context.clone(), - db_writer.clone(), + indexer_async_v2.clone(), ledger_version, batch, false, /* end_early_if_pending_on_empty */ @@ -115,8 +117,7 @@ impl TableInfoService { last_batch.start_version + last_batch.num_transactions_to_fetch as u64; // Clean up pending on items across threads - db.writer - .clone() + self.indexer_async_v2 .cleanup_pending_on_items() .expect("[Table Info] Failed to clean up the pending on items"); @@ -126,12 +127,7 @@ impl TableInfoService { // // Risk of this sequential approach is that it could be slow when the txns to process contain extremely // nested table items, but the risk is bounded by the the configuration of the number of txns to process and number of threads - if !db - .reader - .clone() - .is_indexer_async_v2_pending_on_empty() - .unwrap_or(false) - { + if !self.indexer_async_v2.is_indexer_async_v2_pending_on_empty() { let retry_batch = TransactionBatchInfo { start_version: self.current_version, num_transactions_to_fetch: total_txns_to_process as u16, @@ -140,7 +136,7 @@ impl TableInfoService { Self::process_single_batch( context.clone(), - db_writer, + indexer_async_v2.clone(), ledger_version, retry_batch, true, /* end_early_if_pending_on_empty */ @@ -151,16 +147,12 @@ impl TableInfoService { } assert!( - db.reader - .clone() - .is_indexer_async_v2_pending_on_empty() - .unwrap_or(false), + self.indexer_async_v2.is_indexer_async_v2_pending_on_empty(), "Missing data in table info parsing after sequential retry" 
); // Update rocksdb's to be processed next version after verifying all txns are successfully parsed - db.writer - .clone() + self.indexer_async_v2 .update_next_version(end_version + 1) .unwrap(); @@ -179,7 +171,7 @@ impl TableInfoService { /// if pending on items are not empty async fn process_single_batch( context: Arc, - db_writer: Arc, + indexer_async_v2: Arc, ledger_version: u64, batch: TransactionBatchInfo, end_early_if_pending_on_empty: bool, @@ -197,7 +189,7 @@ impl TableInfoService { Self::parse_table_info( context.clone(), raw_txns.clone(), - db_writer.clone(), + indexer_async_v2, end_early_if_pending_on_empty, ) .expect("[Table Info] Failed to parse table info"); @@ -270,7 +262,7 @@ impl TableInfoService { fn parse_table_info( context: Arc, raw_txns: Vec, - db_writer: Arc, + indexer_async_v2: Arc, end_early_if_pending_on_empty: bool, ) -> Result<(), Error> { if raw_txns.is_empty() { @@ -281,7 +273,7 @@ impl TableInfoService { let first_version = raw_txns.first().map(|txn| txn.version).unwrap(); let write_sets: Vec = raw_txns.iter().map(|txn| txn.changes.clone()).collect(); let write_sets_slice: Vec<&WriteSet> = write_sets.iter().collect(); - db_writer + indexer_async_v2 .index_table_info( context.db.clone(), first_version, diff --git a/execution/executor-benchmark/src/db_generator.rs b/execution/executor-benchmark/src/db_generator.rs index 2930e0fe875d9..a9fe87b29fae8 100644 --- a/execution/executor-benchmark/src/db_generator.rs +++ b/execution/executor-benchmark/src/db_generator.rs @@ -74,7 +74,6 @@ fn bootstrap_with_genesis(db_dir: impl AsRef, enable_storage_sharding: boo false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .expect("DB should open."), ); diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index b237ff23e0583..faeefcb241561 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -66,7 +66,6 @@ where false, config.storage.buffered_state_target_items, config.storage.max_num_nodes_per_lru_cache_shard, - false, ) .expect("DB should open."), ); diff --git a/state-sync/state-sync-driver/src/tests/driver_factory.rs b/state-sync/state-sync-driver/src/tests/driver_factory.rs index 7a7df57591ce6..13d88dd66ba9c 100644 --- a/state-sync/state-sync-driver/src/tests/driver_factory.rs +++ b/state-sync/state-sync-driver/src/tests/driver_factory.rs @@ -40,7 +40,6 @@ fn test_new_initialized_configs() { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .unwrap(); let (_, db_rw) = DbReaderWriter::wrap(db); diff --git a/storage/aptosdb/src/db/aptosdb_test.rs b/storage/aptosdb/src/db/aptosdb_test.rs index 0ad8e5b52b95b..f57be62579a64 100644 --- a/storage/aptosdb/src/db/aptosdb_test.rs +++ b/storage/aptosdb/src/db/aptosdb_test.rs @@ -205,7 +205,6 @@ pub fn test_state_merkle_pruning_impl( false, /* enable_indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* enable_indexer_async_v2 */ ) .unwrap(); diff --git a/storage/aptosdb/src/db/include/aptosdb_internal.rs b/storage/aptosdb/src/db/include/aptosdb_internal.rs index b44cb4a6c7de2..5337cdf99223c 100644 --- a/storage/aptosdb/src/db/include/aptosdb_internal.rs +++ b/storage/aptosdb/src/db/include/aptosdb_internal.rs @@ -56,7 +56,6 @@ impl AptosDB { ledger_commit_lock: std::sync::Mutex::new(()), indexer: None, skip_index_and_usage, - indexer_async_v2: None, } } 
@@ -69,7 +68,6 @@ impl AptosDB { buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, empty_buffered_state_for_restore: bool, - enable_indexer_async_v2: bool, ) -> Result { ensure!( pruner_config.eq(&NO_OP_STORAGE_PRUNER_CONFIG) || !readonly, @@ -101,13 +99,6 @@ impl AptosDB { )?; } - if enable_indexer_async_v2 { - myself.open_indexer_async_v2( - db_paths.default_root_path(), - rocksdb_configs.index_db_config, - )?; - } - Ok(myself) } @@ -153,16 +144,6 @@ impl AptosDB { Ok(()) } - fn open_indexer_async_v2( - &mut self, - db_root_path: impl AsRef, - rocksdb_config: RocksdbConfig, - ) -> Result<()> { - let indexer_async_v2 = IndexerAsyncV2::open(db_root_path, rocksdb_config, DashMap::new())?; - self.indexer_async_v2 = Some(indexer_async_v2); - Ok(()) - } - #[cfg(any(test, feature = "fuzzing"))] fn new_without_pruner + Clone>( db_root_path: P, @@ -170,7 +151,6 @@ impl AptosDB { buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, enable_indexer: bool, - enable_indexer_async_v2: bool, ) -> Self { Self::open( StorageDirPaths::from_path(db_root_path), @@ -180,7 +160,6 @@ impl AptosDB { enable_indexer, buffered_state_target_items, max_num_nodes_per_lru_cache_shard, - enable_indexer_async_v2, ) .expect("Unable to open AptosDB") } diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index 78b4174604f08..95ce8b0a4f113 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -817,11 +817,6 @@ impl DbReader for AptosDB { self.indexer.is_some() } - /// Returns whether the indexer async v2 DB has been enabled or not - fn indexer_async_v2_enabled(&self) -> bool { - self.indexer_async_v2.is_some() - } - fn get_state_storage_usage(&self, version: Option) -> Result { gauged_api("get_state_storage_usage", || { if let Some(v) = version { @@ -830,28 +825,6 @@ impl DbReader for AptosDB { self.state_store.get_usage(version) }) } - - /// Returns the next version for indexer async v2 to be processed - /// It is mainly used by table info service to decide the start version - fn get_indexer_async_v2_next_version(&self) -> Result { - gauged_api("get_indexer_async_v2_next_version", || { - Ok(self - .indexer_async_v2 - .as_ref() - .map(|indexer| indexer.next_version()) - .unwrap_or(0)) - }) - } - - fn is_indexer_async_v2_pending_on_empty(&self) -> Result { - gauged_api("is_indexer_async_v2_pending_on_empty", || { - Ok(self - .indexer_async_v2 - .as_ref() - .map(|indexer| indexer.is_indexer_async_v2_pending_on_empty()) - .unwrap_or(false)) - }) - } } impl AptosDB { @@ -1026,26 +999,8 @@ impl AptosDB { Ok(events_with_version) } - fn get_table_info_option(&self, handle: TableHandle) -> Result> { - if self.indexer_async_v2_enabled() { - return self.get_table_info_from_indexer_async_v2(handle); - } - - self.get_table_info_from_indexer(handle) - } - - fn get_table_info_from_indexer_async_v2( - &self, - handle: TableHandle, - ) -> Result> { - match &self.indexer_async_v2 { - Some(indexer_async_v2) => indexer_async_v2.get_table_info_with_retry(handle), - None => bail!("Indexer Async V2 not enabled."), - } - } - /// TODO(jill): deprecate Indexer once Indexer Async V2 is ready - fn get_table_info_from_indexer(&self, handle: TableHandle) -> Result> { + fn get_table_info_option(&self, handle: TableHandle) -> Result> { match &self.indexer { Some(indexer) => indexer.get_table_info(handle), None => bail!("Indexer not enabled."), diff --git 
a/storage/aptosdb/src/db/include/aptosdb_testonly.rs b/storage/aptosdb/src/db/include/aptosdb_testonly.rs index 4d0c402132b03..3dea085136ca7 100644 --- a/storage/aptosdb/src/db/include/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/include/aptosdb_testonly.rs @@ -17,7 +17,6 @@ impl AptosDB { BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, false, /* indexer */ - false, /* indexer async v2 */ ) } @@ -38,14 +37,13 @@ impl AptosDB { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, max_node_cache, - false, /* indexer async v2 */ ) .expect("Unable to open AptosDB") } /// This opens db in non-readonly mode, without the pruner and cache. pub fn new_for_test_no_cache + Clone>(db_root_path: P) -> Self { - Self::new_without_pruner(db_root_path, false, BUFFERED_STATE_TARGET_ITEMS, 0, false, false) + Self::new_without_pruner(db_root_path, false, BUFFERED_STATE_TARGET_ITEMS, 0, false) } /// This opens db in non-readonly mode, without the pruner, and with the indexer @@ -56,7 +54,6 @@ impl AptosDB { BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, true, /* indexer */ - true, /* indexer async v2 */ ) } @@ -71,7 +68,6 @@ impl AptosDB { buffered_state_target_items, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, false, /* indexer */ - false, /* indexer async v2 */ ) } @@ -83,7 +79,6 @@ impl AptosDB { BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, false, /* indexer */ - false, /* indexer async v2 */ ) } diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 6c6f8ec42af57..3f2e273d4fda1 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -198,47 +198,6 @@ impl DbWriter for AptosDB { Ok(()) }) } - - /// Open up dbwriter for table info indexing on indexer async v2 rocksdb - fn index_table_info( - &self, - db_reader: Arc, - first_version: Version, - write_sets: &[&WriteSet], - end_early_if_pending_on_empty: bool, - ) -> Result<()> { - gauged_api("index_table_info", || { - self.indexer_async_v2 - .as_ref() - .map(|indexer| { - indexer.index_table_info( - db_reader, - first_version, - write_sets, - end_early_if_pending_on_empty, - ) - }) - .unwrap_or(Ok(())) - }) - } - - fn cleanup_pending_on_items(&self) -> Result<()> { - gauged_api("cleanup_pending_on_items", || { - self.indexer_async_v2 - .as_ref() - .map(|indexer| indexer.cleanup_pending_on_items()) - .unwrap_or(Ok(())) - }) - } - - fn update_next_version(&self, end_version: u64) -> Result<()> { - gauged_api("update_next_version", || { - self.indexer_async_v2 - .as_ref() - .map(|indexer| indexer.update_next_version(end_version)) - .unwrap_or(Ok(())) - }) - } } impl AptosDB { diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs index e209899cf27ec..3008f9025b88d 100644 --- a/storage/aptosdb/src/db/mod.rs +++ b/storage/aptosdb/src/db/mod.rs @@ -30,7 +30,7 @@ use aptos_config::config::{ PrunerConfig, RocksdbConfig, RocksdbConfigs, StorageDirPaths, NO_OP_STORAGE_PRUNER_CONFIG, }; use aptos_crypto::HashValue; -use aptos_db_indexer::{db_v2::IndexerAsyncV2, Indexer}; +use aptos_db_indexer::Indexer; use aptos_experimental_runtimes::thread_manager::{optimal_min_len, THREAD_MANAGER}; use aptos_logger::prelude::*; use aptos_metrics_core::TimerHelper; @@ -72,7 +72,6 @@ use aptos_types::{ write_set::WriteSet, }; use aptos_vm::data_cache::AsMoveResolver; -use dashmap::DashMap; use move_resource_viewer::MoveValueAnnotator; use rayon::prelude::*; use 
std::{ @@ -101,7 +100,6 @@ pub struct AptosDB { ledger_commit_lock: std::sync::Mutex<()>, indexer: Option, skip_index_and_usage: bool, - indexer_async_v2: Option, } // DbReader implementations and private functions used by them. @@ -123,7 +121,6 @@ impl AptosDB { enable_indexer: bool, buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, - enable_indexer_async_v2: bool, ) -> Result { Self::open_internal( &db_paths, @@ -134,7 +131,6 @@ impl AptosDB { buffered_state_target_items, max_num_nodes_per_lru_cache_shard, false, - enable_indexer_async_v2, ) } @@ -146,7 +142,6 @@ impl AptosDB { enable_indexer: bool, buffered_state_target_items: usize, max_num_nodes_per_lru_cache_shard: usize, - enable_indexer_async_v2: bool, ) -> Result { Self::open_internal( &db_paths, @@ -157,7 +152,6 @@ impl AptosDB { buffered_state_target_items, max_num_nodes_per_lru_cache_shard, true, - enable_indexer_async_v2, ) } diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs index d191810c6b24d..d29bf307ef25e 100644 --- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs +++ b/storage/aptosdb/src/fast_sync_storage_wrapper.rs @@ -49,7 +49,6 @@ impl FastSyncStorageWrapper { config.storage.enable_indexer, config.storage.buffered_state_target_items, config.storage.max_num_nodes_per_lru_cache_shard, - config.indexer_table_info.enabled, ) .map_err(|err| anyhow!("fast sync DB failed to open {}", err))?; @@ -76,7 +75,6 @@ impl FastSyncStorageWrapper { config.storage.enable_indexer, config.storage.buffered_state_target_items, config.storage.max_num_nodes_per_lru_cache_shard, - config.indexer_table_info.enabled, ) .map_err(|err| anyhow!("Secondary DB failed to open {}", err))?; diff --git a/storage/backup/backup-cli/src/utils/mod.rs b/storage/backup/backup-cli/src/utils/mod.rs index a3b9bf33ae150..f737d3d868b36 100644 --- a/storage/backup/backup-cli/src/utils/mod.rs +++ b/storage/backup/backup-cli/src/utils/mod.rs @@ -291,7 +291,6 @@ impl TryFrom for GlobalRestoreOptions { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?) .get_restore_handler(); diff --git a/storage/db-tool/src/bootstrap.rs b/storage/db-tool/src/bootstrap.rs index 74ce3cb605b3d..46aed10777957 100644 --- a/storage/db-tool/src/bootstrap.rs +++ b/storage/db-tool/src/bootstrap.rs @@ -57,7 +57,6 @@ impl Command { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ ) .expect("Failed to open DB."); let db = DbReaderWriter::new(db); diff --git a/storage/db-tool/src/replay_verify.rs b/storage/db-tool/src/replay_verify.rs index deafc78a6445e..21c1bd817e400 100644 --- a/storage/db-tool/src/replay_verify.rs +++ b/storage/db-tool/src/replay_verify.rs @@ -69,7 +69,6 @@ impl Opt { false, /* indexer */ BUFFERED_STATE_TARGET_ITEMS, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, - false, /* indexer async v2 */ )?) 
.get_restore_handler(); let ret = ReplayVerifyCoordinator::new( diff --git a/storage/indexer/src/db_ops.rs b/storage/indexer/src/db_ops.rs new file mode 100644 index 0000000000000..2f109bed876f0 --- /dev/null +++ b/storage/indexer/src/db_ops.rs @@ -0,0 +1,22 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::schema::column_families; +use anyhow::Result; +use aptos_config::config::RocksdbConfig; +use aptos_rocksdb_options::gen_rocksdb_options; +use aptos_schemadb::DB; +use std::{mem, path::Path}; + +pub fn open_db>(db_path: P, rocksdb_config: &RocksdbConfig) -> Result { + Ok(DB::open( + db_path, + "index_asnync_v2_db", + column_families(), + &gen_rocksdb_options(rocksdb_config, false), + )?) +} + +pub fn close_db(db: DB) { + mem::drop(db) +} diff --git a/storage/indexer/src/db_v2.rs b/storage/indexer/src/db_v2.rs index 7c60a758ff671..a21fcb7b435fe 100644 --- a/storage/indexer/src/db_v2.rs +++ b/storage/indexer/src/db_v2.rs @@ -7,13 +7,9 @@ /// and this file will be moved to /ecosystem/indexer-grpc/indexer-grpc-table-info. use crate::{ metadata::{MetadataKey, MetadataValue}, - schema::{ - column_families, indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema, - }, + schema::{indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema}, }; -use aptos_config::config::RocksdbConfig; use aptos_logger::info; -use aptos_rocksdb_options::gen_rocksdb_options; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{ db_other_bail as bail, state_view::DbStateView, AptosDbError, DbReader, Result, @@ -39,6 +35,8 @@ use move_core_types::{ use move_resource_viewer::{AnnotatedMoveValue, MoveValueAnnotator}; use std::{ collections::{BTreeMap, HashMap}, + fs, + path::PathBuf, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -46,12 +44,11 @@ use std::{ time::Duration, }; -pub const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; const TABLE_INFO_RETRY_TIME_MILLIS: u64 = 10; #[derive(Debug)] pub struct IndexerAsyncV2 { - db: DB, + pub db: DB, // Next version to be processed next_version: AtomicU64, // It is used in the context of processing write ops and extracting table information. @@ -65,21 +62,7 @@ pub struct IndexerAsyncV2 { } impl IndexerAsyncV2 { - /// Opens up this rocksdb to get ready for read and write when bootstraping the aptosdb - pub fn open( - db_root_path: impl AsRef, - rocksdb_config: RocksdbConfig, - pending_on: DashMap>, - ) -> Result { - let db_path = db_root_path.as_ref().join(INDEX_ASYNC_V2_DB_NAME); - - let db = DB::open( - db_path, - "index_asnync_v2_db", - column_families(), - &gen_rocksdb_options(&rocksdb_config, false), - )?; - + pub fn new(db: DB) -> Result { let next_version = db .get::(&MetadataKey::LatestVersion)? 
.map_or(0, |v| v.expect_version()); @@ -87,7 +70,7 @@ impl IndexerAsyncV2 { Ok(Self { db, next_version: AtomicU64::new(next_version), - pending_on, + pending_on: DashMap::new(), }) } @@ -151,12 +134,10 @@ impl IndexerAsyncV2 { } pub fn update_next_version(&self, end_version: u64) -> Result<()> { - let batch = SchemaBatch::new(); - batch.put::( + self.db.put::( &MetadataKey::LatestVersion, &MetadataValue::Version(end_version - 1), )?; - self.db.write_schemas(batch)?; self.next_version.store(end_version, Ordering::Relaxed); Ok(()) } @@ -201,7 +182,10 @@ impl IndexerAsyncV2 { } pub fn next_version(&self) -> Version { - self.next_version.load(Ordering::Relaxed) + self.db + .get::(&MetadataKey::LatestVersion) + .unwrap() + .map_or(0, |v| v.expect_version()) } pub fn get_table_info(&self, handle: TableHandle) -> Result> { @@ -227,6 +211,11 @@ impl IndexerAsyncV2 { pub fn is_indexer_async_v2_pending_on_empty(&self) -> bool { self.pending_on.is_empty() } + + pub fn create_checkpoint(&self, path: &PathBuf) -> Result<()> { + fs::remove_dir_all(path).unwrap_or(()); + self.db.create_checkpoint(path) + } } struct TableInfoParser<'a, R> { diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 5eebb36331880..b152baa1bb13d 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -3,9 +3,11 @@ /// TODO(jill): deprecate Indexer once Indexer Async V2 is ready mod db; +pub mod db_ops; pub mod db_v2; mod metadata; mod schema; +pub mod table_info_reader; use crate::{ db::INDEX_DB_NAME, diff --git a/storage/indexer/src/table_info_reader.rs b/storage/indexer/src/table_info_reader.rs new file mode 100644 index 0000000000000..f0ddcff4aa1ff --- /dev/null +++ b/storage/indexer/src/table_info_reader.rs @@ -0,0 +1,19 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::db_v2::IndexerAsyncV2; +use aptos_storage_interface::Result; +use aptos_types::state_store::table::{TableHandle, TableInfo}; + +/// Table info reader is to create a thin interface for other services to read the db data, +/// this standalone db is officially not part of the AptosDB anymore. +/// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. +pub trait TableInfoReader: Send + Sync { + fn get_table_info(&self, handle: TableHandle) -> Result>; +} + +impl TableInfoReader for IndexerAsyncV2 { + fn get_table_info(&self, handle: TableHandle) -> Result> { + self.get_table_info_with_retry(handle) + } +} diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index c6f4f567f2085..cebc7e7c276be 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -450,16 +450,6 @@ pub trait DbReader: Send + Sync { /// Returns whether the internal indexer DB has been enabled or not fn indexer_enabled(&self) -> bool; - /// Returns whether the internal indexer async v2 DB has been enabled or not - fn indexer_async_v2_enabled(&self) -> bool; - - /// Returns the next version which internal indexer async v2 DB should parse - fn get_indexer_async_v2_next_version(&self) -> Result; - - /// Returns boolean whether indexer async v2 pending on items are empty - /// if so, the whole batches are processed completely, if not, need to retry - fn is_indexer_async_v2_pending_on_empty(&self) -> Result; - /// Returns state storage usage at the end of an epoch. 
fn get_state_storage_usage(&self, version: Option<Version>) -> Result<StateStorageUsage>; ); // end delegated @@ -577,28 +567,6 @@ pub trait DbWriter: Send + Sync { ) -> Result<()> { unimplemented!() } - - /// Index table info mapping for the indexer async v2 rocksdb. - /// Called by the table info service when its constantly parsing the table info. - fn index_table_info( - &self, - db_reader: Arc<dyn DbReader>, - first_version: Version, - write_sets: &[&WriteSet], - end_early_if_pending_on_empty: bool, - ) -> Result<()> { - unimplemented!() - } - - /// Clean up pending on items in the indexer async v2 rocksdb. - /// Called by the table info service when all threads finish processing. - fn cleanup_pending_on_items(&self) -> Result<()> { - unimplemented!() - } - - fn update_next_version(&self, end_version: u64) -> Result<()> { - unimplemented!() - } } #[derive(Clone)]
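---

Note on downstream wiring: the sketch below shows how the pieces introduced in this PR are meant to compose — `open_db` (storage/indexer/src/db_ops.rs) opens the standalone rocksdb, `IndexerAsyncV2::new` wraps it and restores the persisted next version, and the resulting `Arc<IndexerAsyncV2>` is handed to the API layer as an `Option<Arc<dyn TableInfoReader>>`, mirroring `bootstrap` in indexer-grpc-table-info/src/runtime.rs and `bootstrap_api_and_indexer` in aptos-node/src/services.rs. The helper function name and the `db_root` / `rocksdb_config` parameters are illustrative assumptions, not code from this change.

```rust
// A minimal wiring sketch, assuming the signatures introduced in this PR.
// `wire_table_info_reader`, `db_root`, and `rocksdb_config` are illustrative
// names, not part of this change.
use aptos_config::config::RocksdbConfig;
use aptos_db_indexer::{
    db_ops::open_db, db_v2::IndexerAsyncV2, table_info_reader::TableInfoReader,
};
use std::{path::Path, sync::Arc};

fn wire_table_info_reader(
    db_root: &Path,
    rocksdb_config: &RocksdbConfig,
) -> anyhow::Result<Option<Arc<dyn TableInfoReader>>> {
    // Open the standalone table-info rocksdb; the directory name matches
    // INDEX_ASYNC_V2_DB_NAME in indexer-grpc-table-info/src/runtime.rs.
    let db = open_db(db_root.join("index_indexer_async_v2_db"), rocksdb_config)?;

    // IndexerAsyncV2::new restores the persisted next-version metadata itself,
    // replacing the removed DbReader::get_indexer_async_v2_next_version call.
    let indexer_async_v2 = Arc::new(IndexerAsyncV2::new(db)?);

    // Coerce to the trait object the API Context expects, mirroring
    // bootstrap_api_and_indexer in aptos-node/src/services.rs.
    let table_info_reader: Arc<dyn TableInfoReader> = indexer_async_v2;
    Ok(Some(table_info_reader))
}
```

Callers then thread the result through the new trailing parameter on `Context::new` / `bootstrap_api`; passing `None` preserves the old behavior, which is what the test contexts and the legacy indexer runtime do above.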