Skip to content

Commit

Permalink
fix(en): Fix connection starvation during snapshot recovery (#2836)
Browse files Browse the repository at this point in the history
## What ❔

- Fixes DB connection starvation during snapshot recovery, which was caused by
an insufficient number of connections in the DB pool provided to the recovery
logic.
- Additionally, fixes the max concurrency of recovery not being set.

## Why ❔

Connection starvation errors degrade UX.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored Sep 10, 2024
1 parent 6009499 commit 52f4f76
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,18 @@ impl WiringLayer for ExternalNodeInitStrategyLayer {
});
let snapshot_recovery = match self.snapshot_recovery_config {
Some(recovery_config) => {
// Add a connection for checking whether the storage is initialized.
let recovery_pool = input
.master_pool
.get_custom(self.max_postgres_concurrency.get() as u32)
.get_custom(self.max_postgres_concurrency.get() as u32 + 1)
.await?;
let recovery = Arc::new(ExternalNodeSnapshotRecovery {
let recovery: Arc<dyn InitializeStorage> = Arc::new(ExternalNodeSnapshotRecovery {
client: client.clone(),
pool: recovery_pool,
max_concurrency: self.max_postgres_concurrency,
recovery_config,
app_health,
}) as Arc<dyn InitializeStorage>;
});
Some(recovery)
}
None => None,
Expand Down
78 changes: 74 additions & 4 deletions core/node/node_storage_init/src/external_node/snapshot_recovery.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, time::Instant};
use std::{num::NonZeroUsize, sync::Arc, time::Instant};

use anyhow::Context as _;
use tokio::sync::watch;
Expand All @@ -17,15 +17,25 @@ use crate::{InitializeStorage, SnapshotRecoveryConfig};
/// Storage initialization strategy that restores an external node's storage via snapshot recovery.
///
/// Used through its [`InitializeStorage`] implementation.
pub struct ExternalNodeSnapshotRecovery {
/// L2 client; a clone tagged with the `snapshot_recovery` component is handed to the snapshots applier.
pub client: Box<DynClient<L2>>,
/// Connection pool passed to the snapshots applier. `initialize_storage` logs an error if its
/// max size is below `max_concurrency + 1` (the extra connection is reserved for checks).
pub pool: ConnectionPool<Core>,
/// Concurrency limit forwarded to `SnapshotsApplierConfig::max_concurrency`.
pub max_concurrency: NonZeroUsize,
/// Recovery settings; `initialize_storage` requires `object_store_config` to be set.
pub recovery_config: SnapshotRecoveryConfig,
/// Application health check handle.
// NOTE(review): presumably used to report recovery health; its usage is not visible here — confirm.
pub app_health: Arc<AppHealthCheck>,
}

#[async_trait::async_trait]
impl InitializeStorage for ExternalNodeSnapshotRecovery {
async fn initialize_storage(&self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let pool = self.pool.clone();
tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");

let pool_size = self.pool.max_size() as usize;
if pool_size < self.max_concurrency.get() + 1 {
tracing::error!(
"Connection pool has insufficient number of connections ({pool_size} vs concurrency {} + 1 connection for checks). \
This will likely lead to pool starvation during recovery.",
self.max_concurrency
);
}

let object_store_config =
self.recovery_config.object_store_config.clone().context(
"Snapshot object store must be presented if snapshot recovery is activated",
Expand All @@ -34,10 +44,13 @@ impl InitializeStorage for ExternalNodeSnapshotRecovery {
.create_store()
.await?;

let config = SnapshotsApplierConfig::default();
let config = SnapshotsApplierConfig {
max_concurrency: self.max_concurrency,
..SnapshotsApplierConfig::default()
};
let mut snapshots_applier_task = SnapshotsApplierTask::new(
config,
pool,
self.pool.clone(),
Box::new(self.client.clone().for_component("snapshot_recovery")),
object_store,
);
Expand Down Expand Up @@ -80,3 +93,60 @@ impl InitializeStorage for ExternalNodeSnapshotRecovery {
Ok(completed)
}
}

#[cfg(test)]
mod tests {
use std::future;

use zksync_types::{
tokens::{TokenInfo, TokenMetadata},
Address, L2BlockNumber,
};
use zksync_web3_decl::client::MockClient;

use super::*;

/// Regression test for pool starvation: with `max_concurrency` connections permanently held,
/// `is_initialized()` must still be able to obtain a connection and complete.
#[tokio::test]
async fn recovery_does_not_starve_pool_connections() {
// The pool is capped at 5 connections, i.e. `max_concurrency` (4, set below) plus one spare.
let pool = ConnectionPool::constrained_test_pool(5).await;
let app_health = Arc::new(AppHealthCheck::new(None, None));
// Mock L2 client that answers `en_syncTokens` with a single token.
let client = MockClient::builder(L2::default())
.method("en_syncTokens", |_number: Option<L2BlockNumber>| {
Ok(vec![TokenInfo {
l1_address: Address::repeat_byte(1),
l2_address: Address::repeat_byte(2),
metadata: TokenMetadata {
name: "test".to_string(),
symbol: "TEST".to_string(),
decimals: 18,
},
}])
})
.build();
let recovery = ExternalNodeSnapshotRecovery {
client: Box::new(client),
pool,
max_concurrency: NonZeroUsize::new(4).unwrap(),
recovery_config: SnapshotRecoveryConfig {
snapshot_l1_batch_override: None,
drop_storage_key_preimages: false,
// No object store is configured: this test never calls `initialize_storage()`.
object_store_config: None,
},
app_health,
};

// Emulate recovery by indefinitely holding onto `max_concurrency` connections. In practice,
// the snapshot applier will release connections eventually, but it may require more time than the connection
// acquisition timeout configured for the DB pool.
for _ in 0..recovery.max_concurrency.get() {
let connection = recovery.pool.connection().await.unwrap();
// The connection is moved into a task that never completes, so it is never returned
// to the pool while the test runs (the `drop` below is unreachable).
tokio::spawn(async move {
future::pending::<()>().await;
drop(connection);
});
}

// The only token reported by the mock client isn't recovered
assert!(!recovery.is_initialized().await.unwrap());
}
}

0 comments on commit 52f4f76

Please sign in to comment.