feat: automatically create temp dir on init
NINNiT committed Sep 20, 2024
1 parent 2b4e212 commit 005d965
Showing 6 changed files with 117 additions and 81 deletions.
14 changes: 7 additions & 7 deletions apps/xenbakd/config.toml
@@ -28,18 +28,18 @@ port = 443
[[storage.local]]
enabled = true
name = "local"
path = "/var/run/sr-mount/a69d1361-c718-d46e-63fd-49cde4db1fca/exports"
compression = "zstd" # gzip, zstd or none
retention = 3 # Number of backups to keep
path = "/mnt/storage/local"
compression = "zstd" # gzip, zstd or none
retention = 3 # Number of backups to keep

# storage handler for local borg repositories (e.g. NFS, CIFS, local filesystem)
[[storage.borg]]
enabled = true
name = "borg"
binary_path = "/usr/bin/borg" # path to the borg binary
temp_dir = "/tmp/xenbakd" # borg needs a temporary directory to store the backup before it is uploaded to the repository
repository = "ssh://root@192.168.100.148:22/root/borgrepo" # path to the borg repository (can be local or remote)
ssh_key_path = "/etc/xenbakd/id_rsa" # (optional) path to the ssh key for remote borg repository, ignored on local
binary_path = "/usr/bin/borg" # path to the borg binary
temp_dir = "/mnt/storage/tmp" # borg needs a temporary directory to store the backup before it is uploaded to the repository
repository = "/mnt/storage/borgrepo" # path to the borg repository (can be local or remote)
#ssh_key_path = "" # (optional) path to the ssh key for remote borg repository, ignored on local
encryption = "none" # repokey-blake2, repokey, keyfile-blake2, keyfile, none
compression = "zstd"
retention = { daily = 2, weekly = 7, monthly = 4, yearly = 1 } # Number of backups to keep
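
What this commit changes for the `temp_dir` setting above: the directory no longer has to be created by hand before xenbakd starts, because borg storage initialization now creates it (see the borg.rs hunk further down). A minimal sketch of that behaviour, not the committed code, using the example path from this config and eyre's `WrapErr` as the diff below does:

```rust
// Sketch only: ensure the configured temp_dir exists before borg uses it.
// create_dir_all succeeds silently if the directory is already there.
use eyre::WrapErr;

async fn ensure_temp_dir() -> eyre::Result<()> {
    tokio::fs::create_dir_all("/mnt/storage/tmp")
        .await
        .wrap_err("Failed to create temporary directory for borg storage")?;
    Ok(())
}
```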
20 changes: 2 additions & 18 deletions apps/xenbakd/src/cli.rs
@@ -7,38 +7,22 @@ pub struct XenbakdCli {
#[clap(short, long)]
pub config: String,
#[clap(subcommand)]
pub subcmd: Option<SubCommand>,
pub subcmd: SubCommand,
}

#[derive(Parser)]
pub enum SubCommand {
#[clap(name = "daemon", about = "Starts the xenbakd daemon")]
Daemon(DaemonSubCommand),
#[clap(name = "init-storage", about = "Initializes storage backends")]
InitStorage(InitalizeStorageSubCommand),
#[clap(name = "dry-run", about = "Runs jobs in dry-run mode")]
DryRun(DryRunSubCommand),
#[clap(name = "run", about = "Runs jobs once")]
Run(RunSubCommand),
}

#[derive(Parser)]
pub struct DaemonSubCommand {}

#[derive(Parser)]
pub struct InitalizeStorageSubCommand {
#[clap(short, long)]
pub storages: Option<Vec<String>>,
}

#[derive(Parser)]
pub struct RunSubCommand {
#[clap(short, long)]
pub jobs: Option<Vec<String>>,
}

#[derive(Parser)]
pub struct DryRunSubCommand {
#[clap(short, long)]
pub jobs: Option<Vec<String>>,
pub jobs: Vec<String>,
}
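
This hunk makes the subcommand mandatory (no more `Option<SubCommand>`) and drops the `init-storage` and `dry-run` plumbing, leaving `daemon` and `run`. A stand-alone sketch of the resulting shape, not taken from the repository: `Cli` and `Cmd` are simplified stand-ins for `XenbakdCli` and `SubCommand` above, and the positional `jobs` argument is illustrative only.

```rust
use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[clap(short, long)]
    config: String,
    #[clap(subcommand)]
    subcmd: Cmd,
}

#[derive(Subcommand)]
enum Cmd {
    /// Starts the daemon and its scheduler
    Daemon,
    /// Runs the named jobs once
    Run { jobs: Vec<String> },
}

fn main() {
    // Omitting the subcommand is now a parse error instead of yielding None.
    assert!(Cli::try_parse_from(["xenbakd", "--config", "cfg.toml"]).is_err());

    // e.g. `xenbakd --config cfg.toml run nightly weekly`
    let cli = Cli::parse_from(["xenbakd", "--config", "cfg.toml", "run", "nightly", "weekly"]);
    match cli.subcmd {
        Cmd::Daemon => println!("start the daemon"),
        Cmd::Run { jobs } => println!("run once: {jobs:?}"),
    }
}
```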
45 changes: 30 additions & 15 deletions apps/xenbakd/src/main.rs
@@ -18,6 +18,7 @@ use crate::{
jobs::{vm_backup::VmBackupJob, XenbakJob},
monitoring::healthchecks::HealthchecksManagementApiTrait,
scheduler::XenbakScheduler,
storage::StorageHandler,
};
use clap::Parser;
use colored::Colorize;
@@ -39,12 +40,6 @@ async fn main() -> eyre::Result<()> {
// parse cli args
let cli = cli::XenbakdCli::parse();
let config_path = cli.config;

// match clap cli
match cli.subcmd {
_ => {}
}

// load default config, then override/merge using config.toml
let mut config = Figment::from(Serialized::defaults(AppConfig::default()))
.merge(Toml::file(config_path))
@@ -61,6 +56,7 @@ async fn main() -> eyre::Result<()> {
_ => Level::INFO,
};
let subscriber = tracing_subscriber::fmt::Subscriber::builder()
.with_ansi(false)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_max_level(log_level)
.finish();
@@ -123,18 +119,37 @@ async fn main() -> eyre::Result<()> {
healthchecks_service,
});

// creating job scheduler and adding jobs
let mut scheduler = XenbakScheduler::new().await;
for job in config.jobs.clone() {
if !job.enabled {
continue;
// match clap cli
match cli.subcmd {
cli::SubCommand::Daemon(_) => {
let mut scheduler = XenbakScheduler::new().await;
for job in config.jobs.clone() {
if !job.enabled {
continue;
}
let backup_job = VmBackupJob::new(global_state.clone(), job.clone());
scheduler.add_job(backup_job, global_state.clone()).await?;
}
// start scheduler
scheduler.start().await;
tokio::signal::ctrl_c().await.unwrap();
}
cli::SubCommand::Run(run) => {
let mut scheduler = XenbakScheduler::new().await;

for job in run.jobs {
let job = config
.jobs
.iter()
.find(|j| j.name == job)
.expect("Given Job not found in config");

let backup_job = VmBackupJob::new(global_state.clone(), job.clone());
scheduler.run_once(backup_job, global_state.clone()).await?;
}
}
let backup_job = VmBackupJob::new(global_state.clone(), job.clone());
scheduler.add_job(backup_job, global_state.clone()).await?;
}

// start scheduler
scheduler.start().await;
tokio::signal::ctrl_c().await.unwrap();

Ok(())
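
A side note on the `Run` branch above: the lookup panics via `.expect(...)` when a job name passed on the command line is not present in the config. A hedged alternative sketch, not part of this commit, that reports the same condition through eyre, which `main` already returns:

```rust
// Sketch only; the commit keeps `.expect("Given Job not found in config")`.
let job_config = config
    .jobs
    .iter()
    .find(|j| j.name == job)
    .ok_or_else(|| eyre::eyre!("job '{}' not found in config", job))?;
```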
92 changes: 54 additions & 38 deletions apps/xenbakd/src/scheduler.rs
@@ -15,6 +15,47 @@ impl XenbakScheduler {
scheduler: JobScheduler::new().await.unwrap(),
}
}

async fn execute_job_with_monitoring<X: XenbakJob + Send + Clone + Sync + 'static>(
job: &mut X,
global_state: Arc<GlobalState>,
) {
let mut monitoring_services: Vec<Arc<dyn MonitoringTrait>> = vec![];
if let Some(mail_service) = global_state.mail_service.clone() {
monitoring_services.push(Arc::new(mail_service) as Arc<dyn MonitoringTrait>);
}
if let Some(healthchecks_service) = global_state.healthchecks_service.clone() {
monitoring_services
.push(Arc::new(healthchecks_service).clone() as Arc<dyn MonitoringTrait>);
}

for service in &monitoring_services {
service.start(job.get_name()).await.unwrap();
}

// run the job
let job_result = job.run().await;
let job_stats = job.get_job_stats();

// send success/failure notification
if let Err(e) = job_result {
error!("{:?}", e);
for service in &monitoring_services {
service
.failure(job_stats.config.name.clone(), job_stats.clone())
.await
.unwrap();
}
} else {
for service in &monitoring_services {
service
.success(job_stats.config.name.clone(), job_stats.clone())
.await
.unwrap();
}
}
}

pub async fn add_job<X: XenbakJob + Send + Clone + Sync + 'static>(
&mut self,
job: X,
@@ -34,44 +75,7 @@ impl XenbakScheduler {
let mut job = job.clone();
let global_state = global_state.clone();
Box::pin(async move {
let mut monitoring_services: Vec<Arc<dyn MonitoringTrait>> = vec![];
if let Some(mail_service) = global_state.mail_service.clone() {
monitoring_services
.push(Arc::new(mail_service) as Arc<dyn MonitoringTrait>);
}
if let Some(healthchecks_service) =
global_state.healthchecks_service.clone()
{
monitoring_services
.push(Arc::new(healthchecks_service).clone()
as Arc<dyn MonitoringTrait>);
}

for service in &monitoring_services {
service.start(job.get_name()).await.unwrap();
}

// run the job
let job_result = job.run().await;
let job_stats = job.get_job_stats();

// send success/failure notification
if let Err(e) = job_result {
error!("{:?}", e);
for service in &monitoring_services {
service
.failure(job_stats.config.name.clone(), job_stats.clone())
.await
.unwrap();
}
} else {
for service in &monitoring_services {
service
.success(job_stats.config.name.clone(), job_stats.clone())
.await
.unwrap();
}
}
Self::execute_job_with_monitoring(&mut job, global_state).await;
})
},
)?)
@@ -80,6 +84,18 @@ impl XenbakScheduler {
Ok(())
}

pub async fn run_once<X: XenbakJob + Send + Clone + Sync + 'static>(
&mut self,
job: X,
global_state: Arc<GlobalState>,
) -> eyre::Result<()> {
let span = tracing::span!(tracing::Level::DEBUG, "XenbakScheduler::run_once");
let _enter = span.enter();
info!("Running job '{}' once", job.get_name());
Self::execute_job_with_monitoring(&mut job.clone(), global_state).await;
Ok(())
}

pub async fn start(&mut self) {
self.scheduler.start().await.unwrap();
}
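
The refactor above moves the notification fan-out into `execute_job_with_monitoring`, so the scheduled path (`add_job`) and the new immediate path (`run_once`) share it. For orientation, here is a hypothetical sketch of the monitoring interface that fan-out drives, inferred only from the call sites in this diff; the trait shape, the `JobStats` name, and the use of the `async_trait` crate are assumptions, not the repository's actual definition in the monitoring module.

```rust
// Hypothetical shape, inferred from `service.start(...)`, `.success(...)`,
// and `.failure(...)` above. `JobStats` is a stand-in name for whatever
// `job.get_job_stats()` returns.
#[async_trait::async_trait]
pub trait MonitoringTrait: Send + Sync {
    async fn start(&self, job_name: String) -> eyre::Result<()>;
    async fn success(&self, job_name: String, stats: JobStats) -> eyre::Result<()>;
    async fn failure(&self, job_name: String, stats: JobStats) -> eyre::Result<()>;
}
```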
22 changes: 20 additions & 2 deletions apps/xenbakd/src/storage/borg.rs
@@ -167,7 +167,20 @@ impl StorageHandler for BorgLocalStorage {
let span = tracing::span!(tracing::Level::DEBUG, "BorgLocalStorage::initialize");
let _enter = span.enter();

let result = async {
let temp_dir_result: eyre::Result<()> = async {
tokio::fs::create_dir_all(&self.storage_config.temp_dir)
.await
.wrap_err("Failed to create temporary directory for borg storage")?;

Ok(())
}
.await;

if let Err(e) = temp_dir_result {
return Err(e);
}

let borg_init_result: eyre::Result<()> = async {
let mut init_cmd = self.borg_base_cmd();
init_cmd.arg("init");

@@ -198,7 +211,11 @@
}
.await;

result
if let Err(e) = borg_init_result {
return Err(e);
}

borg_init_result
}

async fn list(
@@ -292,6 +309,7 @@
let mut temp_file = TempFile::new_in(PathBuf::from(&self.storage_config.temp_dir))
.await
.wrap_err("Failed to create temporary file for borg backup stream")?;

let tempfile_results = async {
debug!(
"Writing export stream to temporary file {}...",
5 changes: 4 additions & 1 deletion apps/xenbakd/src/storage/mod.rs
@@ -2,7 +2,10 @@ use std::str::FromStr;

use serde::{Deserialize, Serialize};

use crate::{config::JobConfig, jobs::JobType};
use crate::{
config::{AppConfig, JobConfig},
jobs::JobType,
};

pub mod borg;
pub mod local;
