Skip to content

Commit

Permalink
Add configurable limit to number of blocks to check before Bigtable u…
Browse files Browse the repository at this point in the history
…pload (#24716)

* Add ConfirmedBlockUploadConfig, no behavior changes

* Add comment

* A little DRY cleanup

* Add configurable limit to number of blocks to check in Blockstore and Bigtable before uploading

* Limit blockstore and bigtable look-ahead

* Exit iterator early when reach ending_slot

* Use rooted_slot_iterator instead of slot_meta_iterator

* Only check blocks in the ledger

(cherry picked from commit bc005e3)
  • Loading branch information
Tyera Eulberg authored and mergify[bot] committed May 13, 2022
1 parent 727707b commit 86ecd7a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 31 deletions.
15 changes: 13 additions & 2 deletions ledger-tool/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use {
display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
OutputFormat,
},
solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType},
solana_ledger::{
bigtable_upload::ConfirmedBlockUploadConfig, blockstore::Blockstore,
blockstore_db::AccessType,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
solana_storage_bigtable::CredentialType,
solana_transaction_status::{
Expand All @@ -41,15 +44,23 @@ async fn upload(
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;

let config = ConfirmedBlockUploadConfig {
force_reupload,
..ConfirmedBlockUploadConfig::default()
};

solana_ledger::bigtable_upload::upload_confirmed_blocks(
Arc::new(blockstore),
bigtable,
starting_slot,
ending_slot,
force_reupload,
config,
Arc::new(AtomicBool::new(false)),
)
.await
.map(|last_slot_uploaded| {
info!("last slot uploaded: {}", last_slot_uploaded);
})
}

async fn delete_slots(
Expand Down
67 changes: 44 additions & 23 deletions ledger/src/bigtable_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
solana_measure::measure::Measure,
solana_sdk::clock::Slot,
std::{
cmp::min,
collections::HashSet,
result::Result,
sync::{
Expand All @@ -15,32 +16,46 @@ use {
},
};

// Attempt to upload this many blocks in parallel
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
#[derive(Clone)]
pub struct ConfirmedBlockUploadConfig {
pub force_reupload: bool,
pub max_num_slots_to_check: usize,
pub num_blocks_to_upload_in_parallel: usize,
pub block_read_ahead_depth: usize, // should always be >= `num_blocks_to_upload_in_parallel`
}

// Read up to this many blocks from blockstore before blocking on the upload process
const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
impl Default for ConfirmedBlockUploadConfig {
fn default() -> Self {
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
ConfirmedBlockUploadConfig {
force_reupload: false,
max_num_slots_to_check: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 4,
num_blocks_to_upload_in_parallel: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL,
block_read_ahead_depth: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2,
}
}
}

pub async fn upload_confirmed_blocks(
blockstore: Arc<Blockstore>,
bigtable: solana_storage_bigtable::LedgerStorage,
starting_slot: Slot,
ending_slot: Option<Slot>,
force_reupload: bool,
config: ConfirmedBlockUploadConfig,
exit: Arc<AtomicBool>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<Slot, Box<dyn std::error::Error>> {
let mut measure = Measure::start("entire upload");

info!("Loading ledger slots starting at {}...", starting_slot);
let blockstore_slots: Vec<_> = blockstore
.slot_meta_iterator(starting_slot)
.rooted_slot_iterator(starting_slot)
.map_err(|err| {
format!(
"Failed to load entries starting from slot {}: {:?}",
starting_slot, err
)
})?
.filter_map(|(slot, _slot_meta)| {
.map_while(|slot| {
if let Some(ending_slot) = &ending_slot {
if slot > *ending_slot {
return None;
Expand All @@ -58,27 +73,31 @@ pub async fn upload_confirmed_blocks(
.into());
}

let first_blockstore_slot = blockstore_slots.first().unwrap();
let last_blockstore_slot = blockstore_slots.last().unwrap();
info!(
"Found {} slots in the range ({}, {})",
blockstore_slots.len(),
blockstore_slots.first().unwrap(),
blockstore_slots.last().unwrap()
first_blockstore_slot,
last_blockstore_slot,
);

// Gather the blocks that are already present in bigtable, by slot
let bigtable_slots = if !force_reupload {
let bigtable_slots = if !config.force_reupload {
let mut bigtable_slots = vec![];
let first_blockstore_slot = *blockstore_slots.first().unwrap();
let last_blockstore_slot = *blockstore_slots.last().unwrap();
info!(
"Loading list of bigtable blocks between slots {} and {}...",
first_blockstore_slot, last_blockstore_slot
);

let mut start_slot = *blockstore_slots.first().unwrap();
while start_slot <= last_blockstore_slot {
let mut start_slot = *first_blockstore_slot;
while start_slot <= *last_blockstore_slot {
let mut next_bigtable_slots = loop {
match bigtable.get_confirmed_blocks(start_slot, 1000).await {
let num_bigtable_blocks = min(1000, config.max_num_slots_to_check * 2);
match bigtable
.get_confirmed_blocks(start_slot, num_bigtable_blocks)
.await
{
Ok(slots) => break slots,
Err(err) => {
error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
Expand All @@ -95,7 +114,7 @@ pub async fn upload_confirmed_blocks(
}
bigtable_slots
.into_iter()
.filter(|slot| *slot <= last_blockstore_slot)
.filter(|slot| slot <= last_blockstore_slot)
.collect::<Vec<_>>()
} else {
Vec::new()
Expand All @@ -112,25 +131,27 @@ pub async fn upload_confirmed_blocks(
.cloned()
.collect::<Vec<_>>();
blocks_to_upload.sort_unstable();
blocks_to_upload.truncate(config.max_num_slots_to_check);
blocks_to_upload
};

if blocks_to_upload.is_empty() {
info!("No blocks need to be uploaded to bigtable");
return Ok(());
return Ok(*last_blockstore_slot);
}
let last_slot = *blocks_to_upload.last().unwrap();
info!(
"{} blocks to be uploaded to the bucket in the range ({}, {})",
blocks_to_upload.len(),
blocks_to_upload.first().unwrap(),
blocks_to_upload.last().unwrap()
last_slot
);

// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
let (_loader_thread, receiver) = {
let exit = exit.clone();

let (sender, receiver) = bounded(BLOCK_READ_AHEAD_DEPTH);
let (sender, receiver) = bounded(config.block_read_ahead_depth);
(
std::thread::spawn(move || {
let mut measure = Measure::start("block loader thread");
Expand All @@ -150,7 +171,7 @@ pub async fn upload_confirmed_blocks(
}
};

if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
if i > 0 && i % config.num_blocks_to_upload_in_parallel == 0 {
info!(
"{}% of blocks processed ({}/{})",
i * 100 / blocks_to_upload.len(),
Expand All @@ -170,7 +191,7 @@ pub async fn upload_confirmed_blocks(
use futures::stream::StreamExt;

let mut stream =
tokio_stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
tokio_stream::iter(receiver.into_iter()).chunks(config.num_blocks_to_upload_in_parallel);

while let Some(blocks) = stream.next().await {
if exit.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -205,6 +226,6 @@ pub async fn upload_confirmed_blocks(
if failures > 0 {
Err(format!("Incomplete upload, {} operations failed", failures).into())
} else {
Ok(())
Ok(last_slot)
}
}
41 changes: 36 additions & 5 deletions ledger/src/bigtable_upload_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use {
crate::{bigtable_upload, blockstore::Blockstore},
crate::{
bigtable_upload::{self, ConfirmedBlockUploadConfig},
blockstore::Blockstore,
},
solana_runtime::commitment::BlockCommitmentCache,
std::{
cmp::min,
Expand All @@ -24,6 +27,26 @@ impl BigTableUploadService {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
exit: Arc<AtomicBool>,
) -> Self {
Self::new_with_config(
runtime,
bigtable_ledger_storage,
blockstore,
block_commitment_cache,
max_complete_transaction_status_slot,
ConfirmedBlockUploadConfig::default(),
exit,
)
}

pub fn new_with_config(
runtime: Arc<Runtime>,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
config: ConfirmedBlockUploadConfig,
exit: Arc<AtomicBool>,
) -> Self {
info!("Starting BigTable upload service");
let thread = Builder::new()
Expand All @@ -35,6 +58,7 @@ impl BigTableUploadService {
blockstore,
block_commitment_cache,
max_complete_transaction_status_slot,
config,
exit,
)
})
Expand All @@ -49,18 +73,25 @@ impl BigTableUploadService {
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
config: ConfirmedBlockUploadConfig,
exit: Arc<AtomicBool>,
) {
let mut start_slot = 0;
let mut start_slot = blockstore.get_first_available_block().unwrap_or_default();
loop {
if exit.load(Ordering::Relaxed) {
break;
}

let end_slot = min(
// The highest slot eligible for upload is the highest root that has complete
// transaction-status metadata
let highest_complete_root = min(
max_complete_transaction_status_slot.load(Ordering::SeqCst),
block_commitment_cache.read().unwrap().root(),
);
let end_slot = min(
highest_complete_root,
start_slot.saturating_add(config.max_num_slots_to_check as u64 * 2),
);

if end_slot <= start_slot {
std::thread::sleep(std::time::Duration::from_secs(1));
Expand All @@ -72,12 +103,12 @@ impl BigTableUploadService {
bigtable_ledger_storage.clone(),
start_slot,
Some(end_slot),
false,
config.clone(),
exit.clone(),
));

match result {
Ok(()) => start_slot = end_slot,
Ok(last_slot_uploaded) => start_slot = last_slot_uploaded,
Err(err) => {
warn!("bigtable: upload_confirmed_blocks: {}", err);
std::thread::sleep(std::time::Duration::from_secs(2));
Expand Down
4 changes: 3 additions & 1 deletion rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use {
solana_client::rpc_cache::LargestAccountsCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
bigtable_upload::ConfirmedBlockUploadConfig,
bigtable_upload_service::BigTableUploadService, blockstore::Blockstore,
leader_schedule_cache::LeaderScheduleCache,
},
Expand Down Expand Up @@ -395,12 +396,13 @@ impl JsonRpcService {
info!("BigTable ledger storage initialized");

let bigtable_ledger_upload_service = if enable_bigtable_ledger_upload {
Some(Arc::new(BigTableUploadService::new(
Some(Arc::new(BigTableUploadService::new_with_config(
runtime.clone(),
bigtable_ledger_storage.clone(),
blockstore.clone(),
block_commitment_cache.clone(),
current_transaction_status_slot.clone(),
ConfirmedBlockUploadConfig::default(),
exit_bigtable_ledger_upload_service.clone(),
)))
} else {
Expand Down

0 comments on commit 86ecd7a

Please sign in to comment.