Add -o option to all e2e benches #5658

Merged: 7 commits, Mar 22, 2023
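For context, each benchmark binary gains the same -o/--output flag. A hypothetical invocation might look like the following (a sketch, not from the PR: the binary name is inferred from benchmarks/src/bin/h2o.rs, the required query/path arguments are elided as "...", and only flags visible in this diff are shown):

    cargo run --release --bin h2o -- ... --mem-table --output /tmp/h2o-groupby.json
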
21 changes: 15 additions & 6 deletions benchmarks/src/bin/h2o.rs
@@ -26,6 +26,7 @@ use datafusion::datasource::listing::{
use datafusion::datasource::MemTable;
use datafusion::prelude::CsvReadOptions;
use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
use datafusion_benchmarks::BenchmarkRun;
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
@@ -51,6 +52,9 @@ struct GroupBy {
/// Load the data into a MemTable before executing the query
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,
/// Path to machine readable output file
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
}

#[tokio::main]
@@ -63,6 +67,7 @@ async fn main() -> Result<()> {
}

async fn group_by(opt: &GroupBy) -> Result<()> {
let mut rundata = BenchmarkRun::new();
let path = opt.path.to_str().unwrap();
let mut config = ConfigOptions::from_env()?;
config.execution.batch_size = 65535;
@@ -94,7 +99,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
ctx.register_csv("x", path, CsvReadOptions::default().schema(&schema))
.await?;
}

rundata.start_new_case(&opt.query.to_string());
let sql = match opt.query {
1 => "select id1, sum(v1) as v1 from x group by id1",
2 => "select id1, id2, sum(v1) as v1 from x group by id1, id2",
@@ -113,13 +118,17 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
let start = Instant::now();
let df = ctx.sql(sql).await?;
let batches = df.collect().await?;
let elapsed = start.elapsed().as_millis();

let elapsed = start.elapsed();
let numrows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
Contributor: Thanks @jaylmiller for looking into this. Noticed that for the other test cases you calc numrows before elapsed, perhaps to prevent the numrows runtime from being part of the benchmark runtime.

Contributor (author): Yes! Good catch, thank you. That was a mistake on my part.

Contributor: Another thing I'm thinking: could calculating num rows trigger some system cache so that the benchmark runs faster? Although that would be unexpected.

Contributor: I think num_rows is pretty fast (it doesn't actually do any work, it just returns a field's value): https://docs.rs/arrow-array/35.0.0/src/arrow_array/record_batch.rs.html#278
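
A distilled version of the pattern this thread settles on, with the clock stopped before any post-processing (a sketch, not PR code; arrow_array is assumed as a dependency and run_query stands in for the timed work):

use std::time::{Duration, Instant};

use arrow_array::RecordBatch;

/// Time a unit of work, then count output rows outside the timed region.
fn timed_collect(run_query: impl FnOnce() -> Vec<RecordBatch>) -> (Duration, usize) {
    let start = Instant::now();
    let batches = run_query(); // only this is measured
    let elapsed = start.elapsed(); // stop the clock before post-processing
    // num_rows() just reads a stored field, so counting is cheap either way,
    // but keeping it outside the timed region removes any doubt.
    let rows = batches.iter().map(|b| b.num_rows()).sum();
    (elapsed, rows)
}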

if opt.debug {
pretty::print_batches(&batches)?;
}

println!("h2o groupby query {} took {} ms", opt.query, elapsed);

rundata.write_iter(elapsed, numrows);
println!(
"h2o groupby query {} took {} ms",
opt.query,
elapsed.as_secs_f64() * 1000.0
Contributor: 👍

);
rundata.maybe_write_json(opt.output_path.as_ref())?;
Ok(())
}
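
The BenchmarkRun helper comes from datafusion_benchmarks (benchmarks/src/lib.rs) and is not part of this diff. Below is a minimal sketch of an interface consistent with the call sites above; it is an illustration only, not the PR's actual implementation, which likely records more metadata:

use std::path::PathBuf;
use std::time::Duration;

#[derive(Default)]
struct BenchCase {
    name: String,
    iterations: Vec<(Duration, usize)>, // (elapsed, output row count) per iteration
}

#[derive(Default)]
struct BenchmarkRun {
    cases: Vec<BenchCase>,
}

impl BenchmarkRun {
    fn new() -> Self {
        Self::default()
    }

    /// Start collecting iterations under a new case name.
    fn start_new_case(&mut self, name: &str) {
        self.cases.push(BenchCase {
            name: name.to_owned(),
            ..Default::default()
        });
    }

    /// Record one iteration's elapsed time and row count for the current case.
    fn write_iter(&mut self, elapsed: Duration, rows: usize) {
        if let Some(case) = self.cases.last_mut() {
            case.iterations.push((elapsed, rows));
        }
    }

    /// Serialize all recorded cases; hand-rolled JSON keeps this sketch dependency-free.
    fn to_json(&self) -> String {
        let cases: Vec<String> = self
            .cases
            .iter()
            .map(|c| {
                let iters: Vec<String> = c
                    .iterations
                    .iter()
                    .map(|(d, rows)| {
                        format!(
                            "{{\"elapsed_ms\":{},\"rows\":{}}}",
                            d.as_secs_f64() * 1000.0,
                            rows
                        )
                    })
                    .collect();
                format!("{{\"name\":\"{}\",\"iterations\":[{}]}}", c.name, iters.join(","))
            })
            .collect();
        format!("{{\"cases\":[{}]}}", cases.join(","))
    }

    /// Write JSON to the given path if one was supplied, else do nothing.
    fn maybe_write_json(&self, path: Option<&PathBuf>) -> std::io::Result<()> {
        if let Some(path) = path {
            std::fs::write(path, self.to_json())?;
        }
        Ok(())
    }
}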
36 changes: 25 additions & 11 deletions benchmarks/src/bin/nyctaxi.rs
@@ -29,6 +29,7 @@ use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};

use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
use datafusion_benchmarks::BenchmarkRun;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
@@ -61,6 +62,10 @@ struct Opt {
/// File format: `csv` or `parquet`
#[structopt(short = "f", long = "format", default_value = "csv")]
file_format: String,

/// Path to machine readable output file
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
}

#[tokio::main]
@@ -91,42 +96,51 @@ async fn main() -> Result<()> {
}
}

datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug).await
datafusion_sql_benchmarks(&mut ctx, opt).await
}

async fn datafusion_sql_benchmarks(
ctx: &mut SessionContext,
iterations: usize,
debug: bool,
) -> Result<()> {
async fn datafusion_sql_benchmarks(ctx: &mut SessionContext, opt: Opt) -> Result<()> {
let iterations = opt.iterations;
let debug = opt.debug;
let output = opt.output_path;
let mut rundata = BenchmarkRun::new();
let mut queries = HashMap::new();
queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
for (name, sql) in &queries {
println!("Executing '{name}'");
rundata.start_new_case(name);
for i in 0..iterations {
let start = Instant::now();
execute_sql(ctx, sql, debug).await?;
let (rows, elapsed) = execute_sql(ctx, sql, debug).await?;
println!(
"Query '{}' iteration {} took {} ms",
name,
i,
start.elapsed().as_millis()
elapsed.as_secs_f64() * 1000.0
);
rundata.write_iter(elapsed, rows);
}
}
rundata.maybe_write_json(output.as_ref())?;
Ok(())
}

async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<()> {
async fn execute_sql(
ctx: &SessionContext,
sql: &str,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let dataframe = ctx.sql(sql).await?;
if debug {
println!("Optimized logical plan:\n{:?}", dataframe.logical_plan());
}
let result = dataframe.collect().await?;
let elapsed = start.elapsed();
if debug {
pretty::print_batches(&result)?;
}
Ok(())
let rowcount = result.iter().map(|b| b.num_rows()).sum();
Ok((rowcount, elapsed))
}

fn nyctaxi_schema() -> Schema {
Expand Down
114 changes: 68 additions & 46 deletions benchmarks/src/bin/parquet.rs
@@ -24,6 +24,7 @@ use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{col, SessionConfig, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
use datafusion_benchmarks::BenchmarkRun;
use parquet::file::properties::WriterProperties;
use std::path::PathBuf;
use std::sync::Arc;
@@ -72,6 +73,10 @@ struct Opt {
/// Total size of generated dataset. The default scale factor of 1.0 will generate a roughly 1GB parquet file
#[structopt(short = "s", long = "scale-factor", default_value = "1.0")]
scale_factor: f32,

/// Path to machine readable output file
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
}
impl Opt {
/// Initialize parquet test file given options.
@@ -114,6 +119,7 @@ async fn main() -> Result<()> {

async fn run_sort_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()> {
use datafusion::physical_expr::expressions::col;
let mut rundata = BenchmarkRun::new();
let schema = test_file.schema();
let sort_cases = vec![
(
@@ -195,22 +201,30 @@ async fn run_sort_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()
];
for (title, expr) in sort_cases {
println!("Executing '{title}' (sorting by: {expr:?})");
rundata.start_new_case(title);
for i in 0..opt.iterations {
let config = SessionConfig::new().with_target_partitions(opt.partitions);
let ctx = SessionContext::with_config(config);
let start = Instant::now();
exec_sort(&ctx, &expr, test_file, opt.debug).await?;
println!(
"Iteration {} finished in {} ms",
i,
start.elapsed().as_millis()
);
let (rows, elapsed) = exec_sort(&ctx, &expr, test_file, opt.debug).await?;
let ms = elapsed.as_secs_f64() * 1000.0;
println!("Iteration {i} finished in {ms} ms");
rundata.write_iter(elapsed, rows);
}
println!("\n");
}
if let Some(path) = &opt.output_path {
std::fs::write(path, rundata.to_json())?;
}
Ok(())
}
fn parquet_scan_disp(opts: &ParquetScanOptions) -> String {
format!(
"pushdown_filters={}, reorder_filters={}, page_index={}",
opts.pushdown_filters, opts.reorder_filters, opts.enable_page_index
)
}
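
For example, the helper turns one of the scan configurations used below into a stable case label (a sketch, using the same struct literal form as scan_options_matrix):

// prints: pushdown_filters=true, reorder_filters=true, page_index=false
let opts = ParquetScanOptions {
    pushdown_filters: true,
    reorder_filters: true,
    enable_page_index: false,
};
println!("{}", parquet_scan_disp(&opts));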
async fn run_filter_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()> {
let mut rundata = BenchmarkRun::new();
let scan_options_matrix = vec![
ParquetScanOptions {
pushdown_filters: false,
@@ -230,54 +244,57 @@ async fn run_filter_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<
];

let filter_matrix = vec![
// Selective-ish filter
col("request_method").eq(lit("GET")),
// Non-selective filter
col("request_method").not_eq(lit("GET")),
// Basic conjunction
col("request_method")
.eq(lit("POST"))
.and(col("response_status").eq(lit(503_u16))),
// Nested filters
col("request_method").eq(lit("POST")).and(or(
col("response_status").eq(lit(503_u16)),
col("response_status").eq(lit(403_u16)),
)),
// Many filters
disjunction([
("Selective-ish filter", col("request_method").eq(lit("GET"))),
(
"Non-selective filter",
Contributor: This is nice, adding the details into the output file.

col("request_method").not_eq(lit("GET")),
col("response_status").eq(lit(400_u16)),
col("service").eq(lit("backend")),
])
.unwrap(),
// Filter everything
col("response_status").eq(lit(429_u16)),
// Filter nothing
col("response_status").gt(lit(0_u16)),
),
(
"Basic conjunction",
col("request_method")
.eq(lit("POST"))
.and(col("response_status").eq(lit(503_u16))),
),
(
"Nested filters",
col("request_method").eq(lit("POST")).and(or(
col("response_status").eq(lit(503_u16)),
col("response_status").eq(lit(403_u16)),
)),
),
(
"Many filters",
disjunction([
col("request_method").not_eq(lit("GET")),
col("response_status").eq(lit(400_u16)),
col("service").eq(lit("backend")),
])
.unwrap(),
),
("Filter everything", col("response_status").eq(lit(429_u16))),
("Filter nothing", col("response_status").gt(lit(0_u16))),
];

for filter_expr in &filter_matrix {
println!("Executing with filter '{filter_expr}'");
for (name, filter_expr) in &filter_matrix {
println!("Executing '{name}' (filter: {filter_expr})");
for scan_options in &scan_options_matrix {
println!("Using scan options {scan_options:?}");
rundata
.start_new_case(&format!("{name}: {}", parquet_scan_disp(scan_options)));
for i in 0..opt.iterations {
let start = Instant::now();

let config = scan_options.config().with_target_partitions(opt.partitions);
let ctx = SessionContext::with_config(config);

let rows =
let (rows, elapsed) =
exec_scan(&ctx, test_file, filter_expr.clone(), opt.debug).await?;
println!(
"Iteration {} returned {} rows in {} ms",
i,
rows,
start.elapsed().as_millis()
);
let ms = elapsed.as_secs_f64() * 1000.0;
println!("Iteration {} returned {} rows in {ms} ms", i, rows);
rundata.write_iter(elapsed, rows);
}
}
println!("\n");
}
rundata.maybe_write_json(opt.output_path.as_ref())?;
Ok(())
}

@@ -286,32 +303,37 @@ async fn exec_scan(
test_file: &TestParquetFile,
filter: Expr,
debug: bool,
) -> Result<usize> {
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let exec = test_file.create_scan(Some(filter)).await?;

let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;

let elapsed = start.elapsed();
if debug {
pretty::print_batches(&result)?;
}
Ok(result.iter().map(|b| b.num_rows()).sum())
let rows = result.iter().map(|b| b.num_rows()).sum();
Ok((rows, elapsed))
}

async fn exec_sort(
ctx: &SessionContext,
expr: &[PhysicalSortExpr],
test_file: &TestParquetFile,
debug: bool,
) -> Result<()> {
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(None).await?;
let exec = Arc::new(SortExec::try_new(expr.to_owned(), scan, None)?);
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
if debug {
pretty::print_batches(&result)?;
}
Ok(())
let rows = result.iter().map(|b| b.num_rows()).sum();
Ok((rows, elapsed))
}

fn gen_data(