Commit 018b6cc

fix(db): fix number of files in db, startup hang, ram issues and flushing issues (#379)

Co-authored-by: antiyro <74653697+antiyro@users.noreply.github.com>
cchudant and antiyro authored Nov 20, 2024
1 parent e140ab3 commit 018b6cc
Showing 5 changed files with 89 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## Next release

+- fix(db): fix number of files in db, startup hang, ram issues and flushing issues
- fix: FeePayment conversion
- fix(block_production): get l2-to-l1 messages recursively from the call tree
- refactor: replace starknet-rs BlockId with types-rs BlockId and remove redundant mp_block::BlockId
71 changes: 7 additions & 64 deletions crates/client/db/src/lib.rs
@@ -8,10 +8,8 @@ use db_metrics::DbMetrics;
use mp_chain_config::ChainConfig;
use mp_utils::service::Service;
use rocksdb::backup::{BackupEngine, BackupEngineOptions};
-use rocksdb::{
-    BoundColumnFamily, ColumnFamilyDescriptor, DBCompressionType, DBWithThreadMode, Env, FlushOptions, MultiThreaded,
-    Options, SliceTransform,
-};
+use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, Env, FlushOptions, MultiThreaded};
+use rocksdb_options::rocksdb_global_options;
use starknet_types_core::hash::{Pedersen, Poseidon, StarkHash};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
@@ -28,6 +26,7 @@ pub mod db_metrics;
pub mod devnet_db;
mod error;
pub mod l1_db;
+mod rocksdb_options;
pub mod storage_updates;
pub mod tests;

@@ -38,39 +37,8 @@ pub type WriteBatchWithTransaction = rocksdb::WriteBatchWithTransaction<false>;

const DB_UPDATES_BATCH_SIZE: usize = 1024;

-#[allow(clippy::identity_op)] // allow 1 * MiB
-#[allow(non_upper_case_globals)] // allow KiB/MiB/GiB names
-pub fn open_rocksdb(path: &Path, create: bool) -> Result<Arc<DB>> {
-    const KiB: usize = 1024;
-    const MiB: usize = 1024 * KiB;
-    const GiB: usize = 1024 * MiB;
-
-    let mut opts = Options::default();
-    opts.set_report_bg_io_stats(true);
-    opts.set_use_fsync(false);
-    opts.create_if_missing(create);
-    opts.create_missing_column_families(true);
-    opts.set_keep_log_file_num(1);
-    opts.optimize_level_style_compaction(4 * GiB);
-    opts.set_compression_type(DBCompressionType::Zstd);
-    let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1);
-    opts.increase_parallelism(cores);
-
-    opts.set_atomic_flush(true);
-    opts.set_manual_wal_flush(true);
-    opts.set_max_subcompactions(cores as _);
-
-    opts.set_max_log_file_size(1 * MiB);
-    opts.set_max_open_files(512); // 512 is the value used by substrate for reference
-    opts.set_keep_log_file_num(3);
-    opts.set_log_level(rocksdb::LogLevel::Warn);
-
-    let mut env = Env::new().context("Creating rocksdb env")?;
-    // env.set_high_priority_background_threads(cores); // flushes
-    env.set_low_priority_background_threads(cores); // compaction
-
-    opts.set_env(&env);
-
+pub fn open_rocksdb(path: &Path) -> Result<Arc<DB>> {
+    let opts = rocksdb_global_options()?;
    tracing::debug!("opening db at {:?}", path.display());
    let db = DB::open_cf_descriptors(
        &opts,
@@ -265,31 +233,6 @@ impl Column {
Devnet => "devnet",
}
}

/// Per column rocksdb options, like memory budget, compaction profiles, block sizes for hdd/sdd
/// etc. TODO: add basic sensible defaults
pub(crate) fn rocksdb_options(&self) -> Options {
let mut opts = Options::default();
match self {
Column::ContractStorage => {
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(
contract_db::CONTRACT_STORAGE_PREFIX_EXTRACTOR,
));
}
Column::ContractToClassHashes => {
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(
contract_db::CONTRACT_CLASS_HASH_PREFIX_EXTRACTOR,
));
}
Column::ContractToNonces => {
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(
contract_db::CONTRACT_NONCES_PREFIX_EXTRACTOR,
));
}
_ => {}
}
opts
}
}

pub trait DatabaseExt {
@@ -386,7 +329,7 @@ impl MadaraBackend {
        let temp_dir = tempfile::TempDir::with_prefix("madara-test").unwrap();
        Arc::new(Self {
            backup_handle: None,
-            db: open_rocksdb(temp_dir.as_ref(), true).unwrap(),
+            db: open_rocksdb(temp_dir.as_ref()).unwrap(),
            last_flush_time: Default::default(),
            chain_config,
            db_metrics: DbMetrics::register().unwrap(),
@@ -425,7 +368,7 @@ impl MadaraBackend {
            None
        };

-        let db = open_rocksdb(&db_path, true)?;
+        let db = open_rocksdb(&db_path)?;

        let backend = Arc::new(Self {
            db_metrics: DbMetrics::register().context("Registering db metrics")?,
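The last hunk above cuts off inside the call to DB::open_cf_descriptors. As a rough sketch of the shape this refactor leaves behind — assuming the crate's DB alias is DBWithThreadMode<MultiThreaded> and that a Column::ALL list plus a Column::rocksdb_name() accessor exist (both names are illustrative, not confirmed by this diff):

use rocksdb::ColumnFamilyDescriptor;
use std::path::Path;
use std::sync::Arc;

// Sketch only: global tuning now comes from rocksdb_global_options(), while
// each column family carries its own Options from Column::rocksdb_options().
pub fn open_rocksdb(path: &Path) -> anyhow::Result<Arc<DB>> {
    let opts = rocksdb_global_options()?;
    tracing::debug!("opening db at {:?}", path.display());
    let cfs = Column::ALL
        .iter()
        .map(|col| ColumnFamilyDescriptor::new(col.rocksdb_name(), col.rocksdb_options()));
    let db = DB::open_cf_descriptors(&opts, path, cfs)?;
    Ok(Arc::new(db))
}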
73 changes: 73 additions & 0 deletions crates/client/db/src/rocksdb_options.rs
@@ -0,0 +1,73 @@
#![allow(clippy::identity_op)] // allow 1 * MiB
#![allow(non_upper_case_globals)] // allow KiB/MiB/GiB names

use crate::{contract_db, Column};
use anyhow::{Context, Result};
use rocksdb::{DBCompressionType, Env, Options, SliceTransform};

const KiB: usize = 1024;
const MiB: usize = 1024 * KiB;
const GiB: usize = 1024 * MiB;

pub fn rocksdb_global_options() -> Result<Options> {
    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1);
    options.increase_parallelism(cores);
    options.set_max_background_jobs(cores);

    options.set_atomic_flush(true);
    options.set_max_subcompactions(cores as _);

    options.set_max_log_file_size(10 * MiB);
    options.set_max_open_files(2048);
    options.set_keep_log_file_num(3);
    options.set_log_level(rocksdb::LogLevel::Warn);

    let mut env = Env::new().context("Creating rocksdb env")?;
    // env.set_high_priority_background_threads(cores); // flushes
    env.set_low_priority_background_threads(cores); // compaction

    options.set_env(&env);

    Ok(options)
}

impl Column {
    /// Per column rocksdb options, like memory budget, compaction profiles, block sizes for hdd/ssd
    /// etc.
    pub(crate) fn rocksdb_options(&self) -> Options {
        let mut options = Options::default();

        match self {
            Column::ContractStorage => {
                options.set_prefix_extractor(SliceTransform::create_fixed_prefix(
                    contract_db::CONTRACT_STORAGE_PREFIX_EXTRACTOR,
                ));
            }
            Column::ContractToClassHashes => {
                options.set_prefix_extractor(SliceTransform::create_fixed_prefix(
                    contract_db::CONTRACT_CLASS_HASH_PREFIX_EXTRACTOR,
                ));
            }
            Column::ContractToNonces => {
                options.set_prefix_extractor(SliceTransform::create_fixed_prefix(
                    contract_db::CONTRACT_NONCES_PREFIX_EXTRACTOR,
                ));
            }
            _ => {}
        }

        options.set_compression_type(DBCompressionType::Zstd);
        match self {
            Column::BlockNToBlockInfo | Column::BlockNToBlockInner => {
                options.optimize_universal_style_compaction(1 * GiB);
            }
            _ => {
                options.optimize_universal_style_compaction(100 * MiB);
            }
        }
        options
    }
}
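Two things stand out in this new file: every column now gets Zstd compression plus a universal-compaction budget (1 GiB for the two hot block columns, 100 MiB for everything else), which bounds per-column memory, and the three contract columns keep their fixed-prefix extractors. A usage sketch of what those extractors buy, assuming a column family named "contract_storage" and the crate's DB alias (both assumptions, not part of this diff):

use anyhow::Context;
use rocksdb::{Direction, IteratorMode, ReadOptions};

// Scan every entry sharing one contract's key prefix. Because the column was
// opened with a fixed-prefix SliceTransform, RocksDB can consult prefix bloom
// filters and stop at the prefix boundary instead of scanning the whole CF.
fn scan_prefix(db: &DB, prefix: &[u8]) -> anyhow::Result<()> {
    let cf = db.cf_handle("contract_storage").context("missing column family")?;
    let mut read_opts = ReadOptions::default();
    read_opts.set_prefix_same_as_start(true); // confine iteration to the seek prefix
    for entry in db.iterator_cf_opt(&cf, read_opts, IteratorMode::From(prefix, Direction::Forward)) {
        let (key, value) = entry?;
        // every (key, value) pair here shares `prefix`; decode per the column's codec
        let _ = (key, value);
    }
    Ok(())
}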
9 changes: 7 additions & 2 deletions crates/node/src/cli/chain_config_overrides.rs
@@ -15,6 +15,7 @@ use mp_chain_config::{
};
use mp_utils::parsers::parse_key_value_yaml;
use mp_utils::serde::{deserialize_duration, deserialize_private_key, serialize_duration};
+use url::Url;

/// Override chain config parameters.
/// Format: "--chain-config-override chain_id=SN_MADARA,chain_name=MADARA,block_time=1500ms,bouncer_config.block_max_capacity.n_steps=100000000"
@@ -28,6 +29,8 @@ pub struct ChainConfigOverrideParams {
pub struct ChainConfigOverridesInner {
    pub chain_name: String,
    pub chain_id: ChainId,
+    pub feeder_gateway_url: Url,
+    pub gateway_url: Url,
    pub native_fee_token_address: ContractAddress,
    pub parent_fee_token_address: ContractAddress,
    #[serde(deserialize_with = "deserialize_starknet_version", serialize_with = "serialize_starknet_version")]
@@ -66,6 +69,8 @@ impl ChainConfigOverrideParams {
            eth_core_contract_address: chain_config.eth_core_contract_address,
            eth_gps_statement_verifier: chain_config.eth_gps_statement_verifier,
            private_key: chain_config.private_key,
+            feeder_gateway_url: chain_config.feeder_gateway_url,
+            gateway_url: chain_config.gateway_url,
        })
        .context("Failed to convert ChainConfig to Value")?;

@@ -101,8 +106,8 @@ impl ChainConfigOverrideParams {
        Ok(ChainConfig {
            chain_name: chain_config_overrides.chain_name,
            chain_id: chain_config_overrides.chain_id,
-            feeder_gateway_url: chain_config.feeder_gateway_url,
-            gateway_url: chain_config.gateway_url,
+            feeder_gateway_url: chain_config_overrides.feeder_gateway_url,
+            gateway_url: chain_config_overrides.gateway_url,
            native_fee_token_address: chain_config_overrides.native_fee_token_address,
            parent_fee_token_address: chain_config_overrides.parent_fee_token_address,
            latest_protocol_version: chain_config_overrides.latest_protocol_version,
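With feeder_gateway_url and gateway_url now part of ChainConfigOverridesInner, both endpoints can be overridden through the documented flag format, e.g. --chain-config-override gateway_url=http://localhost:8080/gateway,feeder_gateway_url=http://localhost:8080/feeder_gateway (endpoints hypothetical). A minimal sketch of the value parsing these fields imply, assuming nothing beyond the url crate:

use url::Url;

// The two new fields deserialize into url::Url, so an override value must be
// a syntactically valid URL; anything else fails at parse time. The endpoint
// below is a made-up example.
fn parse_gateway_url(raw: &str) -> anyhow::Result<Url> {
    Url::parse(raw).map_err(|e| anyhow::anyhow!("invalid gateway URL {raw:?}: {e}"))
}

fn main() -> anyhow::Result<()> {
    let url = parse_gateway_url("http://localhost:8080/feeder_gateway")?;
    assert_eq!(url.scheme(), "http");
    Ok(())
}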
2 changes: 1 addition & 1 deletion crates/node/src/service/sync.rs
@@ -32,7 +32,7 @@
    ) -> anyhow::Result<Self> {
        let fetch_config = config.block_fetch_config(chain_config.chain_id.clone(), chain_config.clone());

-        tracing::info!("🛰️ Using feeder gateway URL: {}", fetch_config.feeder_gateway.as_str());
+        tracing::info!("🛰️ Using feeder gateway URL: {}", fetch_config.feeder_gateway.as_str());

        Ok(Self {
            db_backend: Arc::clone(db.backend()),
