Skip to content

Commit

Permalink
Log query errors through tracing crate
Browse files Browse the repository at this point in the history
  • Loading branch information
pkolaczk committed Jul 23, 2024
1 parent 53264cc commit aafcbb7
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 53 deletions.
41 changes: 33 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ path = "src/main.rs"
[dependencies]
anyhow = "1.0"
base64 = "0.22"
rmp = "0.8.10"
rmp-serde = "1.0.0-beta.2"
chrono = { version = "0.4.18", features = ["serde"] }
rmp = "0.8"
rmp-serde = "1"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4", features = ["derive", "cargo", "env"] }
console = "0.15.0"
cpu-time = "1.0.0"
Expand All @@ -41,8 +41,8 @@ rune = "0.12"
rust-embed = "8"
scylla = { version = "0.13", features = ["ssl"] }
search_path = "0.1"
serde = { version = "1.0.116", features = ["derive"] }
serde_json = "1.0.57"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
statrs = "0.17"
status-line = "0.2.0"
strum = { version = "0.26", features = ["derive"] }
Expand All @@ -52,6 +52,7 @@ thiserror = "1.0.26"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot", "signal"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = "0.3"
try-lock = "0.2.3"
uuid = { version = "1.1", features = ["v4"] }
Expand Down
40 changes: 15 additions & 25 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ pub struct RunCommand {

#[clap(skip)]
pub cass_version: Option<String>,

#[clap(skip)]
pub id: Option<String>,
}

impl RunCommand {
Expand All @@ -424,31 +427,6 @@ impl RunCommand {
.find(|(k, _)| k == key)
.and_then(|v| v.1.parse().ok())
}

/// Returns benchmark name
pub fn name(&self) -> String {
self.workload
.file_stem()
.unwrap()
.to_string_lossy()
.to_string()
}

/// Suggested file name where to save the results of the run.
pub fn default_output_file_name(&self, extension: &str) -> PathBuf {
let mut components = vec![self.name()];
components.extend(self.cluster_name.iter().map(|x| x.replace(' ', "_")));
components.extend(self.cass_version.iter().cloned());
components.extend(self.tags.iter().cloned());
components.extend(self.rate.map(|r| format!("r{r}")));
components.push(format!("p{}", self.concurrency));
components.push(format!("t{}", self.threads));
components.push(format!("c{}", self.connection.count));
let params = self.params.iter().map(|(k, v)| format!("{k}{v}"));
components.extend(params);
components.push(chrono::Local::now().format("%Y%m%d.%H%M%S").to_string());
PathBuf::from(format!("{}.{extension}", components.join(".")))
}
}

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -581,6 +559,18 @@ author = "Piotr Kołaczkowski <pkolaczk@datastax.com>",
version = clap::crate_version ! (),
)]
pub struct AppConfig {
/// Name of the log file.
///
/// If not given, the log file name will be created automatically based on the current timestamp.
/// If relative path given, the file will be placed in the directory determined by `log-dir`.
/// The log file will store detailed information about e.g. query errors.
#[clap(long("log-file"))]
pub log_file: Option<PathBuf>,

/// Directory where log files are stored.
#[clap(long("log-dir"), env("LATTE_LOG_DIR"), default_value = ".")]
pub log_dir: PathBuf,

#[clap(subcommand)]
pub command: Command,
}
Expand Down
15 changes: 12 additions & 3 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use scylla::transport::session::PoolSize;
use scylla::{ExecutionProfile, QueryResult, SessionBuilder};
use statrs::distribution::{Normal, Uniform};
use tokio::time::{Duration, Instant};
use tracing::error;
use try_lock::TryLock;
use uuid::{Variant, Version};

Expand Down Expand Up @@ -473,8 +474,11 @@ impl Context {

/// Executes an ad-hoc CQL statement with no parameters. Does not prepare.
pub async fn execute(&self, cql: &str) -> Result<(), CassError> {
let rs = self.execute_inner(|| self.session.query(cql, ())).await;
rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?;
if let Err(err) = self.execute_inner(|| self.session.query(cql, ())).await {
let err = CassError::query_execution_error(cql, &[], err);
error!("{}", err);
return Err(err);
}
Ok(())
}

Expand All @@ -490,7 +494,12 @@ impl Context {
.execute_inner(|| self.session.execute(statement, params.clone()))
.await;

rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), &params, e))?;
if let Err(err) = rs {
let err = CassError::query_execution_error(statement.get_statement(), &params, err);
error!("{}", err);
return Err(err);
}

Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum LatteError {
#[error(display = "Failed to create output file {:?}: {}", _0, _1)]
OutputFileCreate(PathBuf, std::io::Error),

#[error(display = "Failed to create log file {:?}: {}", _0, _1)]
LogFileCreate(PathBuf, std::io::Error),

#[error(display = "Error writing HDR log: {}", _0)]
HdrLogWrite(#[source] IntervalLogWriterError<V2DeflateSerializeError>),

Expand Down
56 changes: 47 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::env;
use std::ffi::OsStr;
use std::fs::File;
use std::io::{stdout, Write};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::time::Duration;
use std::{env, fs};

use clap::Parser;
use config::RunCommand;
Expand All @@ -17,6 +17,8 @@ use rune::Source;
use search_path::SearchPath;
use tokio::runtime::{Builder, Runtime};
use tokio::task::spawn_blocking;
use tracing::info;
use tracing_appender::non_blocking::WorkerGuard;
use walkdir::WalkDir;

use crate::config::{
Expand Down Expand Up @@ -74,8 +76,11 @@ fn load_workload_script(workload: &Path, params: &[(String, String)]) -> Result<
.canonicalize()
.unwrap_or_else(|_| workload.to_path_buf());
eprintln!("info: Loading workload script {}...", workload.display());
let src = Source::from_path(&workload).map_err(|e| LatteError::ScriptRead(workload, e))?;
Program::new(src, params.iter().cloned().collect())
let src =
Source::from_path(&workload).map_err(|e| LatteError::ScriptRead(workload.clone(), e))?;
let program = Program::new(src, params.iter().cloned().collect())?;
info!("Loaded workload script {}", workload.display());
Ok(program)
}

/// Locates the workload and returns an absolute path to it.
Expand Down Expand Up @@ -291,7 +296,7 @@ async fn run(conf: RunCommand) -> Result<()> {
let path = conf
.output
.clone()
.unwrap_or_else(|| conf.default_output_file_name("json"));
.unwrap_or_else(|| PathBuf::from(format!("latte-{}.json", conf.id.as_ref().unwrap())));

let report = Report::new(conf, stats);
match report.save(&path) {
Expand Down Expand Up @@ -443,12 +448,15 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {
Ok(())
}

async fn async_main(command: Command) -> Result<()> {
async fn async_main(run_id: String, command: Command) -> Result<()> {
match command {
Command::Edit(config) => edit(config)?,
Command::Schema(config) => schema(config).await?,
Command::Load(config) => load(config).await?,
Command::Run(config) => run(config).await?,
Command::Run(mut config) => {
config.id = Some(run_id);
run(config).await?
}
Command::List(config) => list(config).await?,
Command::Show(config) => show(config).await?,
Command::Hdr(config) => export_hdr_log(config).await?,
Expand Down Expand Up @@ -488,16 +496,46 @@ fn init_runtime(thread_count: usize) -> std::io::Result<Runtime> {
}
}

fn setup_logging(run_id: &str, config: &AppConfig) -> Result<WorkerGuard> {
let log_file = match &config.log_file {
Some(file) if file.is_absolute() => file.clone(),
Some(file) => config.log_dir.clone().join(file),
None => config.log_dir.join(format!("latte-{}.log", run_id)),
};
fs::create_dir_all(&config.log_dir)
.map_err(|e| LatteError::LogFileCreate(log_file.clone(), e))?;
let log_file = File::create(&log_file).map_err(|e| LatteError::LogFileCreate(log_file, e))?;
let (non_blocking, guard) = tracing_appender::non_blocking(log_file);
tracing_subscriber::fmt()
.with_ansi(false)
.with_writer(non_blocking)
.init();
Ok(guard)
}

fn run_id() -> String {
chrono::Local::now().format("%Y%m%d-%H%M%S").to_string()
}

fn main() {
tracing_subscriber::fmt::init();
let command = AppConfig::parse().command;
let run_id = run_id();
let config = AppConfig::parse();
let _guard = match setup_logging(run_id.as_str(), &config) {
Ok(guard) => guard,
Err(e) => {
eprintln!("error: {e}");
exit(1);
}
};

let command = config.command;
let thread_count = match &command {
Command::Run(cmd) => cmd.threads.get(),
Command::Load(cmd) => cmd.threads.get(),
_ => 1,
};
let runtime = init_runtime(thread_count);
if let Err(e) = runtime.unwrap().block_on(async_main(command)) {
if let Err(e) = runtime.unwrap().block_on(async_main(run_id, command)) {
eprintln!("error: {e}");
exit(128);
}
Expand Down
8 changes: 5 additions & 3 deletions src/plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use plotters::prelude::*;
use plotters_svg::SVGBackend;
use std::collections::BTreeSet;
use std::ops::Range;
use std::path::PathBuf;
use std::process::exit;

#[derive(Eq, PartialEq, Copy, Clone, Debug, Ord, PartialOrd)]
Expand Down Expand Up @@ -142,9 +143,10 @@ pub async fn plot_graph(conf: PlotCommand) -> Result<()> {
}
};

let output_path = conf
.output
.unwrap_or(reports[0].conf.default_output_file_name("svg"));
let output_path = conf.output.unwrap_or(PathBuf::from(format!(
"latte-{}.svg",
reports[0].conf.id.as_ref().unwrap()
)));
let root = SVGBackend::new(&output_path, (2000, 1000)).into_drawing_area();
root.fill(&WHITE).unwrap();

Expand Down

0 comments on commit aafcbb7

Please sign in to comment.