Skip to content

Commit

Permalink
records-txs
Browse files Browse the repository at this point in the history
  • Loading branch information
seanyoung committed Jun 11, 2024
1 parent 475e918 commit 52acdfc
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub fn load_and_process_ledger_or_exit(
process_options: ProcessOptions,
snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>) {
load_and_process_ledger(
arg_matches,
Expand All @@ -108,6 +109,7 @@ pub fn load_and_process_ledger_or_exit(
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
transaction_status_sender,
)
.unwrap_or_else(|err| {
eprintln!("Exiting. Failed to load and process ledger: {err}");
Expand All @@ -122,6 +124,7 @@ pub fn load_and_process_ledger(
process_options: ProcessOptions,
snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<(Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>), LoadAndProcessLedgerError> {
let bank_snapshots_dir = if blockstore.is_primary_access() {
blockstore.ledger_path().join("snapshot")
Expand Down Expand Up @@ -387,7 +390,7 @@ pub fn load_and_process_ledger(
Some(transaction_status_service),
)
} else {
(None, None)
(transaction_status_sender, None)
};

let result = blockstore_processor::process_blockstore_from_root(
Expand Down
151 changes: 135 additions & 16 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ use {
solana_ledger::{
blockstore::{create_new_ledger, Blockstore},
blockstore_options::{AccessType, LedgerColumnOptions},
blockstore_processor::ProcessSlotCallback,
blockstore_processor::{
ProcessSlotCallback, TransactionStatusMessage, TransactionStatusSender,
},
use_snapshot_archives_at_startup,
},
solana_measure::{measure, measure::Measure},
solana_runtime::{
bank::{bank_hash_details, Bank, RewardCalculationEvent},
bank::{
bank_hash_details::{self, SlotDetails, TransactionDetails},
Bank, RewardCalculationEvent,
},
bank_forks::BankForks,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_bank_utils,
Expand Down Expand Up @@ -73,6 +78,7 @@ use {
transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader},
},
solana_stake_program::{points::PointValue, stake_state},
solana_transaction_status::UiInstruction,
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::{
self,
Expand All @@ -83,6 +89,7 @@ use {
ffi::OsStr,
fs::File,
io::{self, Write},
mem::swap,
path::{Path, PathBuf},
process::{exit, Command, Stdio},
str::FromStr,
Expand Down Expand Up @@ -1067,10 +1074,15 @@ fn main() {
.arg(
Arg::with_name("record_slots_config")
.long("record-slots-config")
.default_value("hash-only")
.possible_values(&["hash-only", "accounts"])
.multiple(true)
.takes_value(true)
.possible_values(&["accounts", "tx"])
.requires("record_slots")
.help("In the slot recording, include bank details or not"),
.conflicts_with_all(&[
"enable_rpc_transaction_history",
"geyser_plugin_config",
])
.help("In addition to the bank hash, optionally include accounts and/or transactions details for the slot"),
),
)
.subcommand(
Expand Down Expand Up @@ -1597,6 +1609,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);

println!(
Expand All @@ -1622,6 +1635,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
println!("{}", &bank_forks.read().unwrap().working_bank().hash());
}
Expand Down Expand Up @@ -1654,6 +1668,9 @@ fn main() {
exit(1);
}

let mut transaction_status_sender = None;
let mut tx_receiver = None;

let (slot_callback, record_slots_file, recorded_slots) = if arg_matches
.occurrences_of("record_slots")
> 0
Expand All @@ -1665,29 +1682,61 @@ fn main() {
exit(1);
});

let include_bank =
match arg_matches.value_of("record_slots_config").unwrap() {
"hash-only" => false,
"accounts" => true,
_ => unreachable!(),
};
let mut include_bank = false;
let mut include_tx = false;

if let Some(args) = arg_matches.values_of("record_slots_config") {
for arg in args {
match arg {
"tx" => include_tx = true,
"accounts" => include_bank = true,
_ => unreachable!(),
}
}
}

let slot_hashes = Arc::new(Mutex::new(Vec::new()));

if include_tx {
let (sender, receiver) = crossbeam_channel::unbounded();

transaction_status_sender = Some(TransactionStatusSender { sender });

let slots = Arc::clone(&slot_hashes);

tx_receiver = Some(std::thread::spawn(move || {
record_transactions(receiver, slots);
}));
}

let slot_callback = Arc::new({
let slots = Arc::clone(&slot_hashes);
move |bank: &Bank| {
let slot_details = if include_bank {
bank_hash_details::BankHashSlotDetails::try_from(bank).unwrap()
let mut details = if include_bank {
bank_hash_details::SlotDetails::try_from(bank).unwrap()
} else {
bank_hash_details::BankHashSlotDetails {
bank_hash_details::SlotDetails {
slot: bank.slot(),
bank_hash: bank.hash().to_string(),
..Default::default()
}
};

slots.lock().unwrap().push(slot_details);
let mut slots = slots.lock().unwrap();

if let Some(recorded_slot) =
slots.iter_mut().find(|f| f.slot == details.slot)
{
// copy all fields except transactions
swap(
&mut recorded_slot.transactions,
&mut details.transactions,
);

*recorded_slot = details;
} else {
slots.push(details);
}
}
});

Expand Down Expand Up @@ -1722,7 +1771,7 @@ fn main() {
bank.hash()
);
} else {
let bank_hash_details::BankHashSlotDetails {
let bank_hash_details::SlotDetails {
slot: expected_slot,
bank_hash: expected_hash,
..
Expand Down Expand Up @@ -1764,6 +1813,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
transaction_status_sender,
);

if print_accounts_stats {
Expand All @@ -1779,6 +1829,10 @@ fn main() {
.ok();
}

if let Some(tx_receiver) = tx_receiver {
tx_receiver.join().unwrap();
}

if let Some(recorded_slots_file) = record_slots_file {
if let Ok(recorded_slots) = recorded_slots.clone().unwrap().lock() {
let bank_hashes =
Expand Down Expand Up @@ -1821,6 +1875,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);

let dot = graph_forks(&bank_forks.read().unwrap(), &graph_config);
Expand Down Expand Up @@ -1984,6 +2039,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let mut bank = bank_forks
.read()
Expand Down Expand Up @@ -2373,6 +2429,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let bank = bank_forks.read().unwrap().working_bank();

Expand Down Expand Up @@ -2425,6 +2482,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let bank_forks = bank_forks.read().unwrap();
let slot = bank_forks.working_bank().slot();
Expand Down Expand Up @@ -2947,3 +3005,64 @@ fn main() {
measure_total_execution_time.stop();
info!("{}", measure_total_execution_time);
}

fn record_transactions(
recv: crossbeam_channel::Receiver<TransactionStatusMessage>,
slots: Arc<Mutex<Vec<SlotDetails>>>,
) {
for tsm in recv {
if let TransactionStatusMessage::Batch(batch) = tsm {
let slot = batch.bank.slot();

assert_eq!(batch.transactions.len(), batch.execution_results.len());

let transactions: Vec<_> = batch
.transactions
.iter()
.zip(batch.execution_results)
.zip(batch.transaction_indexes)
.map(|((tx, execution_results), index)| {
let message = tx.message();

let accounts: Vec<String> = message
.account_keys()
.iter()
.map(|acc| acc.to_string())
.collect();

let instructions = message
.instructions()
.iter()
.map(|ix| UiInstruction::parse(ix, &message.account_keys(), None))
.collect();

let is_simple_vote_tx = tx.is_simple_vote_transaction();

TransactionDetails {
accounts,
instructions,
is_simple_vote_tx,
execution_results,
index,
}
})
.collect();

let mut slots = slots.lock().unwrap();

if let Some(recorded_slot) = slots.iter_mut().find(|f| f.slot == slot) {
recorded_slot.transactions.extend(transactions);
} else {
slots.push(SlotDetails {
slot,
transactions,
..Default::default()
});
}
}
}

for slot in slots.lock().unwrap().iter_mut() {
slot.transactions.sort_by(|a, b| a.index.cmp(&b.index));
}
}
1 change: 1 addition & 0 deletions ledger-tool/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ fn load_blockstore(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -> Arc<Bank
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let bank = bank_forks.read().unwrap().working_bank();
bank
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ solana-sdk = { workspace = true }
solana-stake-program = { workspace = true }
solana-svm = { workspace = true }
solana-system-program = { workspace = true }
solana-transaction-status = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
Expand Down
Loading

0 comments on commit 52acdfc

Please sign in to comment.