Skip to content

Commit

Permalink
fix(meta): metastore availability improvements
Browse files Browse the repository at this point in the history
- `zstor monitor` not failed when the  metastore failed
- `zstor store` check metastore.writetable status before storing the data
  • Loading branch information
iwanbk committed Nov 14, 2024
1 parent 6d8698b commit b81264c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
19 changes: 17 additions & 2 deletions zstor/src/actors/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ pub struct DeleteFailure {
pub fm: FailureMeta,
}

#[derive(Message)]
#[rtype(result = "bool")]
/// Message to check if the metastore is writable.
pub struct CheckWritable;

#[derive(Message)]
#[rtype(result = "Result<Vec<FailureMeta>, MetaStoreError>")]
/// Message for retrieving all [`FailureMeta`] objects in a [`MetaStore`] managed by a [`MetaStoreActor`].
Expand Down Expand Up @@ -111,10 +116,11 @@ pub struct MetaStoreActor {

impl MetaStoreActor {
/// Create a new [`MetaStoreActor`] from a given [`MetaStore`].
pub fn new(meta_store: Box<dyn MetaStore>) -> MetaStoreActor {
pub fn new(meta_store: Box<dyn MetaStore>, writeable: bool) -> MetaStoreActor {
log::info!("metastore actor writeable: {}", writeable);
Self {
meta_store: Arc::from(meta_store),
writeable: true,
writeable,
}
}
}
Expand Down Expand Up @@ -221,6 +227,15 @@ impl Handler<SaveFailure> for MetaStoreActor {
}
}

impl Handler<CheckWritable> for MetaStoreActor {
type Result = ResponseFuture<bool>;

fn handle(&mut self, _: CheckWritable, _: &mut Self::Context) -> Self::Result {
let writeable = self.writeable;
Box::pin(async move { writeable })
}
}

impl Handler<DeleteFailure> for MetaStoreActor {
type Result = ResponseFuture<Result<(), MetaStoreError>>;

Expand Down
11 changes: 10 additions & 1 deletion zstor/src/actors/zstor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::actors::{
config::{ConfigActor, GetConfig},
meta::{LoadMeta, LoadMetaByKey, MetaStoreActor, SaveMeta, SaveMetaByKey},
meta::{CheckWritable, LoadMeta, LoadMetaByKey, MetaStoreActor, SaveMeta, SaveMetaByKey},
metrics::{MetricsActor, ZstorCommandFinsihed, ZstorCommandId},
pipeline::{PipelineActor, RebuildData, RecoverFile, StoreFile},
};
Expand Down Expand Up @@ -138,8 +138,17 @@ impl Handler<Store> for ZstorActor {
let pipeline = self.pipeline.clone();
let config = self.cfg.clone();
let meta = self.meta.clone();

AtomicResponse::new(Box::pin(
async move {
let meta_writeable = meta.send(CheckWritable).await.unwrap();
if !meta_writeable {
return Err(ZstorError::new_io(
"Metastore is not writable".to_string(),
std::io::Error::from(std::io::ErrorKind::PermissionDenied),
));
}

let ft = fs::metadata(&msg.file)
.await
.map_err(|e| ZstorError::new_io("Could not load file metadata".into(), e))?
Expand Down
10 changes: 6 additions & 4 deletions zstor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use compression::CompressorError;
use config::ConfigError;
use encryption::EncryptionError;
use erasure::EncodingError;
use futures::future::try_join_all;
use futures::future::join_all;
use meta::MetaStoreError;
use std::fmt;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -63,17 +63,18 @@ pub type ZstorResult<T> = Result<T, ZstorError>;
pub async fn setup_system(cfg_path: PathBuf, cfg: &Config) -> ZstorResult<Addr<ZstorActor>> {
let metastore = match cfg.meta() {
Meta::Zdb(zdb_cfg) => {
let backends = try_join_all(
let backends = join_all(
zdb_cfg
.backends()
.iter()
.map(|ci| UserKeyZdb::new(ci.clone())),
)
.await?;
.await;
let encryptor = match cfg.encryption() {
Encryption::Aes(key) => encryption::AesGcm::new(key.clone()),
};
let encoder = zdb_cfg.encoder();
let backends = backends.into_iter().filter_map(|res| res.ok()).collect();
Box::new(ZdbMetaStore::new(
backends,
encoder,
Expand All @@ -88,7 +89,8 @@ pub async fn setup_system(cfg_path: PathBuf, cfg: &Config) -> ZstorResult<Addr<Z
if let Some(mountpoint) = cfg.zdbfs_mountpoint() {
let _ = ZdbFsStatsProxyActor::new(mountpoint.to_path_buf(), metrics_addr.clone()).start();
}
let meta_addr = MetaStoreActor::new(metastore).start();
let meta_writable = metastore.writable();
let meta_addr = MetaStoreActor::new(metastore, meta_writable).start();
let cfg_addr = ConfigActor::new(cfg_path, cfg.clone()).start();
let pipeline_addr = SyncArbiter::start(1, || PipelineActor);

Expand Down
5 changes: 5 additions & 0 deletions zstor/src/zdb_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ where
}
}

/// writable returns true if the metastore is writable
pub fn writable(&self) -> bool {
self.writeable
}

/// helper functions to encrypt and write data to backends.
async fn write_value(&self, key: &str, value: &[u8]) -> ZdbMetaStoreResult<()> {
debug!("Writing data to zdb metastore");
Expand Down

0 comments on commit b81264c

Please sign in to comment.