TPC-H: Dask vs PySpark, and PySpark, Polars and DuckDB single node #1044
Conversation
This reverts commit a654c8e.
Was shutting down without tasks after 20 minutes.
Ah, you are indeed correct: I found from the 8- and 20-node clusters that Dask was faster in 3 of the 7 queries (1, 2 and 6).
Also, while I'm asking for things: I'll also be very curious how this changes at different scale levels. Common wisdom today says "Dask is OK at medium scales, but for large scale you really have to use Spark". I'm curious how true this is today.
I was looking at this from the other direction: we will have to update them when we eventually want to use them (and, more importantly, we have to actually remember this when adding new queries), so better to copy as needed.
I don't have a strong preference
Just remove the
These queries change regularly in the polars repo, so I wanted to avoid keeping stuff in sync that we are not using anyway.
I think Patrick is saying that by the time we implement these queries, it's decently likely that the upstream implementations will also have changed, so we'll have to go back and look at them anyway. If so, then I guess this depends on how much work it is to modify the upstream queries, and how frequently they change.
I'm curious if anyone has thoughts on moving the entire
I don't understand; then don't update them? They're being skipped anyway. Anyhow, it appears you feel strongly about this, so I'll remove them.
It's likely that folks would just add our implementation without looking at the upstream changes. I want to avoid the cognitive load here since copy-paste should be quick at a later point.
No preference one way or the other
Anything stopping us from merging? I'm sensitive to having this PR stay around over the weekend.
Nothing from my end stopping a merge. 👍
I have some suggestions for follow-up work, but I would like to not do that in this PR. There is one question regarding the configuration of the Dask TPC-H benchmarks and queuing that we should address. This PR should not change any other benchmarks, IMO.
If that is a no-op, we're good to go from my POV.
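For context, whether it is a no-op can be checked directly against the branch's effective defaults; a minimal sketch, assuming a recent dask/distributed install:

import dask
import distributed  # noqa: F401  (importing registers distributed's config defaults)

# If this already prints "inf" on main, the explicit
# dask.config.set(...) around the fixture changes nothing.
print(dask.config.get("distributed.scheduler.worker-saturation"))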
skip_benchmarks = pytest.mark.skip(reason="need --tpch-non-dask option to run")
for item in items:
    if not config.getoption("--tpch-non-dask") and not (
        str(item.path).startswith(
            str(TEST_DIR / "benchmarks" / "tpch" / "test_dask")
        )
    ):
        item.add_marker(skip_benchmarks)
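For reference, the --tpch-non-dask flag checked above would typically be registered via a pytest_addoption hook in the same conftest; a hedged sketch (the repo's actual registration may differ):

def pytest_addoption(parser):
    # hypothetical registration of the flag used by the collection hook above
    parser.addoption(
        "--tpch-non-dask",
        action="store_true",
        default=False,
        help="also run the non-Dask (PySpark/Polars/DuckDB) TPC-H benchmarks",
    )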
Please open a follow-up ticket for this. We don't need it for this PR, but I want this to run somewhat regularly (every commit, once a day, etc.).
def _():
    spark = get_or_create_spark(f"query{module.__name__}")

    # scale1000 stored as timestamp[ns] which spark parquet
    # can't use natively.
    if ENABLED_DATASET == "scale 1000":
        module.query = fix_timestamp_ns_columns(module.query)

    module.setup(spark)  # read spark tables query will select from
    if hasattr(module, "ddl"):
        spark.sql(module.ddl)  # might create temp view
    q_final = spark.sql(module.query)  # benchmark query
    try:
        # trigger materialization of df
        return q_final.toJSON().collect()
    finally:
        spark.catalog.clearCache()
        spark.sparkContext.stop()
        spark.stop()

return await asyncio.to_thread(_)


if not is_local:
    rows = tpch_pyspark_client.run_on_scheduler(_run_tpch)
else:
    rows = asyncio.run(_run_tpch(None))  # running locally
print(f"Received {len(rows)} rows")


class SparkMaster(SchedulerPlugin):
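The SparkMaster body is truncated in the diff. For orientation only, a scheduler plugin that launches a standalone Spark master next to the Dask scheduler could look roughly like this hypothetical sketch (the PR's actual implementation may differ):

from distributed.diagnostics.plugin import SchedulerPlugin


class SparkMasterSketch(SchedulerPlugin):
    """Hypothetical sketch: start a Spark standalone master on the scheduler host."""

    async def start(self, scheduler):
        import subprocess

        # Spark workers (started via a matching WorkerPlugin) would then
        # connect to spark://<scheduler-host>:7077
        self.proc = subprocess.Popen(["start-master.sh"])

    async def close(self):
        self.proc.terminate()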
I would also suggest as follow-up work to:
- Move the plugins into a spark.py submodule or something like this. That makes it easier to import if one runs a couple of ad-hoc tests, at least until we have this in coiled.
- Refactor the run_tpch_pyspark function slightly so it can be used as run_spark_query(dask_client, query), for similar reasons.
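A minimal sketch of that refactor, assuming get_or_create_spark and the per-query module shape from the diff above (the signature is the reviewer's suggestion; everything else is illustrative):

def run_spark_query(dask_client, query_module):
    """Run one TPC-H query module on the Spark cluster backing dask_client."""

    def _run(dask_scheduler=None):
        spark = get_or_create_spark(f"query{query_module.__name__}")
        try:
            query_module.setup(spark)  # register source tables
            if hasattr(query_module, "ddl"):
                spark.sql(query_module.ddl)
            # collect() forces materialization, as in run_tpch_pyspark
            return spark.sql(query_module.query).toJSON().collect()
        finally:
            spark.catalog.clearCache()
            spark.stop()

    # run where the Spark master lives, i.e. on the Dask scheduler
    return dask_client.run_on_scheduler(_run)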
I think that for PySpark we merge it as-is, fix up Spark outside of this repo, and then make this file like the others, where the query is just defined in the test.
Future work regardless though.
@@ -526,8 +585,9 @@ def tpch_cluster(request, dask_env_variables, cluster_kwargs, github_cluster_tags):
         **cluster_kwargs["tpch"],
     )
     dump_cluster_kwargs(kwargs, f"tpch.{module}")
-    with Cluster(**kwargs) as cluster:
-        yield cluster
+    with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
+        with Cluster(**kwargs) as cluster:
+            yield cluster
I'm a bit confused. This should be on main already, shouldn't it?
Ok, I think this is a rendering issue
Lines 518 to 531 in b8f0f0e
@pytest.fixture(scope="module")
def tpch_cluster(request, dask_env_variables, cluster_kwargs, github_cluster_tags):
    module = os.path.basename(request.fspath).split(".")[0]
    module = module.replace("test_", "")
    kwargs = dict(
        name=f"{module}-{uuid.uuid4().hex[:8]}",
        environ=dask_env_variables,
        tags=github_cluster_tags,
        **cluster_kwargs["tpch"],
    )
    dump_cluster_kwargs(kwargs, f"tpch.{module}")
    with dask.config.set({"distributed.scheduler.worker-saturation": "inf"}):
        with Cluster(**kwargs) as cluster:
            yield cluster
import re


def fix_timestamp_ns_columns(query):
    """
    scale100 stores l_shipdate/o_orderdate as timestamp[us].
    scale1000 stores l_shipdate/o_orderdate as timestamp[ns], which gives:
        Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
    so we set spark.sql.legacy.parquet.nanosAsLong and then convert to timestamp.
    """
    for name in ("l_shipdate", "o_orderdate"):
        query = re.sub(rf"\b{name}\b", f"to_timestamp(cast({name} as string))", query)
    return query
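To illustrate the rewrite, a quick sketch (the query string is made up for the example, not one of the benchmark queries):

q = "select sum(l_extendedprice) from lineitem where l_shipdate <= date '1998-09-02'"
print(fix_timestamp_ns_columns(q))
# select sum(l_extendedprice) from lineitem
#     where to_timestamp(cast(l_shipdate as string)) <= date '1998-09-02'

As the docstring notes, this pairs with reading the nanosecond columns as LONG, e.g. spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true").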
@milesgranger please also open an issue for this. I don't feel comfortable with this string casting when we're comparing ourselves to Spark. If that means we have to regenerate the dataset, that's unfortunate, but I wouldn't want us to bias the results against Spark too severely. At the very least we should confirm on smaller / other data what the impact here is and whether it can be ignored.
Merged #971 and #1027 together and added tests comparing the implementations per TPC-H query; right now, the first 7 queries as implemented in #971.
First run: https://cloud.coiled.io/clusters/284888/information?account=dask-engineering&tab=Metrics
Note that dask runs first, then pyspark for the same query.
One can pretty easily determine the start/stop of each based on tasks being present in the first chart.
Initial impression is that PySpark spills to disk less / uses less memory, and that Dask is generally faster.
Dashboard artifact:
https://github.com/coiled/benchmarks/suites/16815018862/artifacts/960398346