Skip to content

Commit

Permalink
Merge pull request #66 from threefoldtech/development_add_clone_command
Browse files Browse the repository at this point in the history
Add clone command to move data between stores
  • Loading branch information
rawdaGastan authored Sep 22, 2024
2 parents a6e52b0 + 8cfeeb7 commit ad73f6c
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 0 deletions.
128 changes: 128 additions & 0 deletions rfs/src/clone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use crate::{
cache::Cache,
fungi::{meta::Block, Reader, Result},
store::{BlockStore, Store},
};
use anyhow::Error;
use futures::lock::Mutex;
use hex::ToHex;
use std::sync::Arc;
use tokio::io::AsyncReadExt;

/// Number of concurrent workers copying blocks between stores.
const WORKERS: usize = 10;

/// Number of blocks fetched from the metadata database per query.
const BATCH_SIZE: u32 = 1000;

/// Copy every block referenced by the flist `reader` into `store`.
///
/// Blocks are downloaded through `cache` (which reads from the flist's
/// current stores) and re-uploaded to `store` by a pool of [`WORKERS`]
/// concurrent workers. Scheduling stops early once any worker reports a
/// failure; all collected failures are logged and an error is returned.
pub async fn clone<S: Store>(reader: Reader, store: S, cache: Cache<S>) -> Result<()> {
    let failures = Arc::new(Mutex::new(Vec::new()));
    let cloner = BlobCloner::new(cache, store.into(), failures.clone());
    let mut workers = workers::WorkerPool::new(cloner, WORKERS);

    let mut offset = 0;
    loop {
        // Stop scheduling new work as soon as any block has failed.
        if !failures.lock().await.is_empty() {
            break;
        }
        let blocks = reader.all_blocks(BATCH_SIZE, offset).await?;
        if blocks.is_empty() {
            break;
        }
        for block in blocks {
            // offset advances one row per dispatched block, so the next
            // query continues exactly where this batch ended.
            offset += 1;
            let worker = workers.get().await;
            worker.send(block)?;
        }
    }

    workers.close().await;
    let failures = failures.lock().await;

    if failures.is_empty() {
        return Ok(());
    }

    log::error!("failed to clone one or more blocks");
    for (block, error) in failures.iter() {
        log::error!(" - failed to clone block {}: {}", block, error);
    }

    Err(crate::fungi::Error::Anyhow(anyhow::anyhow!(
        "failed to clone ({}) blocks",
        failures.len()
    )))
}

/// Shared worker state for the clone worker pool: fetches block data via
/// `cache` and re-uploads it to `store`, recording any failures in a
/// list shared across all workers.
struct BlobCloner<S>
where
    S: Store,
{
    // Source of block data (reads through the flist's current stores).
    cache: Arc<Cache<S>>,
    // Destination store the blocks are copied into.
    store: Arc<BlockStore<S>>,
    // (hex block id, error) pairs collected from every worker.
    failures: Arc<Mutex<Vec<(String, Error)>>>,
}

impl<S> Clone for BlobCloner<S>
where
S: Store,
{
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
store: self.store.clone(),
failures: self.failures.clone(),
}
}
}

impl<S> BlobCloner<S>
where
    S: Store,
{
    /// Build a cloner from its owned parts, wrapping the cache and the
    /// destination store in `Arc`s so the cloner can be cheaply cloned
    /// for each worker in the pool.
    fn new(
        cache: Cache<S>,
        store: BlockStore<S>,
        failures: Arc<Mutex<Vec<(String, Error)>>>,
    ) -> Self {
        let cache = Arc::new(cache);
        let store = Arc::new(store);
        Self {
            cache,
            store,
            failures,
        }
    }
}

#[async_trait::async_trait]
impl<S> workers::Work for BlobCloner<S>
where
S: Store,
{
type Input = Block;
type Output = ();

async fn run(&mut self, block: Self::Input) -> Self::Output {
let mut file = match self.cache.get(&block).await {
Ok((_, f)) => f,
Err(err) => {
self.failures
.lock()
.await
.push((block.id.as_slice().encode_hex(), err));
return;
}
};

let mut data = Vec::new();
if let Err(err) = file.read_to_end(&mut data).await {
self.failures
.lock()
.await
.push((block.id.as_slice().encode_hex(), err.into()));
return;
}
if let Err(err) = self.store.set(&data).await {
self.failures
.lock()
.await
.push((block.id.as_slice().encode_hex(), err.into()));
return;
}
}
}
10 changes: 10 additions & 0 deletions rfs/src/fungi/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ impl Reader {
Ok(results)
}

/// Return up to `limit` blocks starting at row `offset`.
///
/// The explicit `order by rowid` makes LIMIT/OFFSET pagination
/// deterministic: without an ORDER BY, SQLite may return rows in any
/// order, so successive pages could skip or repeat blocks.
/// NOTE(review): assumes `block` is an ordinary rowid table (not
/// created WITHOUT ROWID) — confirm against the schema.
pub async fn all_blocks(&self, limit: u32, offset: u64) -> Result<Vec<Block>> {
    let results: Vec<Block> =
        sqlx::query_as("select id, key from block order by rowid limit ? offset ?;")
            .bind(limit)
            // sqlite binds integers as i64; offsets beyond i64::MAX are unrealistic
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await?;

    Ok(results)
}

pub async fn tag(&self, tag: Tag<'_>) -> Result<Option<String>> {
let value: Option<(String,)> = sqlx::query_as("select value from tag where key = ?;")
.bind(tag.key())
Expand Down
2 changes: 2 additions & 0 deletions rfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ mod pack;
pub use pack::pack;
mod unpack;
pub use unpack::unpack;
mod clone;
pub use clone::clone;
pub mod config;

const PARALLEL_UPLOAD: usize = 10; // number of files we can upload in parallel
Expand Down
36 changes: 36 additions & 0 deletions rfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ enum Commands {
Pack(PackOptions),
/// unpack (downloads) content of an FL the provided location
Unpack(UnpackOptions),
/// clone copies the data from the stores of an FL to other stores
Clone(CloneOptions),
/// list or modify FL metadata and stores
Config(ConfigOptions),
}
Expand Down Expand Up @@ -97,6 +99,22 @@ struct UnpackOptions {
target: String,
}

// Options for the `clone` sub-command: copies all blocks of an flist
// from its current stores into a new set of stores. The `///` comments
// below are clap help text shown to users and must stay as-is.
#[derive(Args, Debug)]
struct CloneOptions {
    /// path to metadata file (flist)
    #[clap(short, long)]
    meta: String,

    /// store url in the format [xx-xx=]<url>. the range xx-xx is optional and used for
    /// sharding. the URL is per store type, please check docs for more information
    #[clap(short, long, action=ArgAction::Append)]
    store: Vec<String>,

    /// directory used as cache for downloaded file chunks
    #[clap(short, long, default_value_t = String::from("/tmp/cache"))]
    cache: String,
}

#[derive(Args, Debug)]
struct ConfigOptions {
/// path to metadata file (flist)
Expand Down Expand Up @@ -199,6 +217,7 @@ fn main() -> Result<()> {
Commands::Mount(opts) => mount(opts),
Commands::Pack(opts) => pack(opts),
Commands::Unpack(opts) => unpack(opts),
Commands::Clone(opts) => clone(opts),
Commands::Config(opts) => config(opts),
}
}
Expand Down Expand Up @@ -326,6 +345,23 @@ async fn get_router(meta: &fungi::Reader) -> Result<Router<Stores>> {
Ok(router)
}

/// Entry point of the `clone` sub-command: copies every block of the
/// given flist into the stores provided on the command line.
fn clone(opts: CloneOptions) -> Result<()> {
    let rt = tokio::runtime::Runtime::new()?;

    rt.block_on(async move {
        // Destination stores the blocks will be copied into.
        let store = store::parse_router(opts.store.as_slice()).await?;

        // Source metadata; its router describes where blocks live now.
        let meta = fungi::Reader::new(opts.meta)
            .await
            .context("failed to initialize metadata database")?;
        let router = get_router(&meta).await?;
        let cache = cache::Cache::new(opts.cache, router);

        rfs::clone(meta, store, cache).await?;
        Ok(())
    })
}
fn config(opts: ConfigOptions) -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;

Expand Down

0 comments on commit ad73f6c

Please sign in to comment.