Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into prepare-arrow-update
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 6, 2024
2 parents 16971c2 + 089b232 commit 6846513
Show file tree
Hide file tree
Showing 115 changed files with 8,260 additions and 4,091 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ parquet = { version = "52.0.0", default-features = false, features = [
] }
rand = "0.8"
regex = "1.8"
rstest = "0.20.0"
rstest = "0.21.0"
serde_json = "1"
sqlparser = { version = "0.45.0", features = ["visitor"] }
tempfile = "3"
Expand Down
10 changes: 8 additions & 2 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ Create / download a specific dataset (TPCH)

Data is placed in the `data` subdirectory.

## Select join algorithm
The benchmark runs with `prefer_hash_join == true` by default, which enforces the HASH join algorithm.
To run the TPCH benchmarks with a join algorithm other than HASH:
```shell
PREFER_HASH_JOIN=false ./bench.sh run tpch
```

## Comparing performance of main and a branch

```shell
Expand Down Expand Up @@ -177,7 +184,6 @@ The benchmark program also supports CSV and Parquet input file formats and a uti
```bash
cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet
```

Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.

### Comparing results between runs
Expand Down Expand Up @@ -261,7 +267,7 @@ SUBCOMMANDS:

# Benchmarks

The output of `dfbench` help includes a descripion of each benchmark, which is reproducedd here for convenience
The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience

## ClickBench

Expand Down
58 changes: 20 additions & 38 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..}
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
#CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for faster iterations
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}

usage() {
echo "
Expand All @@ -52,8 +53,8 @@ Examples:
# Create the datasets for all benchmarks in $DATA_DIR
./bench.sh data
# Run the 'tpch' benchmark on the datafusion checkout in /source/arrow-datafusion
DATAFUSION_DIR=/source/arrow-datafusion ./bench.sh run tpch
# Run the 'tpch' benchmark on the datafusion checkout in /source/datafusion
DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch
**********
* Commands
Expand All @@ -66,9 +67,9 @@ compare: Compares results from benchmark runs
* Benchmarks
**********
all(default): Data/Run/Compare for all benchmarks
tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table
tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), single parquet file per table, hash join
tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table
tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join
tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
Expand All @@ -79,10 +80,11 @@ clickbench_extended: ClickBench "inspired" queries against a single parquet (
**********
* Supported Configuration (Environment Variables)
**********
DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
PREFER_HASH_JOIN Prefer hash join algorithm (default true)
"
exit 1
}
Expand Down Expand Up @@ -129,6 +131,7 @@ main() {
echo "BENCHMARK: ${BENCHMARK}"
echo "DATA_DIR: ${DATA_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "***************************"
case "$BENCHMARK" in
all)
Expand Down Expand Up @@ -183,6 +186,7 @@ main() {
echo "DATA_DIR: ${DATA_DIR}"
echo "RESULTS_DIR: ${RESULTS_DIR}"
echo "CARGO_COMMAND: ${CARGO_COMMAND}"
echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}"
echo "***************************"

# navigate to the appropriate directory
Expand Down Expand Up @@ -213,12 +217,6 @@ main() {
tpch_mem10)
run_tpch_mem "10"
;;
tpch_smj)
run_tpch_smj "1"
;;
tpch_smj10)
run_tpch_smj "10"
;;
parquet)
run_parquet
;;
Expand Down Expand Up @@ -304,7 +302,7 @@ data_tpch() {
else
echo " creating parquet files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --output "${TPCH_DIR}" --format parquet
popd > /dev/null
fi
}
Expand All @@ -321,22 +319,7 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch benchmark with sort merge join
run_tpch_smj() {
SCALE_FACTOR=$1
if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi
TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch SMJ benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch in memory
Expand All @@ -352,23 +335,23 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} -m --format parquet -o ${RESULTS_FILE}
}

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}


Expand Down Expand Up @@ -422,26 +405,25 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
}


compare_benchmarks() {
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"
BRANCH1="$1"
Expand Down
38 changes: 23 additions & 15 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.11"
url = { workspace = true }
uuid = "1.7"

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.28.0", features = ["fs"] }
1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ cargo run --example csv_sql
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
Expand Down
5 changes: 4 additions & 1 deletion datafusion-examples/examples/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ impl AggregateUDFImpl for GeoMeanUdaf {
true
}

fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
fn create_groups_accumulator(
&self,
_args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
Ok(Box::new(GeometricMeanGroupsAccumulator::new()))
}
}
Expand Down
Loading

0 comments on commit 6846513

Please sign in to comment.