Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-version snapshot support #9980

Merged
merged 17 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions core/src/snapshot_packager_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ mod tests {
snapshot_package::AccountsPackage,
snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME},
};
use solana_runtime::{
accounts_db::AccountStorageEntry, bank::BankSlotDelta, bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
};
use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta};
use solana_sdk::hash::Hash;
use std::{
fs::{self, remove_dir_all, OpenOptions},
Expand Down Expand Up @@ -186,7 +184,6 @@ mod tests {
let dummy_slot_deltas: Vec<BankSlotDelta> = vec![];
snapshot_utils::serialize_snapshot_data_file(
&snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, &dummy_slot_deltas)?;
Ok(())
Expand Down
1 change: 0 additions & 1 deletion core/tests/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ mod tests {
&saved_snapshots_dir
.path()
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME),
solana_runtime::bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, &dummy_slot_deltas)?;
Ok(())
Expand Down
219 changes: 142 additions & 77 deletions ledger/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use log::*;
use regex::Regex;
use solana_measure::measure::Measure;
use solana_runtime::{
accounts_db::{SnapshotStorage, SnapshotStorages},
bank::{
self, deserialize_from_snapshot, Bank, BankRcSerialize, BankSlotDelta,
MAX_SNAPSHOT_DATA_FILE_SIZE,
bank::{Bank, BankSlotDelta},
serde_snapshot::{
bankrc_from_stream, bankrc_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
},
};
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey};
Expand All @@ -22,6 +21,8 @@ use std::{
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
process::ExitStatus,
str::FromStr,
sync::Arc,
};
use tar::Archive;
use tempfile::TempDir;
Expand All @@ -32,7 +33,47 @@ pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
pub const TAR_VERSION_FILE: &str = "version";

pub const SNAPSHOT_VERSION: &str = "1.1.0";
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
const VERSION_STRING_V1_1_0: &str = "1.1.0";
const VERSION_STRING_V1_2_0: &str = "1.2.0";
const OUTPUT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum SnapshotVersion {
V1_1_0,
V1_2_0,
}

impl From<SnapshotVersion> for &'static str {
fn from(snapshot_version: SnapshotVersion) -> &'static str {
match snapshot_version {
SnapshotVersion::V1_1_0 => VERSION_STRING_V1_1_0,
SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
}
}
}

impl FromStr for SnapshotVersion {
type Err = &'static str;

fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
match version_string {
VERSION_STRING_V1_1_0 => Ok(SnapshotVersion::V1_1_0),
VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
_ => Err("unsupported snapshot version"),
}
}
}

impl SnapshotVersion {
pub fn as_str(self) -> &'static str {
<&str as From<Self>>::from(self)
}

fn maybe_from_string(version_string: &str) -> Option<SnapshotVersion> {
version_string.parse::<Self>().ok()
}
}

#[derive(PartialEq, Ord, Eq, Debug)]
pub struct SlotSnapshotPaths {
Expand Down Expand Up @@ -192,7 +233,7 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
// Write version file
{
let mut f = std::fs::File::create(staging_version_file)?;
f.write_all(&SNAPSHOT_VERSION.to_string().into_bytes())?;
f.write_all(OUTPUT_SNAPSHOT_VERSION.as_str().as_bytes())?;
}

let (compression_option, file_ext) = get_compression_ext(&snapshot_package.compression);
Expand Down Expand Up @@ -289,13 +330,35 @@ where
}
}

pub fn serialize_snapshot_data_file<F>(
pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
where
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
{
serialize_snapshot_data_file_capped::<F>(
data_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
serializer,
)
}

pub fn deserialize_snapshot_data_file<F, T>(data_file_path: &Path, deserializer: F) -> Result<T>
where
F: FnOnce(&mut BufReader<File>) -> Result<T>,
{
deserialize_snapshot_data_file_capped::<F, T>(
data_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
deserializer,
)
}

fn serialize_snapshot_data_file_capped<F>(
data_file_path: &Path,
maximum_file_size: u64,
mut serializer: F,
serializer: F,
) -> Result<u64>
where
F: FnMut(&mut BufWriter<File>) -> Result<()>,
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
{
let data_file = File::create(data_file_path)?;
let mut data_file_stream = BufWriter::new(data_file);
Expand All @@ -313,13 +376,13 @@ where
Ok(consumed_size)
}

pub fn deserialize_snapshot_data_file<F, T>(
fn deserialize_snapshot_data_file_capped<F, T>(
data_file_path: &Path,
maximum_file_size: u64,
mut deserializer: F,
deserializer: F,
) -> Result<T>
where
F: FnMut(&mut BufReader<File>) -> Result<T>,
F: FnOnce(&mut BufReader<File>) -> Result<T>,
{
let file_size = fs::metadata(&data_file_path)?.len();

Expand Down Expand Up @@ -367,21 +430,18 @@ pub fn add_snapshot<P: AsRef<Path>>(
);

let mut bank_serialize = Measure::start("bank-serialize-ms");
let consumed_size = serialize_snapshot_data_file(
&snapshot_bank_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream.by_ref(), &*bank)?;
serialize_into(
stream.by_ref(),
&BankRcSerialize {
bank_rc: &bank.rc,
snapshot_storages,
},
)?;
Ok(())
},
)?;
let bank_snapshot_serializer = move |stream: &mut BufWriter<File>| -> Result<()> {
serialize_into(stream.by_ref(), bank)?;
bankrc_to_stream(
SerdeStyle::NEWER,
stream.by_ref(),
&bank.rc,
snapshot_storages,
)?;
Ok(())
};
let consumed_size =
serialize_snapshot_data_file(&snapshot_bank_file_path, bank_snapshot_serializer)?;
bank_serialize.stop();

// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
Expand Down Expand Up @@ -414,14 +474,10 @@ pub fn serialize_status_cache(
snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);

let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
let consumed_size = serialize_snapshot_data_file(
&snapshot_status_cache_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream, slot_deltas)?;
Ok(())
},
)?;
let consumed_size = serialize_snapshot_data_file(&snapshot_status_cache_file_path, |stream| {
serialize_into(stream, slot_deltas)?;
Ok(())
})?;
status_cache_serialize.stop();

// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
Expand Down Expand Up @@ -624,6 +680,13 @@ where
{
info!("snapshot version: {}", snapshot_version);

let snapshot_version_enum =
SnapshotVersion::maybe_from_string(snapshot_version).ok_or_else(|| {
get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version
))
})?;
let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
if snapshot_paths.len() > 1 {
return Err(get_io_error("invalid snapshot format"));
Expand All @@ -633,45 +696,47 @@ where
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;

info!("Loading bank from {:?}", &root_paths.snapshot_file_path);
let bank = deserialize_snapshot_data_file(
&root_paths.snapshot_file_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
let mut bank: Bank = match snapshot_version {
SNAPSHOT_VERSION => deserialize_from_snapshot(stream.by_ref())?,
_ => {
return Err(get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version
)));
}
};
bank.operating_mode = Some(genesis_config.operating_mode);
info!("Rebuilding accounts...");
let rc = bank::BankRc::from_stream(
let bank = deserialize_snapshot_data_file(&root_paths.snapshot_file_path, |mut stream| {
let mut bank: Bank = bincode::config()
.limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
.deserialize_from(&mut stream)?;

info!("Rebuilding accounts...");

let mut bankrc = match snapshot_version_enum {
SnapshotVersion::V1_1_0 => bankrc_from_stream(
SerdeStyle::OLDER,
account_paths,
bank.slot(),
&bank.ancestors,
frozen_account_pubkeys,
stream.by_ref(),
&mut stream,
&append_vecs_path,
)?;
bank.set_bank_rc(rc, bank::StatusCacheRc::default());
bank.finish_init();
Ok(bank)
},
)?;
),
SnapshotVersion::V1_2_0 => bankrc_from_stream(
SerdeStyle::NEWER,
account_paths,
bank.slot(),
&mut stream,
&append_vecs_path,
),
}?;
Arc::get_mut(&mut Arc::get_mut(&mut bankrc.accounts).unwrap().accounts_db)
.unwrap()
.freeze_accounts(&bank.ancestors, frozen_account_pubkeys);

bank.rc = bankrc;
bank.operating_mode = Some(genesis_config.operating_mode);
bank.finish_init();
Ok(bank)
})?;

let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
let slot_deltas = deserialize_snapshot_data_file(
&status_cache_path,
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
info!("Rebuilding status cache...");
let slot_deltas: Vec<BankSlotDelta> = deserialize_from_snapshot(stream)?;
Ok(slot_deltas)
},
)?;
let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| {
info!("Rebuilding status cache...");
let slot_deltas: Vec<BankSlotDelta> = bincode::config()
.limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
.deserialize_from(stream)?;
Ok(slot_deltas)
})?;

bank.src.append(&slot_deltas);

Expand Down Expand Up @@ -726,7 +791,7 @@ mod tests {
fn test_serialize_snapshot_data_file_under_limit() {
let temp_dir = tempfile::TempDir::new().unwrap();
let expected_consumed_size = size_of::<u32>() as u64;
let consumed_size = serialize_snapshot_data_file(
let consumed_size = serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
Expand All @@ -742,7 +807,7 @@ mod tests {
fn test_serialize_snapshot_data_file_over_limit() {
let temp_dir = tempfile::TempDir::new().unwrap();
let expected_consumed_size = size_of::<u32>() as u64;
let result = serialize_snapshot_data_file(
let result = serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size - 1,
|stream| {
Expand All @@ -759,7 +824,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;

let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
Expand All @@ -769,7 +834,7 @@ mod tests {
)
.unwrap();

let actual_data = deserialize_snapshot_data_file(
let actual_data = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
Expand All @@ -784,7 +849,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;

let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size,
|stream| {
Expand All @@ -794,7 +859,7 @@ mod tests {
)
.unwrap();

let result = deserialize_snapshot_data_file(
let result = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size - 1,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
Expand All @@ -808,7 +873,7 @@ mod tests {
let expected_consumed_size = size_of::<u32>() as u64;

let temp_dir = tempfile::TempDir::new().unwrap();
serialize_snapshot_data_file(
serialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size * 2,
|stream| {
Expand All @@ -819,7 +884,7 @@ mod tests {
)
.unwrap();

let result = deserialize_snapshot_data_file(
let result = deserialize_snapshot_data_file_capped(
&temp_dir.path().join("data-file"),
expected_consumed_size * 2,
|stream| Ok(deserialize_from::<_, u32>(stream)?),
Expand Down
7 changes: 6 additions & 1 deletion programs/bpf/tests/programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,12 @@ mod bpf {

assert!(bank_client
.send_message(
&[&mint_keypair, &argument_keypair, &invoked_argument_keypair, &from_keypair],
&[
&mint_keypair,
&argument_keypair,
&invoked_argument_keypair,
&from_keypair
],
message,
)
.is_ok());
Expand Down
Loading