diff --git a/Cargo.lock b/Cargo.lock
index 2da2fff188cfa..86364415ba2c4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1184,6 +1184,7 @@ dependencies = [
  "serde",
  "static_assertions",
  "status-line",
+ "tokio",
 ]

 [[package]]
diff --git a/api/src/context.rs b/api/src/context.rs
index 4cea588b9906a..3ba77fe8923eb 100644
--- a/api/src/context.rs
+++ b/api/src/context.rs
@@ -296,37 +296,49 @@ impl Context {
         &self,
     ) -> Result<LedgerInfo, E> {
         if let Some(indexer_reader) = self.indexer_reader.as_ref() {
-            if let Some(latest_version) = indexer_reader
-                .get_latest_internal_indexer_ledger_version()
-                .map_err(|err| {
-                    E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
-                })?
-            {
-                let (_, _, new_block_event) = self
-                    .db
-                    .get_block_info_by_version(latest_version)
-                    .map_err(|_| {
-                        E::service_unavailable_with_code_no_info(
-                            "Failed to get block",
-                            AptosErrorCode::InternalError,
-                        )
-                    })?;
-                let (oldest_version, oldest_block_height) =
-                    self.get_oldest_version_and_block_height()?;
-                return Ok(LedgerInfo::new_ledger_info(
-                    &self.chain_id(),
-                    new_block_event.epoch(),
-                    latest_version,
-                    oldest_version,
-                    oldest_block_height,
-                    new_block_event.height(),
-                    new_block_event.proposed_time(),
-                ));
+            if indexer_reader.is_internal_indexer_enabled() {
+                if let Some(mut latest_version) = indexer_reader
+                    .get_latest_internal_indexer_ledger_version()
+                    .map_err(|err| {
+                        E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
+                    })?
+                {
+                    // The internal indexer version can be ahead of the storage committed version,
+                    // since it syncs to the DB's latest synced version.
+                    let last_storage_version =
+                        self.get_latest_storage_ledger_info()?.ledger_version.0;
+                    latest_version = std::cmp::min(latest_version, last_storage_version);
+                    let (_, block_end_version, new_block_event) = self
+                        .db
+                        .get_block_info_by_version(latest_version)
+                        .map_err(|_| {
+                            E::service_unavailable_with_code_no_info(
+                                "Failed to get block",
+                                AptosErrorCode::InternalError,
+                            )
+                        })?;
+                    let (oldest_version, oldest_block_height) =
+                        self.get_oldest_version_and_block_height()?;
+                    return Ok(LedgerInfo::new_ledger_info(
+                        &self.chain_id(),
+                        new_block_event.epoch(),
+                        block_end_version,
+                        oldest_version,
+                        oldest_block_height,
+                        new_block_event.height(),
+                        new_block_event.proposed_time(),
+                    ));
+                } else {
+                    // Indexer doesn't have data yet as the DB is bootstrapping.
+                    return Err(E::service_unavailable_with_code_no_info(
+                        "DB is bootstrapping",
+                        AptosErrorCode::InternalError,
+                    ));
+                }
             }
         }

         Err(E::service_unavailable_with_code_no_info(
-            "Indexer reader doesn't exist, or doesn't have data.",
+            "Indexer reader doesn't exist",
             AptosErrorCode::InternalError,
         ))
     }
diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs
index b9f938de52b12..45452d7311d15 100644
--- a/api/test-context/src/test_context.rs
+++ b/api/test-context/src/test_context.rs
@@ -43,7 +43,7 @@ use aptos_types::{
     ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
     transaction::{
         signature_verified_transaction::into_signature_verified_block, Transaction,
-        TransactionPayload, TransactionStatus,
+        TransactionPayload, TransactionStatus, Version,
     },
 };
 use aptos_vm::AptosVM;
@@ -53,6 +53,7 @@ use hyper::{HeaderMap, Response};
 use rand::SeedableRng;
 use serde_json::{json, Value};
 use std::{boxed::Box, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
+use tokio::sync::watch::channel;
 use warp::{http::header::CONTENT_TYPE, Filter, Rejection, Reply};
 use warp_reverse_proxy::reverse_proxy_filter;

@@ -118,12 +119,14 @@ pub fn new_test_context(
     let (root_key, genesis, genesis_waypoint, validators) = builder.build(&mut rng).unwrap();
     let (validator_identity, _, _, _) = validators[0].get_key_objects(None).unwrap();
     let validator_owner = validator_identity.account_address.unwrap();
-
+    let (sender, recver) = channel::<Version>(0);
     let (db, db_rw) = if use_db_with_indexer {
-        DbReaderWriter::wrap(AptosDB::new_for_test_with_indexer(
+        let mut aptos_db = AptosDB::new_for_test_with_indexer(
             &tmp_dir,
             node_config.storage.rocksdb_configs.enable_storage_sharding,
-        ))
+        );
+        aptos_db.add_version_update_subscriber(sender).unwrap();
+        DbReaderWriter::wrap(aptos_db)
     } else {
         DbReaderWriter::wrap(
             AptosDB::open(
@@ -155,7 +158,7 @@ pub fn new_test_context(
         .storage
         .set_data_dir(tmp_dir.path().to_path_buf());
     let mock_indexer_service =
-        MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config);
+        MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config, recver);

     let context = Context::new(
         ChainId::test(),
diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs
index 4634c22c3efb7..93acbfd7e7e44 100644
--- a/aptos-node/src/lib.rs
+++ b/aptos-node/src/lib.rs
@@ -605,7 +605,7 @@ pub fn setup_environment_and_start_node(
     let mut admin_service = services::start_admin_service(&node_config);

     // Set up the storage database and any RocksDB checkpoints
-    let (db_rw, backup_service, genesis_waypoint, indexer_db_opt) =
+    let (db_rw, backup_service, genesis_waypoint, indexer_db_opt, update_receiver) =
         storage::initialize_database_and_checkpoints(&mut node_config)?;
     admin_service.set_aptos_db(db_rw.clone().into());

@@ -687,7 +687,13 @@
         indexer_runtime,
         indexer_grpc_runtime,
         internal_indexer_db_runtime,
-    ) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id, indexer_db_opt)?;
+    ) = services::bootstrap_api_and_indexer(
+        &node_config,
+        db_rw.clone(),
+        chain_id,
+        indexer_db_opt,
+        update_receiver,
+    )?;

     // Create mempool and get the consensus to mempool sender
     let (mempool_runtime, consensus_to_mempool_sender) =
diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs
index a6b94bde33bc8..2a686806ae360 100644
--- a/aptos-node/src/services.rs
+++ b/aptos-node/src/services.rs
@@ -34,7 +34,10 @@ use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
 use aptos_validator_transaction_pool::VTxnPoolState;
 use futures::channel::{mpsc, mpsc::Sender};
 use std::{sync::Arc, time::Instant};
-use tokio::runtime::{Handle, Runtime};
+use tokio::{
+    runtime::{Handle, Runtime},
+    sync::watch::Receiver as WatchReceiver,
+};

 const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
 const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
@@ -46,6 +49,7 @@ pub fn bootstrap_api_and_indexer(
     db_rw: DbReaderWriter,
     chain_id: ChainId,
     internal_indexer_db: Option<InternalIndexerDB>,
+    update_receiver: Option<WatchReceiver<Version>>,
 ) -> anyhow::Result<(
     Receiver<MempoolClientRequest>,
     Option<Runtime>,
@@ -68,11 +72,15 @@ pub fn bootstrap_api_and_indexer(
         None => (None, None),
     };

-    let (db_indexer_runtime, txn_event_reader) =
-        match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) {
-            Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
-            None => (None, None),
-        };
+    let (db_indexer_runtime, txn_event_reader) = match bootstrap_internal_indexer_db(
+        node_config,
+        db_rw.clone(),
+        internal_indexer_db,
+        update_receiver,
+    ) {
+        Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
+        None => (None, None),
+    };

     let indexer_readers = IndexerReaders::new(indexer_async_v2, txn_event_reader);
diff --git a/aptos-node/src/storage.rs b/aptos-node/src/storage.rs
index 0089a7961b2ea..8ee67228fe71d 100644
--- a/aptos-node/src/storage.rs
+++ b/aptos-node/src/storage.rs
@@ -10,11 +10,16 @@ use aptos_executor::db_bootstrapper::maybe_bootstrap;
 use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
 use aptos_logger::{debug, info};
 use aptos_storage_interface::{DbReader, DbReaderWriter};
-use aptos_types::{ledger_info::LedgerInfoWithSignatures, waypoint::Waypoint};
+use aptos_types::{
+    ledger_info::LedgerInfoWithSignatures, transaction::Version, waypoint::Waypoint,
+};
 use aptos_vm::AptosVM;
 use either::Either;
 use std::{fs, path::Path, sync::Arc, time::Instant};
-use tokio::runtime::Runtime;
+use tokio::{
+    runtime::Runtime,
+    sync::watch::{channel, Receiver as WatchReceiver},
+};

 pub(crate) fn maybe_apply_genesis(
     db_rw: &DbReaderWriter,
@@ -45,46 +50,60 @@ pub(crate) fn bootstrap_db(
     DbReaderWriter,
     Option<Runtime>,
     Option<InternalIndexerDB>,
+    Option<WatchReceiver<Version>>,
 )> {
     let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config);
-    let (aptos_db_reader, db_rw, backup_service) =
-        match FastSyncStorageWrapper::initialize_dbs(node_config, internal_indexer_db.clone())? {
-            Either::Left(db) => {
-                let (db_arc, db_rw) = DbReaderWriter::wrap(db);
-                let db_backup_service = start_backup_service(
-                    node_config.storage.backup_service_address,
-                    db_arc.clone(),
-                );
-                maybe_apply_genesis(&db_rw, node_config)?;
-                (db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
-            },
-            Either::Right(fast_sync_db_wrapper) => {
-                let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
-                maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
-                let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
-                let fast_sync_db = db_arc.get_fast_sync_db();
-                // FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
-                let ledger_info = db_arc
-                    .get_temporary_db_with_genesis()
-                    .get_epoch_ending_ledger_info(0)
-                    .expect("Genesis ledger info must exist");
-
-                if fast_sync_db
-                    .get_latest_ledger_info_option()
-                    .expect("should returns Ok results")
-                    .is_none()
-                {
-                    // it means the DB is empty and we need to
-                    // commit the genesis ledger info to the DB.
-                    fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
-                }
-
-                let db_backup_service =
-                    start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
-                (db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
-            },
-        };
-    Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db))
+    let (update_sender, update_receiver) = if internal_indexer_db.is_some() {
+        let (sender, receiver) = channel::<Version>(0);
+        (Some(sender), Some(receiver))
+    } else {
+        (None, None)
+    };
+
+    let (aptos_db_reader, db_rw, backup_service) = match FastSyncStorageWrapper::initialize_dbs(
+        node_config,
+        internal_indexer_db.clone(),
+        update_sender,
+    )? {
+        Either::Left(db) => {
+            let (db_arc, db_rw) = DbReaderWriter::wrap(db);
+            let db_backup_service =
+                start_backup_service(node_config.storage.backup_service_address, db_arc.clone());
+            maybe_apply_genesis(&db_rw, node_config)?;
+            (db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
+        },
+        Either::Right(fast_sync_db_wrapper) => {
+            let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
+            maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
+            let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
+            let fast_sync_db = db_arc.get_fast_sync_db();
+            // FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
+            let ledger_info = db_arc
+                .get_temporary_db_with_genesis()
+                .get_epoch_ending_ledger_info(0)
+                .expect("should return Ok results");
+
+            if fast_sync_db
+                .get_latest_ledger_info_option()
+                .expect("should return Ok results")
+                .is_none()
+            {
+                // It means the DB is empty and we need to
+                // commit the genesis ledger info to the DB.
+                fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
+            }
+            let db_backup_service =
+                start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
+            (db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
+        },
+    };
+    Ok((
+        aptos_db_reader,
+        db_rw,
+        backup_service,
+        internal_indexer_db,
+        update_receiver,
+    ))
 }

 /// In consensus-only mode, return a in-memory based [FakeAptosDB] and
@@ -157,6 +176,7 @@ pub fn initialize_database_and_checkpoints(
     Option<Runtime>,
     Waypoint,
     Option<InternalIndexerDB>,
+    Option<WatchReceiver<Version>>,
 )> {
     // If required, create RocksDB checkpoints and change the working directory.
     // This is test-only.
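Reviewer note: the hunk above is where the producer/consumer pair is created — `bootstrap_db` builds a `tokio::sync::watch` channel, hands the `Sender` into storage through `FastSyncStorageWrapper::initialize_dbs`, and returns the `Receiver` for the internal indexer. A minimal, self-contained sketch of that handoff, with a simplified `Db` standing in for `AptosDB` and `Version` aliased to `u64` (the names here are illustrative, not from the patch):

```rust
// Requires tokio = { version = "1", features = ["full"] } (assumption for this sketch).
use tokio::sync::watch;

type Version = u64;

struct Db {
    // Mirrors the new `update_subscriber` field on AptosDB.
    update_subscriber: Option<watch::Sender<Version>>,
}

impl Db {
    // Producer side: called after each commit, as save_transactions now does.
    fn notify_committed(&self, version: Version) {
        if let Some(sender) = &self.update_subscriber {
            // send() only fails once every receiver has been dropped.
            let _ = sender.send(version);
        }
    }
}

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = watch::channel::<Version>(0);
    let db = Db { update_subscriber: Some(sender) };

    let consumer = tokio::spawn(async move {
        // Consumer side: wake on new versions instead of polling on a timer.
        while receiver.changed().await.is_ok() {
            println!("latest committed version: {}", *receiver.borrow());
        }
        // changed() errors once the sender is dropped; exit cleanly.
    });

    db.notify_committed(42);
    drop(db); // drops the sender, which ends the consumer loop
    consumer.await.unwrap();
}
```

The useful property is that `Receiver::changed` resolves as soon as any newer version has been published, so the indexer wakes immediately after a commit instead of sleeping out a fixed interval.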
@@ -166,7 +186,8 @@
     // Open the database
     let instant = Instant::now();
-    let (_aptos_db, db_rw, backup_service, indexer_db_opt) = bootstrap_db(node_config)?;
+    let (_aptos_db, db_rw, backup_service, indexer_db_opt, update_receiver) =
+        bootstrap_db(node_config)?;

     // Log the duration to open storage
     debug!(
@@ -179,5 +200,6 @@
         backup_service,
         node_config.base.waypoint.genesis_waypoint(),
         indexer_db_opt,
+        update_receiver,
     ))
 }
diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs
index ca26fd0c6e49e..2feaf9c3b5702 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs
@@ -9,26 +9,34 @@ use aptos_db_indexer::{
     indexer_reader::IndexerReaders,
 };
 use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
+use aptos_logger::info;
 use aptos_storage_interface::DbReader;
 use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Version};
 use std::{
     path::{Path, PathBuf},
     sync::Arc,
+    time::Duration,
 };
-use tokio::runtime::Handle;
+use tokio::{runtime::Handle, sync::watch::Receiver as WatchReceiver};

 const SERVICE_TYPE: &str = "internal_indexer_db_service";
 const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";

 pub struct InternalIndexerDBService {
     pub db_indexer: Arc<DBIndexer>,
+    pub update_receiver: WatchReceiver<Version>,
 }

 impl InternalIndexerDBService {
-    pub fn new(db_reader: Arc<dyn DbReader>, internal_indexer_db: InternalIndexerDB) -> Self {
+    pub fn new(
+        db_reader: Arc<dyn DbReader>,
+        internal_indexer_db: InternalIndexerDB,
+        update_receiver: WatchReceiver<Version>,
+    ) -> Self {
         let internal_db_indexer = Arc::new(DBIndexer::new(internal_indexer_db, db_reader));
         Self {
             db_indexer: internal_db_indexer,
+            update_receiver,
         }
     }

@@ -140,9 +148,17 @@ impl InternalIndexerDBService {
             let next_version = self.db_indexer.process_a_batch(start_version)?;

             if next_version == start_version {
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+                if let Ok(recv_res) =
+                    tokio::time::timeout(Duration::from_millis(100), self.update_receiver.changed())
+                        .await
+                {
+                    if recv_res.is_err() {
+                        info!("update sender is dropped");
+                        return Ok(());
+                    }
+                }
                 continue;
-            }
+            };
             log_grpc_step(
                 SERVICE_TYPE,
                 IndexerGrpcStep::InternalIndexerDBProcessed,
@@ -166,7 +182,11 @@ pub struct MockInternalIndexerDBService {
 }

 impl MockInternalIndexerDBService {
-    pub fn new_for_test(db_reader: Arc<dyn DbReader>, node_config: &NodeConfig) -> Self {
+    pub fn new_for_test(
+        db_reader: Arc<dyn DbReader>,
+        node_config: &NodeConfig,
+        update_receiver: WatchReceiver<Version>,
+    ) -> Self {
         if !node_config
             .indexer_db_config
             .is_internal_indexer_db_enabled()
@@ -179,7 +199,8 @@ impl MockInternalIndexerDBService {

         let db = InternalIndexerDBService::get_indexer_db(node_config).unwrap();
         let handle = Handle::current();
-        let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, db);
+        let mut internal_indexer_db_service =
+            InternalIndexerDBService::new(db_reader, db, update_receiver);
         let db_indexer = internal_indexer_db_service.get_db_indexer();
         let config_clone = node_config.to_owned();
         handle.spawn(async move {
diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
index ff5c17d5d9b49..cfd9dfc2ce5b7 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
@@ -14,9 +14,9 @@ use aptos_db_indexer::{
 };
 use aptos_mempool::MempoolClientSender;
 use aptos_storage_interface::DbReaderWriter;
-use aptos_types::chain_id::ChainId;
+use aptos_types::{chain_id::ChainId, transaction::Version};
 use std::sync::Arc;
-use tokio::runtime::Runtime;
+use tokio::{runtime::Runtime, sync::watch::Receiver as WatchReceiver};

 const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";

@@ -24,14 +24,18 @@ pub fn bootstrap_internal_indexer_db(
     config: &NodeConfig,
     db_rw: DbReaderWriter,
     internal_indexer_db: Option<InternalIndexerDB>,
+    update_receiver: Option<WatchReceiver<Version>>,
 ) -> Option<(Runtime, Arc<DBIndexer>)> {
     if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none()
     {
         return None;
     }
     let runtime = aptos_runtimes::spawn_named_runtime("index-db".to_string(), None);
     // Set up db config and open up the db initially to read metadata
-    let mut indexer_service =
-        InternalIndexerDBService::new(db_rw.reader, internal_indexer_db.unwrap());
+    let mut indexer_service = InternalIndexerDBService::new(
+        db_rw.reader,
+        internal_indexer_db.unwrap(),
+        update_receiver.expect("Internal indexer db update receiver is missing"),
+    );
     let db_indexer = indexer_service.get_db_indexer();
     // Spawn task for db indexer
     let config_clone = config.to_owned();
diff --git a/storage/aptosdb/Cargo.toml b/storage/aptosdb/Cargo.toml
index cc86b63746f0e..86477b75f3676 100644
--- a/storage/aptosdb/Cargo.toml
+++ b/storage/aptosdb/Cargo.toml
@@ -57,6 +57,7 @@ rayon = { workspace = true }
 serde = { workspace = true }
 static_assertions = { workspace = true }
 status-line = { workspace = true }
+tokio = { workspace = true }

 [dev-dependencies]
 aptos-executor-types = { workspace = true }
diff --git a/storage/aptosdb/src/db/include/aptosdb_internal.rs b/storage/aptosdb/src/db/include/aptosdb_internal.rs
index 5556d8a41d6f1..73cc62d2dafd0 100644
--- a/storage/aptosdb/src/db/include/aptosdb_internal.rs
+++ b/storage/aptosdb/src/db/include/aptosdb_internal.rs
@@ -63,6 +63,7 @@ impl AptosDB {
             commit_lock: std::sync::Mutex::new(()),
             indexer: None,
             skip_index_and_usage,
+            update_subscriber: None,
         }
     }

diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs
index 36a6931137b52..28f8008ee57f0 100644
--- a/storage/aptosdb/src/db/include/aptosdb_writer.rs
+++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs
@@ -669,6 +669,13 @@ impl AptosDB {
         COMMITTED_TXNS.inc_by(num_txns);
         LATEST_TXN_VERSION.set(version as i64);
+        if let Some(update_sender) = &self.update_subscriber {
+            update_sender.send(
+                version
+            ).map_err(|err| {
+                AptosDbError::Other(format!("Failed to send update to subscriber: {}", err))
+            })?;
+        }
         // Activate the ledger pruner and state kv pruner.
         // Note the state merkle pruner is activated when state snapshots are persisted
         // in their async thread.
diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs
index 51a85a940a779..2608e7ed52707 100644
--- a/storage/aptosdb/src/db/mod.rs
+++ b/storage/aptosdb/src/db/mod.rs
@@ -81,7 +81,7 @@ use std::{
     sync::Arc,
     time::Instant,
 };
-
+use tokio::sync::watch::Sender;
 #[cfg(test)]
 mod aptosdb_test;
 #[cfg(any(test, feature = "fuzzing"))]
@@ -101,6 +101,7 @@ pub struct AptosDB {
     commit_lock: std::sync::Mutex<()>,
     indexer: Option<Indexer>,
     skip_index_and_usage: bool,
+    update_subscriber: Option<Sender<Version>>,
 }

 // DbReader implementations and private functions used by them.
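Design note on the new `update_subscriber: Option<Sender<Version>>` field above: a `watch` channel retains only the most recent value, so a burst of commits coalesces into a single wakeup. That is the right semantics here, since the indexer reads the actual transactions from the main DB and only needs to learn that *some* newer version exists. A small hypothetical demo of the coalescing behavior (not code from this PR):

```rust
// Why a watch channel rather than an mpsc queue: intermediate values are
// coalesced and only the newest one is observable.
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u64);

    // Three rapid commits; the receiver has not polled in between.
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();

    // A single wakeup observes only the newest value (3); versions 1 and 2
    // were coalesced away, which is harmless because the channel carries a
    // notification, not the data itself.
    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), 3);
    println!("observed latest version: {}", *rx.borrow());
}
```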
@@ -186,6 +187,11 @@ impl AptosDB {
         Ok((ledger_db, state_merkle_db, state_kv_db))
     }

+    pub fn add_version_update_subscriber(&mut self, sender: Sender<Version>) -> Result<()> {
+        self.update_subscriber = Some(sender);
+        Ok(())
+    }
+
     /// Gets an instance of `BackupHandler` for data backup purpose.
     pub fn get_backup_handler(&self) -> BackupHandler {
         BackupHandler::new(Arc::clone(&self.state_store), Arc::clone(&self.ledger_db))
diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs
index ca5f1fe2f009a..703cfe9326f37 100644
--- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs
+++ b/storage/aptosdb/src/fast_sync_storage_wrapper.rs
@@ -18,6 +18,7 @@ use aptos_types::{
 };
 use either::Either;
 use std::sync::Arc;
+use tokio::sync::watch::Sender;

 pub const SECONDARY_DB_DIR: &str = "fast_sync_secondary";

@@ -44,8 +45,9 @@ impl FastSyncStorageWrapper {
     pub fn initialize_dbs(
         config: &NodeConfig,
         internal_indexer_db: Option<InternalIndexerDB>,
+        update_sender: Option<Sender<Version>>,
     ) -> Result<Either<AptosDB, Self>> {
-        let db_main = AptosDB::open(
+        let mut db_main = AptosDB::open(
             config.storage.get_dir_paths(),
             /*readonly=*/ false,
             config.storage.storage_pruner_config,
@@ -56,6 +58,9 @@ impl FastSyncStorageWrapper {
             internal_indexer_db,
         )
         .map_err(|err| anyhow!("fast sync DB failed to open {}", err))?;
+        if let Some(sender) = update_sender {
+            db_main.add_version_update_subscriber(sender)?;
+        }

         let mut db_dir = config.storage.dir();
         // when the db is empty and configured to do fast sync, we will create a second DB
diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs
index f21aade0905a5..63351a633a15d 100644
--- a/storage/indexer/src/db_indexer.rs
+++ b/storage/indexer/src/db_indexer.rs
@@ -363,6 +363,7 @@ impl DBIndexer {
     pub fn process_a_batch(&self, start_version: Version) -> Result<Version> {
         let mut version = start_version;
         let num_transactions = self.get_num_of_transactions(version)?;
+        // This guarantees that `num_transactions` transactions are readable from the main db.
         let mut db_iter = self.get_main_db_iter(version, num_transactions)?;
         let batch = SchemaBatch::new();
         db_iter.try_for_each(|res| {
@@ -407,7 +408,9 @@ impl DBIndexer {
             version += 1;
             Ok::<(), AptosDbError>(())
         })?;
+        // Assert that we have processed all the readable transactions.
         assert_eq!(num_transactions, version - start_version);
+
         if self.indexer_db.transaction_enabled() {
             batch.put::<InternalIndexerMetadataSchema>(
                 &MetadataKey::TransactionVersion,
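One last consumer-side detail: in `internal_indexer_db_service.rs` (earlier in this diff) the fixed 100 ms sleep is replaced by `timeout(..., changed())`, which has three distinct outcomes. A standalone sketch of that decision logic, using a hypothetical `wait_for_more_data` helper rather than the PR's inline code:

```rust
// Sketch of the indexer catch-up loop's wait step; the real logic lives
// inline in InternalIndexerDBService::run (names here are illustrative).
use std::time::Duration;
use tokio::sync::watch::Receiver;

async fn wait_for_more_data(update_receiver: &mut Receiver<u64>) -> bool {
    match tokio::time::timeout(Duration::from_millis(100), update_receiver.changed()).await {
        // A new version was committed; go process another batch.
        Ok(Ok(())) => true,
        // The sender side (AptosDB) was dropped; shut the loop down.
        Ok(Err(_)) => false,
        // Timed out with no notification; retry anyway, preserving the old
        // 100 ms polling behavior as a fallback.
        Err(_) => true,
    }
}
```

In the patch itself, the first and third outcomes fall through to `continue` and another `process_a_batch` call, while the second logs "update sender is dropped" and returns `Ok(())` so the task exits cleanly.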