Skip to content

Commit

Permalink
fog-view poll timing (#4005)
Browse files Browse the repository at this point in the history
* add MC_DB_POLLING_INTERVAL_MS option to view store server
  • Loading branch information
jgreat authored Sep 5, 2024
1 parent 1a3a703 commit 9fde94f
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 17 deletions.
18 changes: 14 additions & 4 deletions .github/workflows/mobilecoin-workflow-dev-setup-environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,27 @@ jobs:
object_name: fog-recovery-reader-0-postgresql
src: ${{ env.PG_PATH }}/sec

- name: Generate PostgreSQL values file
run: |
mkdir -p "${VALUES_BASE_PATH}"
cat <<EOF > "${VALUES_BASE_PATH}/postgresql-values.yaml"
architecture: replication
global:
postgresql:
auth:
database: fog_recovery
existingSecret: fog-recovery-postgresql
postgresqlSharedPreloadLibraries: pgaudit,pg_stat_statements
EOF
- name: Deploy PostgreSQL instance
uses: mobilecoinofficial/gha-k8s-toolbox@v1
with:
action: helm-deploy
chart_repo: https://charts.bitnami.com/bitnami
chart_name: postgresql
chart_version: 15.2.2
chart_set: |
--set=global.postgresql.auth.existingSecret=fog-recovery-postgresql
--set=global.postgresql.auth.database=fog_recovery
--set=architecture=replication
chart_values: ${{ env.VALUES_BASE_PATH }}/postgresql-values.yaml
chart_wait_timeout: 5m
release_name: fog-recovery-postgresql
namespace: ${{ inputs.namespace }}
Expand Down
6 changes: 5 additions & 1 deletion fog/view/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use clap::Parser;
use mc_common::ResponderId;
use mc_fog_sql_recovery_db::SqlRecoveryDbConnectionConfig;
use mc_fog_uri::{FogViewRouterUri, FogViewStoreUri, FogViewUri};
use mc_util_parse::parse_duration_in_seconds;
use mc_util_parse::{parse_duration_in_millis, parse_duration_in_seconds};
use mc_util_uri::AdminUri;
use serde::Serialize;
use std::{str::FromStr, time::Duration};
Expand Down Expand Up @@ -69,6 +69,10 @@ pub struct MobileAcctViewConfig {
#[clap(long, default_value = "1000", env = "MC_BLOCK_QUERY_BATCH_SIZE")]
pub block_query_batch_size: usize,

/// Database polling interval in ms.
#[clap(long, default_value = "250", value_parser = parse_duration_in_millis, env = "MC_DB_POLLING_INTERVAL_MS")]
pub db_polling_interval_ms: Duration,

/// Determines which group of TxOuts the Fog View Store instance will
/// process.
#[clap(long, default_value = "default", env = "MC_SHARDING_STRATEGY")]
Expand Down
17 changes: 11 additions & 6 deletions fog/view/server/src/db_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ use std::{
time::Duration,
};

/// Time to wait between database fetch attempts.
pub const DB_POLL_INTERNAL: Duration = Duration::from_millis(100);

/// Approximate maximum number of ETxOutRecords we will collect inside
/// fetched_records before blocking and waiting for the enclave thread to pick
/// them up. Since DB fetching is significantlly faster than enclave insertion
/// them up. Since DB fetching is significantly faster than enclave insertion
/// we need a mechanism that prevents fetched_records from growing indefinitely.
/// This essentially caps the memory usage of the fetched_records array.
/// Assuming each ETxOutRecord is <256 bytes, this gives a worst case scenario
Expand Down Expand Up @@ -77,6 +74,7 @@ pub struct DbFetcher {
impl DbFetcher {
pub fn new<DB, SS>(
db: DB,
db_polling_interval: Duration,
readiness_indicator: ReadinessIndicator,
sharding_strategy: SS,
block_query_batch_size: usize,
Expand Down Expand Up @@ -104,6 +102,7 @@ impl DbFetcher {
.spawn(move || {
DbFetcherThread::start(
db,
db_polling_interval,
thread_stop_requested,
thread_shared_state,
thread_num_queued_records_limiter,
Expand Down Expand Up @@ -179,6 +178,7 @@ where
SS: ShardingStrategy + Clone + Send + Sync + 'static,
{
db: DB,
db_polling_interval: Duration,
stop_requested: Arc<AtomicBool>,
shared_state: Arc<Mutex<DbFetcherSharedState>>,
block_tracker: BlockTracker<SS>,
Expand All @@ -197,6 +197,7 @@ where
{
pub fn start(
db: DB,
db_polling_interval: Duration,
stop_requested: Arc<AtomicBool>,
shared_state: Arc<Mutex<DbFetcherSharedState>>,
num_queued_records_limiter: Arc<(Mutex<usize>, Condvar)>,
Expand All @@ -211,6 +212,7 @@ where
);
let thread = Self {
db,
db_polling_interval,
stop_requested,
shared_state,
block_tracker: BlockTracker::new(logger.clone(), sharding_strategy),
Expand Down Expand Up @@ -243,7 +245,7 @@ where
// loaded into the queue.
self.readiness_indicator.set_ready();

sleep(DB_POLL_INTERNAL);
sleep(self.db_polling_interval);
}
}

Expand Down Expand Up @@ -382,7 +384,7 @@ where
// We might have more work to do, we aren't sure because of the error
may_have_more_work = true;
// Let's back off for one interval when there is an error
sleep(DB_POLL_INTERNAL);
sleep(self.db_polling_interval);
}
}
}
Expand Down Expand Up @@ -415,6 +417,7 @@ mod tests {
let db = db_test_context.get_db_instance();
let db_fetcher = DbFetcher::new(
db.clone(),
Duration::from_millis(100),
Default::default(),
EpochShardingStrategy::default(),
1,
Expand Down Expand Up @@ -651,6 +654,7 @@ mod tests {
let db = db_test_context.get_db_instance();
let db_fetcher = DbFetcher::new(
db.clone(),
Duration::from_millis(100),
Default::default(),
EpochShardingStrategy::default(),
1,
Expand Down Expand Up @@ -714,6 +718,7 @@ mod tests {
let db = db_test_context.get_db_instance();
let db_fetcher = DbFetcher::new(
db.clone(),
Duration::from_millis(100),
Default::default(),
EpochShardingStrategy::default(),
1,
Expand Down
10 changes: 4 additions & 6 deletions fog/view/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,6 @@ where
logger: Logger,
}

/// How long to wait between polling db
const DB_POLL_INTERNAL: Duration = Duration::from_millis(100);

impl<E, DB, SS> DbPollThread<E, DB, SS>
where
E: ViewEnclaveProxy,
Expand Down Expand Up @@ -348,7 +345,7 @@ where
logger: Logger,
) {
log::debug!(logger, "Db poll thread started");

let polling_interval = config.db_polling_interval_ms;
let mut worker = DbPollThreadWorker::new(
config,
stop_requested,
Expand All @@ -369,7 +366,7 @@ where
WorkerTickResult::HasMoreWork => {}

WorkerTickResult::Sleep => {
sleep(DB_POLL_INTERNAL);
sleep(polling_interval);
}
}
}
Expand Down Expand Up @@ -434,7 +431,7 @@ pub enum WorkerTickResult {
Sleep,
}

/// Telemetry: block indes currently being worked on.
/// Telemetry: block index currently being worked on.
const TELEMETRY_BLOCK_INDEX_KEY: Key = telemetry_static_key!("block-index");

impl<E, DB, SS> DbPollThreadWorker<E, DB, SS>
Expand All @@ -457,6 +454,7 @@ where

let db_fetcher = DbFetcher::new(
db.clone(),
config.db_polling_interval_ms,
db_fetcher_readiness_indicator.clone(),
sharding_strategy.clone(),
config.block_query_batch_size,
Expand Down
1 change: 1 addition & 0 deletions fog/view/server/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl RouterTestEnvironment {
sharding_strategy,
postgres_config: Default::default(),
block_query_batch_size: 2,
db_polling_interval_ms: Duration::from_millis(100),
};

let enclave = SgxViewEnclave::new(
Expand Down

0 comments on commit 9fde94f

Please sign in to comment.