Skip to content

Commit

Permalink
Create database schema in its own command
Browse files Browse the repository at this point in the history
The schema code is now executed only with:
latte schema <workload.rn>

The other commands no longer run the schema code.

Fixes #26
  • Loading branch information
pkolaczk committed Jan 7, 2022
1 parent 9bf4111 commit 456ff69
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 28 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Latte is still early stage software under intensive development.
Start a Cassandra cluster somewhere (can be a local node). Then run:

```shell
latte schema <workload.rn> [<node address>] # create the database schema
latte load <workload.rn> [<node address>] # populate the database with data
latte run <workload.rn> [<node address>] # execute the workload and measure the performance
```
Expand Down Expand Up @@ -124,13 +125,16 @@ Instance functions on `ctx` are asynchronous, so you should call `await` on them

### Schema creation

You can create your own keyspaces and tables in the `schema` function:
You can (re)create your own keyspaces and tables needed by the benchmark in the `schema` function.
The `schema` function should also drop the old schema if present.
The `schema` function is executed by running `latte schema` command.

```rust
pub async fn schema(ctx) {
ctx.execute("CREATE KEYSPACE IF NOT EXISTS test \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }").await?;
ctx.execute("CREATE TABLE IF NOT EXISTS test.test(id bigint, data varchar").await?;
ctx.execute("DROP TABLE IF NOT EXISTS test.test").await?;
ctx.execute("CREATE TABLE test.test(id bigint, data varchar)").await?;
}
```

Expand Down
36 changes: 32 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ pub struct ConnectionConf {
pub addresses: Vec<String>,
}

#[derive(Parser, Debug, Serialize, Deserialize)]
#[clap(
setting(AppSettings::NextLineHelp),
setting(AppSettings::DeriveDisplayOrder)
)]
pub struct SchemaCommand {
/// Parameter values passed to the workload, accessible through param! macro.
#[clap(short('P'), parse(try_from_str = parse_key_val),
number_of_values = 1, multiple_occurrences = true)]
pub params: Vec<(String, String)>,

/// Path to the workload definition file.
#[clap(name = "workload", required = true, value_name = "PATH")]
pub workload: PathBuf,

// Cassandra connection settings.
#[clap(flatten)]
pub connection: ConnectionConf,
}

#[derive(Parser, Debug, Serialize, Deserialize)]
#[clap(
setting(AppSettings::NextLineHelp),
Expand Down Expand Up @@ -271,7 +291,17 @@ pub struct HdrCommand {
#[derive(Parser, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Command {
/// Generates the data needed for the benchmark (typically needed by read benchmarks).
/// Creates the database schema by invoking the `schema` function of the workload script.
///
/// The function should remove the old schema if present.
/// Calling this is likely to remove data from the database.
Schema(SchemaCommand),

/// Erases and generates fresh data needed for the benchmark by invoking the `erase` and `load`
/// functions of the workload script.
///
/// Running this command is typically needed by read benchmarks.
/// You need to create the schema before.
Load(LoadCommand),

/// Runs the benchmark.
Expand All @@ -285,13 +315,11 @@ pub enum Command {
/// Can compare two runs.
Show(ShowCommand),

/// Exports call- and response-time histograms as a compressed HDR interval log.
/// Exports histograms as a compressed HDR interval log.
///
/// To be used with HdrHistogram (https://github.com/HdrHistogram/HdrHistogram).
/// Timestamps are given in seconds since Unix epoch.
/// Response times are recorded in nanoseconds.
///
/// Each histogram is tagged by the benchmark name, parameters and benchmark tags.
Hdr(HdrCommand),
}

Expand Down
40 changes: 23 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tokio::runtime::{Builder, Runtime};
use config::RunCommand;

use crate::config::{
AppConfig, Command, ConnectionConf, HdrCommand, Interval, LoadCommand, ShowCommand,
AppConfig, Command, ConnectionConf, HdrCommand, Interval, LoadCommand, SchemaCommand,
ShowCommand,
};
use crate::context::*;
use crate::context::{CassError, CassErrorKind, Context, SessionStats};
Expand Down Expand Up @@ -105,17 +106,29 @@ async fn connect(conf: &ConnectionConf) -> Result<(Context, Option<ClusterInfo>)
Ok((session, cluster_info))
}

async fn load(conf: LoadCommand) -> Result<()> {
/// Runs the `schema` function of the workload script.
/// Exits with error if the `schema` function is not present or fails.
async fn schema(conf: SchemaCommand) -> Result<()> {
let mut program = load_workload_script(&conf.workload, &conf.params)?;
let (mut session, _) = connect(&conf.connection).await?;

if program.has_schema() {
eprintln!("info: Creating schema...");
if let Err(e) = program.schema(&mut session).await {
eprintln!("error: Failed to create schema: {}", e);
exit(255);
}
if !program.has_schema() {
eprintln!("error: Function `schema` not found in the workload script.");
exit(255);
}
eprintln!("info: Creating schema...");
if let Err(e) = program.schema(&mut session).await {
eprintln!("error: Failed to create schema: {}", e);
exit(255);
}
eprintln!("info: Schema created successfully");
Ok(())
}

/// Loads the data into the database.
/// Exits with error if the `load` function is not present or fails.
async fn load(conf: LoadCommand) -> Result<()> {
let mut program = load_workload_script(&conf.workload, &conf.params)?;
let (mut session, _) = connect(&conf.connection).await?;

if program.has_prepare() {
eprintln!("info: Preparing...");
Expand Down Expand Up @@ -184,14 +197,6 @@ async fn run(conf: RunCommand) -> Result<()> {
conf.cass_version = Some(cluster_info.cassandra_version);
}

if program.has_schema() {
eprintln!("info: Creating schema...");
if let Err(e) = program.schema(&mut session).await {
eprintln!("error: Failed to create schema: {}", e);
exit(255);
}
}

if program.has_prepare() {
eprintln!("info: Preparing...");
if let Err(e) = program.prepare(&mut session).await {
Expand Down Expand Up @@ -347,6 +352,7 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {

async fn async_main(command: Command) -> Result<()> {
match command {
Command::Schema(config) => schema(config).await?,
Command::Load(config) => load(config).await?,
Command::Run(config) => run(config).await?,
Command::Show(config) => show(config).await?,
Expand Down
3 changes: 2 additions & 1 deletion workloads/basic/read.rn
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const TABLE = "basic";
pub async fn schema(ctx) {
ctx.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
ctx.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
ctx.execute(`DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
ctx.execute(`CREATE TABLE ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
}

pub async fn erase(ctx) {
Expand Down
3 changes: 2 additions & 1 deletion workloads/basic/write-blob.rn
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const TABLE = "blob";
pub async fn schema(db) {
db.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
db.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY, data BLOB)`).await?;
db.execute(`DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
db.execute(`CREATE TABLE ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY, data BLOB)`).await?;
}

pub async fn erase(db) {
Expand Down
3 changes: 2 additions & 1 deletion workloads/basic/write.rn
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const TABLE = "basic";
pub async fn schema(db) {
db.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
db.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
db.execute(`DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
db.execute(`CREATE TABLE ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
}

pub async fn erase(db) {
Expand Down
4 changes: 3 additions & 1 deletion workloads/sai/new/common.rn
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ pub async fn init_schema(db) {
CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE}
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }`).await?;
db.execute(`
CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE} (
DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
db.execute(`
CREATE TABLE ${KEYSPACE}.${TABLE} (
par_id bigint,
row_id uuid,
time1 timestamp,
Expand Down
4 changes: 3 additions & 1 deletion workloads/sai/orig/lib.rn
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub async fn schema(ctx) {
CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE}
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }`).await?;
ctx.execute(`
CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE} (
DROP TABLE IF EXISTS ${KEYSPACE}.${TABLE}`).await?;
ctx.execute(`
CREATE TABLE ${KEYSPACE}.${TABLE} (
id uuid,
time timestamp,
value int,
Expand Down

0 comments on commit 456ff69

Please sign in to comment.