Skip to content

Commit

Permalink
Improve data reconstruction flow
Browse files Browse the repository at this point in the history
Assign shards to their correct slots immediately instead of remembering
their index and then sorting them
Fix #29: Harden reconstruction in case of missing shard metadata
Close #30: Verify shard checksum before reconstructing to allow
identification of corrupt shards

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
  • Loading branch information
LeeSmet committed Mar 17, 2021
1 parent 7e7051d commit ed56bbe
Showing 1 changed file with 23 additions and 12 deletions.
35 changes: 23 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use blake2::{digest::VariableOutput, VarBlake2b};
use futures::future::{join_all, try_join_all};
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use std::convert::TryInto;
use std::fs::File;
use std::io::{Cursor, Read};
Expand All @@ -10,7 +10,7 @@ use tokio::task::JoinHandle;
use zstor_v2::compression::{Compressor, Snappy};
use zstor_v2::config::{Config, Meta};
use zstor_v2::encryption::{Encryptor, AESGCM};
use zstor_v2::erasure::Encoder;
use zstor_v2::erasure::{Encoder, Shard};
use zstor_v2::etcd::Etcd;
use zstor_v2::meta::{Checksum, MetaData, ShardInfo, CHECKSUM_LENGTH};
use zstor_v2::zdb::{Zdb, ZdbError, ZdbResult};
Expand Down Expand Up @@ -389,7 +389,7 @@ fn real_main() -> ZstorResult<()> {

async fn recover_data(metadata: &MetaData) -> ZstorResult<Vec<u8>> {
// attempt to retrieve all shards
let mut shard_loads: Vec<JoinHandle<(usize, Result<_, ZstorError>)>> =
let mut shard_loads: Vec<JoinHandle<(usize, Result<(_, _), ZstorError>)>> =
Vec::with_capacity(metadata.shards().len());
for si in metadata.shards().iter().cloned() {
shard_loads.push(tokio::spawn(async move {
Expand All @@ -399,7 +399,7 @@ async fn recover_data(metadata: &MetaData) -> ZstorResult<Vec<u8>> {
};
match db.get(si.key()).await {
Ok(potential_shard) => match potential_shard {
Some(shard) => (si.index(), Ok(shard)),
Some(shard) => (si.index(), Ok((shard, *si.checksum()))),
None => (
si.index(),
// TODO: Proper error here?
Expand All @@ -414,17 +414,28 @@ async fn recover_data(metadata: &MetaData) -> ZstorResult<Vec<u8>> {
}));
}

let mut indexed_shards: Vec<(usize, Option<Vec<u8>>)> = Vec::with_capacity(shard_loads.len());
// Since this is the amount of actual shards needed to pass to the encoder, we calculate the
// amount we will have from the amount of parity and data shards. Reason is that the `shards()`
// might not have all data shards, due to a bug on our end, or later in case we allow for
// degraded writes.
let mut shards: Vec<Option<Vec<u8>>> =
vec![None; metadata.data_shards() + metadata.parity_shards()];
for shard_info in join_all(shard_loads).await {
let (idx, shard) = shard_info?;
indexed_shards.push((idx, shard.ok())); // don't really care about errors here
match shard {
Err(e) => warn!("could not download shard {}: {}", idx, e),
Ok((raw_shard, saved_checksum)) => {
let shard = Shard::from(raw_shard);
let checksum = shard.checksum();
if saved_checksum != checksum {
warn!("shard {} checksum verification failed", idx);
continue;
}
shards[idx] = Some(shard.into_inner());
}
}
}

// sort the shards
indexed_shards.sort_by(|(a, _), (b, _)| a.cmp(b));

let shards = indexed_shards.into_iter().map(|(_, shard)| shard).collect();

let encoder = Encoder::new(metadata.data_shards(), metadata.parity_shards());
let decoded = encoder.decode(shards)?;

Expand Down Expand Up @@ -459,7 +470,7 @@ async fn store_data(data: Vec<u8>, checksum: Checksum, cfg: &Config) -> ZstorRes
let ns_info = db.ns_info().await?;
match ns_info.free_space() {
insufficient if insufficient < shard_len => Err(ZdbError::new_storage_size(
db.connection_info().address().clone(),
*db.connection_info().address(),
shard_len,
ns_info.free_space(),
)),
Expand Down

0 comments on commit ed56bbe

Please sign in to comment.