Tpch: Dask vs PySpark and PySpark, Polars and DuckDB single node #1044

Merged · 60 commits · Oct 13, 2023

Changes from all commits

Commits (60)
a886336
Add tpch benchmark queries
phofl Sep 1, 2023
e394327
Remove anon
phofl Sep 1, 2023
a654c8e
Remove unnecessary files
phofl Sep 1, 2023
ba474eb
Update memory and query
phofl Sep 1, 2023
35d256f
Revert "Remove unnecessary files"
phofl Sep 1, 2023
fe26959
Fix query
phofl Sep 8, 2023
a650024
Update
phofl Sep 8, 2023
65f4457
Update client
phofl Sep 19, 2023
09f8c42
Merge remote-tracking branch 'origin/main' into phofl/tpch
phofl Sep 25, 2023
1b30944
Use compute
phofl Sep 26, 2023
083756e
Add pyspark tpch
milesgranger Sep 28, 2023
ac82ef1
Reduce setup calls and materialize df
milesgranger Sep 30, 2023
4e2285c
Fix bad method calls to plugins on teardown/close
milesgranger Sep 30, 2023
5bb4073
Bump to 12 workers
milesgranger Sep 30, 2023
11539b6
Add idle_timeout=None for scheduler
milesgranger Sep 30, 2023
7fd2ca5
Add retry logic for S3 403 sporadic errors on materialization
milesgranger Sep 30, 2023
bf03130
Resolve conflicts
milesgranger Oct 2, 2023
0dd202f
Make direct dask vs pyspark tpch comparison tests
milesgranger Oct 3, 2023
80d4bdd
DROP COMMIT: Filter for tpch dask vs pyspark tests
milesgranger Oct 3, 2023
b6c58eb
DROP COMMIT: Try again with 20 nodes
milesgranger Oct 3, 2023
600e166
DROP COMMIT: Try again with scale 1000 and 100 nodes
milesgranger Oct 3, 2023
07c8161
Rename scale 1000 s3 directory
milesgranger Oct 4, 2023
31a6547
Fix pyspark executor memory settings
milesgranger Oct 4, 2023
1bd9c28
Add dask-expr
milesgranger Oct 4, 2023
77972b4
Resolve conflicts - merge main
milesgranger Oct 4, 2023
fa11466
Formatting [skip ci]
milesgranger Oct 4, 2023
20dc6f8
Add dask-expr
milesgranger Oct 4, 2023
f5b1fca
Try again with 100 nodes
milesgranger Oct 4, 2023
a3b3769
Refine clearing pyspark session memory between queries
milesgranger Oct 4, 2023
bcf14d8
Formatting
milesgranger Oct 4, 2023
32a3481
Fix date casting in queries for scale 1000
milesgranger Oct 5, 2023
267e1d7
Change test method names under TestTpchDaskVsPySpark
milesgranger Oct 5, 2023
d2d315b
Re-run
milesgranger Oct 5, 2023
08adbdf
rerun
milesgranger Oct 5, 2023
94ffd51
rerun
milesgranger Oct 5, 2023
f036902
Try without AWS env var provider
milesgranger Oct 5, 2023
046e22b
Resolve conflicts - merge main [skip ci]
milesgranger Oct 5, 2023
bebeb22
Refactoring - Still sporadic S3 403 Forbidden [skip ci]
milesgranger Oct 5, 2023
7c2fedb
Fix sporadic S3 Forbidden errors - use EnvVar provider
milesgranger Oct 7, 2023
f1655c8
Handle timestamp[ns/us] diff between scale 100 and 1000
milesgranger Oct 9, 2023
1dd1f94
Initial impl of single VM w/ DuckDB [skip ci]
milesgranger Oct 10, 2023
87fd928
Demo: timing queries
milesgranger Oct 11, 2023
ca5b153
Refactor - bit flatter structure w/ explicit single VM test mod
milesgranger Oct 11, 2023
c3bd971
Add Polars and other refactorings/experiments [skip ci]
milesgranger Oct 11, 2023
4022df0
Add cluster comparision tests and other small fixes [skip ci]
milesgranger Oct 12, 2023
2daa27f
Flag to skip other tpch benchmarks
milesgranger Oct 12, 2023
16b7223
Fix PyTorch Optuna (#1050)
milesgranger Oct 10, 2023
a44785c
Disable task queuing for tpch (#1054)
phofl Oct 11, 2023
c95d1b1
Formatting [skip ci]
milesgranger Oct 12, 2023
78a3d47
Remove debug timing stuff
milesgranger Oct 12, 2023
2b3eddd
Add duckdb and polars to env
milesgranger Oct 12, 2023
c8ac8b1
Remove other debug timing [skip ci]
milesgranger Oct 12, 2023
3b5edc6
Restructure into tpch directory [skip ci]
milesgranger Oct 12, 2023
850f859
Flatten Polars (#1058)
mrocklin Oct 13, 2023
d14e1a9
Flatten duckdb [skip ci]
milesgranger Oct 13, 2023
f308a10
Reuse fixtures in tpch.conftest from test_polars [skip ci]
milesgranger Oct 13, 2023
8e3c684
Remove dask-expr from environment-test.yml [skip ci]
milesgranger Oct 13, 2023
a516003
Remove test_comparison_single_vm
milesgranger Oct 13, 2023
5d6da86
Remove duckdb queries 8-22 [skip ci]
milesgranger Oct 13, 2023
a0b2203
Remove pyspark queries 8-22 [skip ci]
milesgranger Oct 13, 2023
7 changes: 6 additions & 1 deletion ci/environment.yml
@@ -1,7 +1,12 @@
channels:
- conda-forge
dependencies:
- python >=3.9
- conda
- python =3.11
- pyspark ==3.4.1
- openjdk ==20.0.2
- python-duckdb ==0.9.1
- polars ==0.19.8
- pip
- coiled >=0.2.54
- numpy ==1.24.4
21 changes: 21 additions & 0 deletions cluster_kwargs.yaml
@@ -57,9 +57,30 @@ uber_lyft_large:
  n_workers: 50
  worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)

tpch_pyspark:
  n_workers: 20
  worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)
  # Keep the same, since spark.driver/executor.memory is calculated from the scheduler size.
  # PySpark doesn't automatically calculate the memory available to executors,
  # so we do it in pyspark_queries.utils; that calculation runs on the scheduler.
  scheduler_vm_types: [m6i.xlarge]
  scheduler_options:
    idle_timeout: null # Scheduler won't get any Dask tasks.
  backend_options:
    ingress:
      - ports: [8786, 8787, 7077, 8080, 4040, 9797]
        cidr: "0.0.0.0/0"
    spot: false
    spot_on_demand_fallback: true
    multizone: true

tpch:
  n_workers: 20
  worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)
  backend_options:
    spot: false
    spot_on_demand_fallback: true
    multizone: true

# For tests/workflows/test_pytorch_optuna.py
pytorch_optuna:
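The tpch_pyspark comment above refers to a memory calculation that lives in tests/benchmarks/tpch/pyspark_queries/utils.py, which is not part of this excerpt. A minimal sketch of what such a calculation could look like, assuming spark.executor.memory is derived from the total memory of the machine the code runs on (the scheduler); the function name and the 0.8 headroom fraction are illustrative only, not taken from the PR:

import psutil  # assumed available; any way of reading total memory works


def guess_executor_memory(fraction: float = 0.8) -> str:
    # Derive a spark.executor.memory value from the memory of the current
    # machine, leaving some headroom for the JVM and OS. Illustrative only;
    # the real logic lives in pyspark_queries/utils.py.
    total_gib = psutil.virtual_memory().total / 2**30
    return f"{int(total_gib * fraction)}g"


# e.g. conf.set("spark.executor.memory", guess_executor_memory())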
1 change: 1 addition & 0 deletions tests/benchmarks/tpch/__init__.py
@@ -0,0 +1 @@
from . import test_dask, test_duckdb, test_polars, test_pyspark # noqa: F401
71 changes: 71 additions & 0 deletions tests/benchmarks/tpch/conftest.py
@@ -0,0 +1,71 @@
import functools
import os

import coiled
import pytest

DATASETS = {
    "local": "./tpch-data/scale10/",
    "scale 100": "s3://coiled-runtime-ci/tpch_scale_100/",
    "scale 1000": "s3://coiled-runtime-ci/tpch-scale-1000/",
}

ENABLED_DATASET = os.getenv("TPCH_SCALE")
if ENABLED_DATASET is not None:
    if ENABLED_DATASET not in DATASETS:
        raise ValueError("Unknown tpch dataset: ", ENABLED_DATASET)
else:
    ENABLED_DATASET = "scale 100"

machine = {
    "memory": "256 GiB",
}


@pytest.fixture(scope="module")
def warm_start():
    @coiled.function(**machine)
    def _():
        pass

    _()  # run once to give us a warm start


@pytest.fixture(scope="function")
def restart(warm_start):
    @coiled.function(**machine)
    def _():
        pass

    _.client.restart()
    yield


def coiled_function(**kwargs):
    # Shouldn't be necessary
    # See https://github.com/coiled/platform/issues/3519
    def _(function):
        return functools.wraps(function)(coiled.function(**kwargs, **machine)(function))

    return _


@pytest.fixture
def tpch_dataset_name():
    return ENABLED_DATASET


@pytest.fixture
def tpch_dataset_path(tpch_dataset_name):
    return DATASETS[tpch_dataset_name]


@pytest.fixture
def vm_type():
    return "m6i.16xlarge"


@pytest.fixture
def region():
    # Region of the TPCH data
    return "us-east-2"
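For orientation, a hypothetical sketch of how these fixtures and the coiled_function helper might be combined in a benchmark test; test_simple_read and the query body below are illustrative and not part of this PR:

import dask.dataframe as dd

from .conftest import coiled_function


@coiled_function()
def head_of_lineitem(dataset_path):
    # Runs remotely on a 256 GiB VM, per the `machine` settings above.
    return dd.read_parquet(dataset_path + "lineitem").head()


def test_simple_read(tpch_dataset_path, restart):
    # `restart` restarts the client on the already-warm VM before each test.
    head_of_lineitem(tpch_dataset_path)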
25 changes: 25 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/__init__.py
@@ -0,0 +1,25 @@
from . import (  # noqa: F401
    q1,
    q2,
    q3,
    q4,
    q5,
    q6,
    q7,
    q8,
    q9,
    q10,
    q11,
    q12,
    q13,
    q14,
    q15,
    q16,
    q17,
    q18,
    q19,
    q20,
    q21,
    q22,
    utils,
)
28 changes: 28 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/q1.py
@@ -0,0 +1,28 @@
query = """select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date('1998-09-02')
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus
"""


def setup(spark):
    from .utils import read_parquet_spark

    read_parquet_spark(spark, "lineitem", "lineitem")
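read_parquet_spark comes from pyspark_queries/utils.py, which is not shown in this diff. A plausible sketch of what it does, assuming it loads one TPC-H table from the configured dataset path and registers it under the given view name so the SQL above can reference it:

def read_parquet_spark(spark, table: str, view_name: str):
    # Hypothetical reconstruction -- the real helper lives in pyspark_queries/utils.py.
    from ..conftest import DATASETS, ENABLED_DATASET

    path = DATASETS[ENABLED_DATASET] + table
    df = spark.read.parquet(path)
    df.createOrReplaceTempView(view_name)
    return df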
52 changes: 52 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/q2.py
@@ -0,0 +1,52 @@
query = """select
s_acctbal,
s_name,
n_name,
p_partkey,
p_mfgr,
s_address,
s_phone,
s_comment
from
part,
supplier,
partsupp,
nation,
region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and p_size = 15
and p_type like '%BRASS'
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'EUROPE'
and ps_supplycost = (
select
min(ps_supplycost)
from
partsupp,
supplier,
nation,
region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'EUROPE'
)
order by
s_acctbal desc,
n_name,
s_name,
p_partkey
limit 100
"""


def setup(spark):
    from .utils import read_parquet_spark

    for name in ("part", "supplier", "partsupp", "nation", "region"):
        read_parquet_spark(spark, name, name)
45 changes: 45 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/q22.py
@@ -0,0 +1,45 @@
query = """
select
cntrycode,
count(*) as numcust,
sum(c_acctbal) as totacctbal
from (
select
substring(c_phone from 1 for 2) as cntrycode,
c_acctbal
from
customer
where
substring(c_phone from 1 for 2) in
("13","31","23", "29", "30", "18", "17")
and c_acctbal > (
select
avg(c_acctbal)
from
customer
where
c_acctbal > 0.00
and substring (c_phone from 1 for 2) in
("13","31","23", "29", "30", "18", "17")
)
and not exists (
select
*
from
orders
where
o_custkey = c_custkey
)
) as custsale
group by
cntrycode
order by
cntrycode
"""


def setup(spark):
    from .utils import read_parquet_spark

    for name in ("orders", "customer"):
        read_parquet_spark(spark, name, name)
32 changes: 32 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/q3.py
@@ -0,0 +1,32 @@
query = """
select
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_mktsegment = 'BUILDING'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < date '1995-03-15'
and l_shipdate > date '1995-03-15'
group by
l_orderkey,
o_orderdate,
o_shippriority
order by
revenue desc,
o_orderdate
limit 10
"""


def setup(spark):
    from .utils import read_parquet_spark

    for name in ("customer", "orders", "lineitem"):
        read_parquet_spark(spark, name, name)
30 changes: 30 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/q4.py
@@ -0,0 +1,30 @@
query = """
select
o_orderpriority,
count(*) as order_count
from
orders
where
o_orderdate >= date '1993-07-01'
and o_orderdate < date '1993-07-01' + interval '3' month
and exists (
select
*
from
lineitem
where
l_orderkey = o_orderkey
and l_commitdate < l_receiptdate
)
group by
o_orderpriority
order by
o_orderpriority
"""


def setup(spark):
    from .utils import read_parquet_spark

    for name in ("orders", "lineitem"):
        read_parquet_spark(spark, name, name)
40 changes: 40 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/q5.py
@@ -0,0 +1,40 @@
query = """
select
n_name,
sum(l_extendedprice * (1 - l_discount)) as revenue
from
customer,
orders,
lineitem,
supplier,
nation,
region
where
c_custkey = o_custkey
and l_orderkey = o_orderkey
and l_suppkey = s_suppkey
and c_nationkey = s_nationkey
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'ASIA'
and o_orderdate >= date '1994-01-01'
and o_orderdate < date '1994-01-01' + interval '1' year
group by
n_name
order by
revenue desc
"""


def setup(spark):
    from .utils import read_parquet_spark

    for name in (
        "customer",
        "orders",
        "lineitem",
        "supplier",
        "nation",
        "region",
    ):
        read_parquet_spark(spark, name, name)
17 changes: 17 additions & 0 deletions tests/benchmarks/tpch/pyspark_queries/q6.py
@@ -0,0 +1,17 @@
query = """
select
sum(l_extendedprice * l_discount) as revenue
from
lineitem
where
l_shipdate >= date '1994-01-01'
and l_shipdate < date '1994-01-01' + interval '1' year
and l_discount between .06 - 0.01 and .06 + 0.01
and l_quantity < 24
"""


def setup(spark):
    from .utils import read_parquet_spark

    read_parquet_spark(spark, "lineitem", "lineitem")
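To see how the query modules fit together, a minimal sketch of running one of them end to end, assuming a plain SparkSession; the actual benchmark harness added by this PR lives in tests/benchmarks/tpch/test_pyspark.py and is not shown in this excerpt:

from pyspark.sql import SparkSession

from tests.benchmarks.tpch.pyspark_queries import q1

spark = SparkSession.builder.appName("tpch-q1").getOrCreate()
q1.setup(spark)  # registers the parquet tables the query needs as temp views
result = spark.sql(q1.query).toPandas()
print(result)
spark.stop()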