From 9fde94f7c8d1f28f7e2d0f28a5725ba75464d7ee Mon Sep 17 00:00:00 2001 From: Jason Greathouse Date: Thu, 5 Sep 2024 17:00:13 -0500 Subject: [PATCH] fog-view poll timing (#4005) * add MC_DB_POLLING_INTERVAL_MS option to view store server --- ...ilecoin-workflow-dev-setup-environment.yaml | 18 ++++++++++++++---- fog/view/server/src/config.rs | 6 +++++- fog/view/server/src/db_fetcher.rs | 17 +++++++++++------ fog/view/server/src/server.rs | 10 ++++------ fog/view/server/test-utils/src/lib.rs | 1 + 5 files changed, 35 insertions(+), 17 deletions(-) diff --git a/.github/workflows/mobilecoin-workflow-dev-setup-environment.yaml b/.github/workflows/mobilecoin-workflow-dev-setup-environment.yaml index fda53cf162..a7ced3b4e7 100644 --- a/.github/workflows/mobilecoin-workflow-dev-setup-environment.yaml +++ b/.github/workflows/mobilecoin-workflow-dev-setup-environment.yaml @@ -294,6 +294,19 @@ jobs: object_name: fog-recovery-reader-0-postgresql src: ${{ env.PG_PATH }}/sec + - name: Generate PostgreSQL values file + run: | + mkdir -p "${VALUES_BASE_PATH}" + cat < "${VALUES_BASE_PATH}/postgresql-values.yaml" + architecture: replication + global: + postgresql: + auth: + database: fog_recovery + existingSecret: fog-recovery-postgresql + postgresqlSharedPreloadLibraries: pgaudit,pg_stat_statements + EOF + - name: Deploy PostgreSQL instance uses: mobilecoinofficial/gha-k8s-toolbox@v1 with: @@ -301,10 +314,7 @@ jobs: chart_repo: https://charts.bitnami.com/bitnami chart_name: postgresql chart_version: 15.2.2 - chart_set: | - --set=global.postgresql.auth.existingSecret=fog-recovery-postgresql - --set=global.postgresql.auth.database=fog_recovery - --set=architecture=replication + chart_values: ${{ env.VALUES_BASE_PATH }}/postgresql-values.yaml chart_wait_timeout: 5m release_name: fog-recovery-postgresql namespace: ${{ inputs.namespace }} diff --git a/fog/view/server/src/config.rs b/fog/view/server/src/config.rs index 806554b827..ae3fb8f4a7 100644 --- a/fog/view/server/src/config.rs +++ b/fog/view/server/src/config.rs @@ -7,7 +7,7 @@ use clap::Parser; use mc_common::ResponderId; use mc_fog_sql_recovery_db::SqlRecoveryDbConnectionConfig; use mc_fog_uri::{FogViewRouterUri, FogViewStoreUri, FogViewUri}; -use mc_util_parse::parse_duration_in_seconds; +use mc_util_parse::{parse_duration_in_millis, parse_duration_in_seconds}; use mc_util_uri::AdminUri; use serde::Serialize; use std::{str::FromStr, time::Duration}; @@ -69,6 +69,10 @@ pub struct MobileAcctViewConfig { #[clap(long, default_value = "1000", env = "MC_BLOCK_QUERY_BATCH_SIZE")] pub block_query_batch_size: usize, + /// Database polling interval in ms. + #[clap(long, default_value = "250", value_parser = parse_duration_in_millis, env = "MC_DB_POLLING_INTERVAL_MS")] + pub db_polling_interval_ms: Duration, + /// Determines which group of TxOuts the Fog View Store instance will /// process. #[clap(long, default_value = "default", env = "MC_SHARDING_STRATEGY")] diff --git a/fog/view/server/src/db_fetcher.rs b/fog/view/server/src/db_fetcher.rs index 5ddc60c672..0602697128 100644 --- a/fog/view/server/src/db_fetcher.rs +++ b/fog/view/server/src/db_fetcher.rs @@ -17,12 +17,9 @@ use std::{ time::Duration, }; -/// Time to wait between database fetch attempts. -pub const DB_POLL_INTERNAL: Duration = Duration::from_millis(100); - /// Approximate maximum number of ETxOutRecords we will collect inside /// fetched_records before blocking and waiting for the enclave thread to pick -/// them up. Since DB fetching is significantlly faster than enclave insertion +/// them up. Since DB fetching is significantly faster than enclave insertion /// we need a mechanism that prevents fetched_records from growing indefinitely. /// This essentially caps the memory usage of the fetched_records array. /// Assuming each ETxOutRecord is <256 bytes, this gives a worst case scenario @@ -77,6 +74,7 @@ pub struct DbFetcher { impl DbFetcher { pub fn new( db: DB, + db_polling_interval: Duration, readiness_indicator: ReadinessIndicator, sharding_strategy: SS, block_query_batch_size: usize, @@ -104,6 +102,7 @@ impl DbFetcher { .spawn(move || { DbFetcherThread::start( db, + db_polling_interval, thread_stop_requested, thread_shared_state, thread_num_queued_records_limiter, @@ -179,6 +178,7 @@ where SS: ShardingStrategy + Clone + Send + Sync + 'static, { db: DB, + db_polling_interval: Duration, stop_requested: Arc, shared_state: Arc>, block_tracker: BlockTracker, @@ -197,6 +197,7 @@ where { pub fn start( db: DB, + db_polling_interval: Duration, stop_requested: Arc, shared_state: Arc>, num_queued_records_limiter: Arc<(Mutex, Condvar)>, @@ -211,6 +212,7 @@ where ); let thread = Self { db, + db_polling_interval, stop_requested, shared_state, block_tracker: BlockTracker::new(logger.clone(), sharding_strategy), @@ -243,7 +245,7 @@ where // loaded into the queue. self.readiness_indicator.set_ready(); - sleep(DB_POLL_INTERNAL); + sleep(self.db_polling_interval); } } @@ -382,7 +384,7 @@ where // We might have more work to do, we aren't sure because of the error may_have_more_work = true; // Let's back off for one interval when there is an error - sleep(DB_POLL_INTERNAL); + sleep(self.db_polling_interval); } } } @@ -415,6 +417,7 @@ mod tests { let db = db_test_context.get_db_instance(); let db_fetcher = DbFetcher::new( db.clone(), + Duration::from_millis(100), Default::default(), EpochShardingStrategy::default(), 1, @@ -651,6 +654,7 @@ mod tests { let db = db_test_context.get_db_instance(); let db_fetcher = DbFetcher::new( db.clone(), + Duration::from_millis(100), Default::default(), EpochShardingStrategy::default(), 1, @@ -714,6 +718,7 @@ mod tests { let db = db_test_context.get_db_instance(); let db_fetcher = DbFetcher::new( db.clone(), + Duration::from_millis(100), Default::default(), EpochShardingStrategy::default(), 1, diff --git a/fog/view/server/src/server.rs b/fog/view/server/src/server.rs index 07e268ff4e..c9b9f33e18 100644 --- a/fog/view/server/src/server.rs +++ b/fog/view/server/src/server.rs @@ -251,9 +251,6 @@ where logger: Logger, } -/// How long to wait between polling db -const DB_POLL_INTERNAL: Duration = Duration::from_millis(100); - impl DbPollThread where E: ViewEnclaveProxy, @@ -348,7 +345,7 @@ where logger: Logger, ) { log::debug!(logger, "Db poll thread started"); - + let polling_interval = config.db_polling_interval_ms; let mut worker = DbPollThreadWorker::new( config, stop_requested, @@ -369,7 +366,7 @@ where WorkerTickResult::HasMoreWork => {} WorkerTickResult::Sleep => { - sleep(DB_POLL_INTERNAL); + sleep(polling_interval); } } } @@ -434,7 +431,7 @@ pub enum WorkerTickResult { Sleep, } -/// Telemetry: block indes currently being worked on. +/// Telemetry: block index currently being worked on. const TELEMETRY_BLOCK_INDEX_KEY: Key = telemetry_static_key!("block-index"); impl DbPollThreadWorker @@ -457,6 +454,7 @@ where let db_fetcher = DbFetcher::new( db.clone(), + config.db_polling_interval_ms, db_fetcher_readiness_indicator.clone(), sharding_strategy.clone(), config.block_query_batch_size, diff --git a/fog/view/server/test-utils/src/lib.rs b/fog/view/server/test-utils/src/lib.rs index 38e34faf12..9769d900a5 100644 --- a/fog/view/server/test-utils/src/lib.rs +++ b/fog/view/server/test-utils/src/lib.rs @@ -227,6 +227,7 @@ impl RouterTestEnvironment { sharding_strategy, postgres_config: Default::default(), block_query_batch_size: 2, + db_polling_interval_ms: Duration::from_millis(100), }; let enclave = SgxViewEnclave::new(