From 6d58b73c47294bfb93465d5a83cd2175660b6e6d Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 20 May 2020 14:02:02 +0900 Subject: [PATCH] Confine snapshot 1.1 relic to versioned codepath --- runtime/src/serde_snapshot.rs | 358 ++++++++++++++++++++++++---------- 1 file changed, 254 insertions(+), 104 deletions(-) diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 4b6616d7d1fb09..965d4e7bc2ac1b 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -39,12 +39,45 @@ pub trait SerdeContext<'a> { + From<&'a AccountStorageEntry> + Into; + fn serialize_bank_rc_fields( + serializer: S, + serializable_bank: &SerializableBankRc<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized; + + fn serialize_accounts_db_fields( + serializer: S, + serializable_db: &SerializableAccountsDB<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized; + + fn deserialize_accounts_db_fields( + stream: &mut BufReader, + ) -> Result< + ( + HashMap>, + u64, + Slot, + BankHashInfo, + ), + IoError, + > + where + R: Read; + + // we might define fn (de)serialize_bank(...) -> Result for versionized bank serialization in the future + + // this fn can now be removed fn legacy_or_zero(x: T) -> T; + // this fn can now be removed fn legacy_serialize_byte_length(serializer: &mut S, x: u64) -> Result<(), S::Error> where S: SerializeTuple; + // this fn can now be removed fn legacy_deserialize_byte_length(stream: &mut R) -> Result; } @@ -52,6 +85,121 @@ pub struct SerdeContextV1_1_0 {} impl<'a> SerdeContext<'a> for SerdeContextV1_1_0 { type SerializableAccountStorageEntry = SerializableAccountStorageEntryV1_1_0; + fn serialize_bank_rc_fields( + serializer: S, + serializable_bank: &SerializableBankRc<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + // let's preserve the exact behavior for maximum compatibility + // there is no gurantee how bincode encode things by using serde constructs (like + // serialize_tuple()) + use serde::ser::Error; + let mut wr = Cursor::new(Vec::new()); + let accounts_db_serialize = SerializableAccountsDB::<'a, Self> { + accounts_db: &*serializable_bank.bank_rc.accounts.accounts_db, + slot: serializable_bank.bank_rc.slot, + account_storage_entries: serializable_bank.snapshot_storages, + phantom: std::marker::PhantomData::default(), + }; + serialize_into(&mut wr, &accounts_db_serialize).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } + + fn serialize_accounts_db_fields( + serializer: S, + serializable_db: &SerializableAccountsDB<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + // let's preserve the exact behavior for maximum compatibility + // there is no gurantee how bincode encode things by using serde constructs (like + // serialize_tuple()) + use serde::ser::Error; + let mut wr = Cursor::new(vec![]); + + // sample write version before serializing storage entries + let version = serializable_db + .accounts_db + .write_version + .load(Ordering::Relaxed); + + let entries = serializable_db + .account_storage_entries + .iter() + .map(|x| { + ( + x.first().unwrap().slot, + x.iter() + .map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())) + .collect::>(), + ) + }) + .collect::>(); + + let bank_hashes = serializable_db.accounts_db.bank_hashes.read().unwrap(); + + // write the list of account storage entry lists out as a map + serialize_into(&mut wr, &entries).map_err(Error::custom)?; + // write the current write version sampled before the account + // storage entries were written out + serialize_into(&mut wr, &version).map_err(Error::custom)?; + + serialize_into( + &mut wr, + &( + serializable_db.slot, + &*bank_hashes.get(&serializable_db.slot).unwrap_or_else(|| { + panic!("No bank_hashes entry for slot {}", serializable_db.slot) + }), + ), + ) + .map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } + + fn deserialize_accounts_db_fields( + mut stream: &mut BufReader, + ) -> Result< + ( + HashMap>, + u64, + Slot, + BankHashInfo, + ), + IoError, + > + where + R: Read, + { + // Read and discard the prepended serialized byte vector length + let _serialized_len: u64 = deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?; + + // read and discard u64 byte vector length + // (artifact from accountsdb_to_stream serializing first + // into byte vector and then into stream) + let serialized_len: u64 = deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?; + + // read map of slots to account storage entries + let storage: HashMap> = bincode::config() + .limit(min(serialized_len, MAX_ACCOUNTS_DB_STREAM_SIZE)) + .deserialize_from(&mut stream) + .map_err(accountsdb_to_io_error)?; + let version: u64 = deserialize_from(&mut stream) + .map_err(|e| format!("write version deserialize error: {}", e.to_string())) + .map_err(accountsdb_to_io_error)?; + + let (slot, bank_hash_info): (Slot, BankHashInfo) = deserialize_from(&mut stream) + .map_err(|e| format!("bank hashes deserialize error: {}", e.to_string())) + .map_err(accountsdb_to_io_error)?; + + Ok((storage, version, slot, bank_hash_info)) + } + fn legacy_or_zero(x: T) -> T { x } @@ -72,6 +220,79 @@ pub struct SerdeContextV1_2_0 {} impl<'a> SerdeContext<'a> for SerdeContextV1_2_0 { type SerializableAccountStorageEntry = SerializableAccountStorageEntryV1_2_0; + fn serialize_bank_rc_fields( + serializer: S, + serializable_bank: &SerializableBankRc<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + let accounts_db_serialize = SerializableAccountsDB::<'a, Self> { + accounts_db: &*serializable_bank.bank_rc.accounts.accounts_db, + slot: serializable_bank.bank_rc.slot, + account_storage_entries: serializable_bank.snapshot_storages, + phantom: std::marker::PhantomData::default(), + }; + + accounts_db_serialize.serialize(serializer) + } + + fn serialize_accounts_db_fields( + serializer: S, + serializable_db: &SerializableAccountsDB<'a, Self>, + ) -> std::result::Result + where + Self: std::marker::Sized, + { + // sample write version before serializing storage entries + let version = serializable_db + .accounts_db + .write_version + .load(Ordering::Relaxed); + + let entries = serializable_db + .account_storage_entries + .iter() + .map(|x| { + ( + x.first().unwrap().slot, + x.iter() + .map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())) + .collect::>(), + ) + }) + .collect::>(); + + let bank_hashes = serializable_db.accounts_db.bank_hashes.read().unwrap(); + + ( + entries, + version, + serializable_db.slot, + &*bank_hashes.get(&serializable_db.slot).unwrap_or_else(|| { + panic!("No bank_hashes entry for slot {}", serializable_db.slot) + }), + ) + .serialize(serializer) + } + + fn deserialize_accounts_db_fields( + mut stream: &mut BufReader, + ) -> Result< + ( + HashMap>, + u64, + Slot, + BankHashInfo, + ), + IoError, + > + where + R: Read, + { + deserialize_from(&mut stream).map_err(accountsdb_to_io_error) + } + fn legacy_or_zero(_x: T) -> T { T::default() } @@ -143,8 +364,8 @@ where R: Read, P: AsRef, { - context_accountsdb_from_stream::( - stream, + context_accountsdb_from_fields::( + DefaultSerdeContext::deserialize_accounts_db_fields(stream)?, account_paths, stream_append_vecs_path, ) @@ -162,7 +383,7 @@ where { serialize_into( stream, - &SerializableAccountsDatabaseUnversioned:: { + &SerializableAccountsDB:: { accounts_db, slot, account_storage_entries, @@ -175,7 +396,7 @@ where pub fn context_bankrc_from_stream<'a, C, R, P>( account_paths: &[PathBuf], slot: Slot, - mut stream: &mut BufReader, + stream: &mut BufReader, stream_append_vecs_path: P, ) -> std::result::Result where @@ -183,13 +404,9 @@ where R: Read, P: AsRef, { - // Possibly read and discard the prepended serialized byte vector length - let _serialized_len: u64 = - C::legacy_deserialize_byte_length(&mut stream).map_err(accountsdb_to_io_error)?; - // read and deserialise the accounts database directly from the stream - let accounts = Accounts::new_empty(context_accountsdb_from_stream::( - stream, + let accounts = Accounts::new_empty(context_accountsdb_from_fields::( + C::deserialize_accounts_db_fields(stream)?, account_paths, stream_append_vecs_path, )?); @@ -201,6 +418,21 @@ where }) } +pub struct SerializableBankRc<'a, C> { + bank_rc: &'a BankRc, + snapshot_storages: &'a [SnapshotStorage], + phantom: std::marker::PhantomData, +} + +impl<'a, C: SerdeContext<'a>> Serialize for SerializableBankRc<'a, C> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + C::serialize_bank_rc_fields(serializer, self) + } +} + pub fn context_bankrc_to_stream<'a, 'b, C, W>( stream: &'b mut BufWriter, bank_rc: &'a BankRc, @@ -210,36 +442,9 @@ where C: SerdeContext<'a>, W: Write, { - struct BankRcSerialize<'a, C> { - bank_rc: &'a BankRc, - snapshot_storages: &'a [SnapshotStorage], - phantom: std::marker::PhantomData, - } - - impl<'a, C: SerdeContext<'a>> Serialize for BankRcSerialize<'a, C> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::ser::Serializer, - { - let mut seq = serializer.serialize_tuple(C::legacy_or_zero(1) + 1)?; - - // Possibly write out a dummy vector length for backward compatibility - C::legacy_serialize_byte_length(&mut seq, MAX_ACCOUNTS_DB_STREAM_SIZE)?; - - seq.serialize_element(&SerializableAccountsDatabaseUnversioned::<'a, C> { - accounts_db: &*self.bank_rc.accounts.accounts_db, - slot: self.bank_rc.slot, - account_storage_entries: self.snapshot_storages, - phantom: std::marker::PhantomData::default(), - })?; - - seq.end() - } - } - serialize_into( stream, - &BankRcSerialize:: { + &SerializableBankRc:: { bank_rc, snapshot_storages, phantom: std::marker::PhantomData::default(), @@ -248,30 +453,22 @@ where .map_err(bankrc_to_io_error) } -fn context_accountsdb_from_stream<'a, C, R, P>( - mut stream: &mut BufReader, +fn context_accountsdb_from_fields<'a, C, P>( + (storage, version, slot, bank_hash_info): ( + HashMap>, + u64, + Slot, + BankHashInfo, + ), account_paths: &[PathBuf], stream_append_vecs_path: P, ) -> Result where C: SerdeContext<'a>, - R: Read, P: AsRef, { let accounts_db = AccountsDB::new(account_paths.to_vec()); - // read and discard u64 byte vector length - // (artifact from accountsdb_to_stream serializing first - // into byte vector and then into stream) - let serialized_len: u64 = - C::legacy_deserialize_byte_length(&mut stream).map_err(accountsdb_to_io_error)?; - - // read map of slots to account storage entries - let storage: HashMap> = bincode::config() - .limit(min(serialized_len, MAX_ACCOUNTS_DB_STREAM_SIZE)) - .deserialize_from(&mut stream) - .map_err(accountsdb_to_io_error)?; - // convert to two level map of slot -> id -> account storage entry let storage = { let mut map = HashMap::new(); @@ -345,18 +542,11 @@ where // but non-root stores should not be included in the snapshot storage.0.retain(|_slot, stores| !stores.is_empty()); - let version: u64 = deserialize_from(&mut stream) - .map_err(|e| format!("write version deserialize error: {}", e.to_string())) - .map_err(accountsdb_to_io_error)?; - - let (slot, bank_hash): (Slot, BankHashInfo) = deserialize_from(&mut stream) - .map_err(|e| format!("bank hashes deserialize error: {}", e.to_string())) - .map_err(accountsdb_to_io_error)?; accounts_db .bank_hashes .write() .unwrap() - .insert(slot, bank_hash); + .insert(slot, bank_hash_info); // Process deserialized data, set necessary fields in self let max_id: usize = *storage @@ -379,59 +569,19 @@ where Ok(accounts_db) } -struct SerializableAccountsDatabaseUnversioned<'a, C> { +pub struct SerializableAccountsDB<'a, C> { accounts_db: &'a AccountsDB, slot: Slot, account_storage_entries: &'a [SnapshotStorage], phantom: std::marker::PhantomData, } -impl<'a, C: SerdeContext<'a>> Serialize for SerializableAccountsDatabaseUnversioned<'a, C> { +impl<'a, C: SerdeContext<'a>> Serialize for SerializableAccountsDB<'a, C> { fn serialize(&self, serializer: S) -> std::result::Result where S: serde::ser::Serializer, { - // sample write version before serializing storage entries - let version = self.accounts_db.write_version.load(Ordering::Relaxed); - - let mut seq = serializer.serialize_tuple(C::legacy_or_zero(1) + 3)?; - - // Possibly write out a dummy vector length for backward compatibility - C::legacy_serialize_byte_length(&mut seq, MAX_ACCOUNTS_DB_STREAM_SIZE)?; - - // write the list of account storage entry lists out as a map - seq.serialize_element( - &(self - .account_storage_entries - .iter() - .map(|x| { - ( - x.first().unwrap().slot, - x.iter() - .map(|x| C::SerializableAccountStorageEntry::from(x.as_ref())) - .collect::>(), - ) - }) - .collect::>()), - )?; - - // write the current write version sampled before the account - // storage entries were written out - seq.serialize_element(&version)?; - - // write out bank hashes - seq.serialize_element(&( - self.slot, - &*self - .accounts_db - .bank_hashes - .read() - .unwrap() - .get(&self.slot) - .unwrap_or_else(|| panic!("No bank_hashes entry for slot {}", self.slot)), - ))?; - - seq.end() + C::serialize_accounts_db_fields(serializer, self) } }