Skip to content

Commit

Permalink
Add ledger-tool dead-slots and improve purge a lot (#13065)
Browse files Browse the repository at this point in the history
* Add ledger-tool dead-slots and improve purge a lot

* Reduce batch size...

* Add --dead-slots-only and fixed purge ordering
  • Loading branch information
ryoqun authored Oct 21, 2020
1 parent e10de86 commit 0776fa0
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,8 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi

let end_slot = last_slot.unwrap();
info!("Purging slots {} to {}", start_slot, end_slot);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
blockstore.purge_from_next_slots(start_slot, end_slot);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
info!("Purging done, compacting db..");
if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
warn!(
Expand Down
1 change: 1 addition & 0 deletions ledger-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clap = "2.33.1"
futures = "0.3.5"
futures-util = "0.3.5"
histogram = "*"
itertools = "0.9.0"
log = { version = "0.4.8" }
regex = "1"
serde_json = "1.0.56"
Expand Down
89 changes: 81 additions & 8 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg,
ArgMatches, SubCommand,
};
use itertools::Itertools;
use log::*;
use regex::Regex;
use serde_json::json;
Expand Down Expand Up @@ -889,6 +890,11 @@ fn main() {
)
.arg(&allow_dead_slots_arg)
)
.subcommand(
SubCommand::with_name("dead-slots")
.arg(&starting_slot_arg)
.about("Print all of dead slots")
)
.subcommand(
SubCommand::with_name("set-dead-slot")
.about("Mark one or more slots dead")
Expand Down Expand Up @@ -1203,13 +1209,28 @@ fn main() {
.value_name("SLOT")
.help("Ending slot to stop purging (inclusive) [default: the highest slot in the ledger]"),
)
.arg(
Arg::with_name("batch_size")
.long("batch-size")
.value_name("NUM")
.takes_value(true)
.default_value("1000")
.help("Removes at most BATCH_SIZE slots while purging in loop"),
)
.arg(
Arg::with_name("no_compaction")
.long("no-compaction")
.required(false)
.takes_value(false)
.help("Skip ledger compaction after purge")
)
.arg(
Arg::with_name("dead_slots_only")
.long("dead-slots-only")
.required(false)
.takes_value(false)
.help("Limit puring to dead slots only")
)
)
.subcommand(
SubCommand::with_name("list-roots")
Expand Down Expand Up @@ -1445,6 +1466,17 @@ fn main() {
true,
);
}
("dead-slots", Some(arg_matches)) => {
let blockstore = open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
);
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
for slot in blockstore.dead_slots_iterator(starting_slot).unwrap() {
println!("{}", slot);
}
}
("set-dead-slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let blockstore =
Expand Down Expand Up @@ -2045,9 +2077,15 @@ fn main() {
("purge", Some(arg_matches)) => {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t!(arg_matches, "end_slot", Slot).ok();
let no_compaction = arg_matches.is_present("no-compaction");
let blockstore =
open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode);
let no_compaction = arg_matches.is_present("no_compaction");
let dead_slots_only = arg_matches.is_present("dead_slots_only");
let batch_size = value_t_or_exit!(arg_matches, "batch_size", usize);
let access_type = if !no_compaction {
AccessType::PrimaryOnly
} else {
AccessType::PrimaryOnlyForMaintenance
};
let blockstore = open_blockstore(&ledger_path, access_type, wal_recovery_mode);

let end_slot = match end_slot {
Some(end_slot) => end_slot,
Expand All @@ -2074,13 +2112,48 @@ fn main() {
);
exit(1);
}
println!("Purging data from slots {} to {}", start_slot, end_slot);
if no_compaction {
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
info!(
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
start_slot,
end_slot,
end_slot - start_slot,
no_compaction,
dead_slots_only,
);
let purge_from_blockstore = |start_slot, end_slot| {
blockstore.purge_from_next_slots(start_slot, end_slot);
if no_compaction {
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
} else {
blockstore.purge_and_compact_slots(start_slot, end_slot);
}
};
if !dead_slots_only {
let slots_iter = &(start_slot..=end_slot).chunks(batch_size);
for slots in slots_iter {
let slots = slots.collect::<Vec<_>>();
assert!(!slots.is_empty());

let start_slot = *slots.first().unwrap();
let end_slot = *slots.last().unwrap();
info!(
"Purging chunked slots from {} to {} ({} slots)",
start_slot,
end_slot,
end_slot - start_slot
);
purge_from_blockstore(start_slot, end_slot);
}
} else {
blockstore.purge_and_compact_slots(start_slot, end_slot);
let dead_slots_iter = blockstore
.dead_slots_iterator(start_slot)
.unwrap()
.take_while(|s| *s <= end_slot);
for dead_slot in dead_slots_iter {
info!("Purging dead slot {}", dead_slot);
purge_from_blockstore(dead_slot, dead_slot);
}
}
blockstore.purge_from_next_slots(start_slot, end_slot);
}
("list-roots", Some(arg_matches)) => {
let blockstore = open_blockstore(
Expand Down
6 changes: 5 additions & 1 deletion ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl Blockstore {
meta.next_slots
.retain(|slot| *slot < from_slot || *slot > to_slot);
if meta.next_slots.len() != original_len {
info!("purge_from_next_slots: adjusted meta for slot {}", slot);
info!(
"purge_from_next_slots: meta for slot {} no longer refers to slots {:?}",
slot,
from_slot..=to_slot
);
self.put_meta_bytes(
slot,
&bincode::serialize(&meta).expect("couldn't update meta"),
Expand Down
54 changes: 35 additions & 19 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ pub mod columns {

pub enum AccessType {
PrimaryOnly,
PrimaryOnlyForMaintenance, // this indicates no compaction
TryPrimaryThenSecondary,
}

Expand Down Expand Up @@ -217,37 +218,45 @@ impl Rocks {
fs::create_dir_all(&path)?;

// Use default database options
let mut db_options = get_db_options();
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update...");
}
let mut db_options = get_db_options(&access_type);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}

// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let meta_cf_descriptor =
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type));
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type));
let duplicate_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options());
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type));
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type));
let orphans_cf_descriptor =
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type));
let root_cf_descriptor =
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type));
let index_cf_descriptor =
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type));
let shred_data_cf_descriptor =
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type));
let shred_code_cf_descriptor =
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type));
let transaction_status_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options());
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type));
let address_signatures_cf_descriptor =
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options());
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type));
let transaction_status_index_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options());
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type));
let rewards_cf_descriptor =
ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type));
let blocktime_cf_descriptor =
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type));
let perf_samples_cf_descriptor =
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type));

let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor),
Expand All @@ -272,7 +281,7 @@ impl Rocks {

// Open the database
let db = match access_type {
AccessType::PrimaryOnly => Rocks(
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
ActualAccessType::Primary,
),
Expand Down Expand Up @@ -1003,7 +1012,7 @@ impl<'a> WriteBatch<'a> {
}
}

fn get_cf_options() -> Options {
fn get_cf_options(access_type: &AccessType) -> Options {
let mut options = Options::default();
// 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
options.set_max_write_buffer_number(8);
Expand All @@ -1017,10 +1026,14 @@ fn get_cf_options() -> Options {
options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
options.set_max_bytes_for_level_base(total_size_base);
options.set_target_file_size_base(file_size_base);
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}

options
}

fn get_db_options() -> Options {
fn get_db_options(access_type: &AccessType) -> Options {
let mut options = Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
Expand All @@ -1029,6 +1042,9 @@ fn get_db_options() -> Options {

// Set max total wal size to 4G.
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}

options
}

0 comments on commit 0776fa0

Please sign in to comment.