Attempt to limit _out_ferc714__hourly_demand_matrix concurrency #3541
Conversation
```python
@click.option(
    "--epacems-workers",
    default=2,
    type=int,
    help=(
        "Max number of processes Dagster can launch for EPA CEMS assets. Defaults "
        "to max number of processes our typical local machines can handle."
    ),
)
```
I removed this because I don't think we are actually changing the number of workers using the command option. We could add an argument that can accept an arbitrary number of asset_name and concurrency limit pairs:

```
pudl_etl src/pudl/package_data/settings/etl_full.yml --tag-concurrency-limits a 1 --tag-concurrency-limits b 2
```
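For illustration, a repeatable click option along those lines might look like the sketch below. The option name, tuple type, and tag key are assumptions for the example, not the actual pudl_etl interface:

```python
import click


@click.command()
@click.option(
    "--tag-concurrency-limits",
    type=(str, int),
    multiple=True,
    help="Repeatable (tag value, limit) pair, e.g. --tag-concurrency-limits epacems 2",
)
def pudl_etl(tag_concurrency_limits: tuple[tuple[str, int], ...]) -> None:
    """Hypothetical CLI that collects per-tag concurrency limits."""
    # Convert the repeated (value, limit) tuples into the dict format that
    # Dagster's multiprocess executor expects for tag_concurrency_limits.
    limits = [
        {"key": "datasource", "value": value, "limit": limit}
        for value, limit in tag_concurrency_limits
    ]
    click.echo(limits)


if __name__ == "__main__":
    pudl_etl()
```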
In the long term I'd like to just remove the `pudl_etl` command in favor of preconfigured dagster jobs executed using `dagster job execute` (sorry, I still need to flesh out this issue).
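For context, a preconfigured job that `dagster job execute` could run might look roughly like this sketch; the job name, asset selection, and executor config values are illustrative assumptions rather than existing PUDL definitions:

```python
from dagster import AssetSelection, Definitions, define_asset_job

# Hypothetical job covering the full ETL; a real version would select the
# actual PUDL asset definitions.
etl_full_job = define_asset_job(
    name="etl_full",
    selection=AssetSelection.all(),
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 5,  # illustrative global concurrency limit
                }
            }
        }
    },
)

defs = Definitions(jobs=[etl_full_job])
```

Something like `dagster job execute -m pudl.etl -j etl_full` could then replace the wrapper script.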
Agree that we aren't really using this option, and switching to the dagster native CLI would be good.
I don't think this asset's CPU usage can be affected by the Dagster configuration.
src/pudl/etl/__init__.py (outdated)
```python
    },
    {
        "key": "datasource",
        "value": "ferc714",
        "limit": 1,
    },
```
I don't think this will have any effect. The FERC-714 job is parallelized internally, not by virtue of the Dagster infrastructure. Right now it will use all available CPUs no matter what. I think it's the function deep down inside that needs to be told to do something different. By contrast the CEMS work units are being managed by Dagster.
- If I run `_out_ferc714__hourly_demand_matrix` by itself, it uses 1 CPU and ~4GB of memory, and takes ~3 minutes to complete.
- And then `_out_ferc714__hourly_imputed_demand` is the asset that tries to use lots of cores, but all within a single asset (the parallelism has nothing to do with Dagster). It's often running at 400-600% CPU (on my laptop) with one process that seems to need ~1-2 GB of memory. It takes ~15 min to run in isolation locally.
Ah okay, it's the `numpy.linalg.svd` and `numpy.einsum` functions which rely on multithreaded BLAS and LAPACK libraries, whose behavior is controlled by environment variables or other libraries...
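For reference, the thread count those BLAS/LAPACK-backed calls use can be capped either with environment variables (set before NumPy is first imported) or at runtime with the threadpoolctl library. This is a general sketch, not something the PR currently does:

```python
import os

# Option 1: cap BLAS/LAPACK threads via environment variables. These must be
# set before NumPy (and its BLAS backend) is first imported.
os.environ["OMP_NUM_THREADS"] = "2"
os.environ["OPENBLAS_NUM_THREADS"] = "2"
os.environ["MKL_NUM_THREADS"] = "2"

import numpy as np
from threadpoolctl import threadpool_limits

rng = np.random.default_rng(0)
matrix = rng.normal(size=(500, 500))

# Option 2: cap threads for a specific block of work at runtime.
with threadpool_limits(limits=2, user_api="blas"):
    u, s, vt = np.linalg.svd(matrix)
```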
But again the uses-lots-of-cores asset doesn't seem to be a uses-lots-of-memory asset so I'm confused.
`_out_ferc714__hourly_imputed_demand` wasn't running when we got the OOM error though. Assuming `_out_ferc714__hourly_demand_matrix` pushed us over the memory edge, I thought we could limit the number of processes running when that asset is run. Maybe this is just playing whack-a-mole though.
I should probably do a full memory profile locally to understand what the culprit is.
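One quick way to check an individual asset's peak memory locally might be to materialize it on its own and record peak RSS. This is only a sketch: the import path for the asset is a guess, and it assumes the asset's upstream inputs can be loaded (e.g. via the configured IO manager):

```python
import resource
import sys

from dagster import materialize

# Hypothetical import path; the asset actually lives somewhere in the pudl package.
from pudl.output.ferc714 import _out_ferc714__hourly_demand_matrix

if __name__ == "__main__":
    result = materialize([_out_ferc714__hourly_demand_matrix])
    assert result.success

    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    peak_mb = peak / 1024 if sys.platform != "darwin" else peak / 1024**2
    print(f"peak RSS: {peak_mb:.0f} MB")
```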
My understanding is that the way these tags work is that no more than 2 CEMS-tagged assets can be run at the same time. The change proposed would mean that no more than 1 `_out_ferc714__hourly_demand_matrix` could be run at once, but if there's only the one asset that's causing trouble this won't change anything.
I also think that these are tag-level concurrency limits and that @e-belfer is right. I think we have a couple of options:

- make a `high-mem` op_tag, use it to tag our high memory assets, and set its tag concurrency to $LOW_NUM
- set `max_concurrent` (which is the global concurrency limit) to $MEDIUM_NUM - we already have machinery for this in `etl/__init__.py`:

```diff
diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py
index 0a6223ae2..24de83ef4 100644
--- a/src/pudl/etl/__init__.py
+++ b/src/pudl/etl/__init__.py
@@ -215,7 +215,8 @@ default_tag_concurrency_limits = [
     }
 ]
 default_config = pudl.helpers.get_dagster_execution_config(
-    tag_concurrency_limits=default_tag_concurrency_limits
+    num_workers=5,
+    tag_concurrency_limits=default_tag_concurrency_limits,
 )
 default_config |= pudl.analysis.ml_tools.get_ml_models_config()
```
We could combine these, also - say our highest memory usage asset uses 30GB of memory, and our `high-mem` threshold is ~8GB. Then, if the `high-mem` concurrency limit is 1, we could set the global concurrency limit to 5 and still expect our peak memory usage to be less than (30 * 1 + 8 * 4) = 62 GB. Though that will probably end up being too conservative and leave a lot of our memory unused most of the time.
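Concretely, combining the two options might look something like the sketch below. The helper name and its parameters come from the diff above; the tag key/value follow the memory-use: high tag adopted later in this PR, and the numeric limits are assumptions:

```python
import pudl.helpers

# One high-memory asset at a time plus a global cap of 5 workers keeps the
# worst case in the scenario above to roughly 30 * 1 + 8 * 4 = 62 GB.
default_tag_concurrency_limits = [
    {
        "key": "memory-use",
        "value": "high",
        "limit": 1,
    },
]
default_config = pudl.helpers.get_dagster_execution_config(
    num_workers=5,  # global concurrency limit
    tag_concurrency_limits=default_tag_concurrency_limits,
)
```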
We don't have to have an exhaustively-correct `high-mem` tag either - I think just picking off a few culprits will improve our reliability here.
Finally, it's maybe worth looking at some of these recommendations for reducing memory usage - but that seems like more effort than the `high-mem` bandaid.
I like the generic nature of the `high-mem` tag, at least as a short-term fix.
Probably a lot of our timeseries processing could be parallelized thoughtfully with Dask, and it seems like there could be a lot more timeseries in our future. But it'll take much more specific effort.
If/When we switch to using a Dask cluster as the executor (or anything other than One Big Machine), is there a way to use something like the `high-mem` tag to make sure that the node that gets that job has enough memory, without requiring every node to have a lot of memory?
I also like the `high-mem` tag idea. Would it protect us against a scenario where we have one `high-mem` asset running and a bunch of other medium-to-low memory assets running?
We could try just using the `high-mem` tag and if it doesn't work then we can limit global concurrency.
> If/When we switch to using a Dask cluster as the executor (or anything other than One Big Machine) is there a way to use something like the `high-mem` tag to make sure that the node that gets that job has enough memory, without requiring every node to have a lot of memory?
Yeah, it looks like Dask supports differently-resourced worker types/pools: https://blog.dask.org/2022/02/17/helm-multiple-worker-groups
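As a rough sketch of how that could work on the Dask side (independent of Dagster), workers can advertise an abstract resource and memory-hungry tasks can require some amount of it. The resource name, amounts, and cluster setup below are illustrative assumptions:

```python
from dask.distributed import Client, LocalCluster


def impute_demand(name: str) -> str:
    """Stand-in for a memory-hungry task."""
    return f"imputed {name}"


if __name__ == "__main__":
    # Each worker advertises 64 units of an abstract "MEMORY" resource; in a real
    # deployment this would be configured per worker pool (e.g. via Helm values).
    cluster = LocalCluster(n_workers=2, resources={"MEMORY": 64})
    client = Client(cluster)

    # Only a worker with at least 30 unclaimed units of MEMORY will run this task,
    # so it lands on a big-memory worker without every node needing to be big.
    future = client.submit(impute_demand, "demand-matrix", resources={"MEMORY": 30})
    print(future.result())
```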
> I also like the high-mem tag idea. Would it protect us against a scenario where we have one high-mem asset running and a bunch of other medium - low memory assets running?

Not really: if we have 64 GB and our 30GB task is running, there's nothing to stop dagster from scheduling 100 4GB tasks also. So the global limit is necessary to fully protect us. Though in practice I think a limit on the `high-mem` tasks will help most of the time.
src/pudl/etl/cli.py (outdated)
```python
    {
        "key": "datasource",
        "value": "ferc714",
        "limit": 1,
    },
```
As above, I don't think changing Dagster's configuration will have any impact on what happens inside the imputation function, which is doing a bunch of tensor math that's internally parallelized.
src/pudl/analysis/state_demand.py (outdated)
```python
@asset(
    compute_kind="Python",
    op_tags={"memory-use": "high"},
)
def _out_ferc714__hourly_imputed_demand(
```
I added a tag to this asset because it is parallelized internally and gobbles up a lot of memory and cpus.
I'll try running the full ETL locally and see what this does to the overall run time. I'm worried that because this process is running for almost the entire duration of the ETL, having it tagged as high memory usage will mean that the EPA CEMS ends up taking 2x as long as it does now.
When I ran this process in isolation locally it used 4-6 CPUs, but only 1-2 GB of memory, so I think we might not need to hold it out as a high memory use asset.
Ah ok. It also wasn't running when the VM failed so I'm down to not restrict the concurrency.
I'm hunting down a bunch of other high memory assets now. There are some doozies! The EIA930 tops out at 11GB, and the EIA860M changelog table at 8GB.
```diff
 # Limit the number of concurrent workers when launch assets that use a lot of memory.
 tag_concurrency_limits = [
     {
-        "key": "datasource",
-        "value": "epacems",
-        "limit": epacems_workers,
-    }
+        "key": "memory-use",
+        "value": "high",
+        "limit": 2,
+    },
```
So this means no more than two high-memory processes launched by dagster will be running at the same time. Hopefully this is enough to keep our nightly builds chugging along. If it doesn't work we can limit the concurrency globally and/or add the tag to more high memory assets.
Maybe we should add some tags to other high memory assets that were running when the VM failed?
_core_eia860__boilers
_core_eia860__ownership
_core_eia860__plants
_core_eia860__utilities
_core_eia923__boiler_fuel
_core_eia923__coalmine
_core_eia923__cooling_system_information
_core_eia923__fgd_operation_maintenance
_out_ferc1__yearly_steam_plants_sched402_with_plant_ids.ferc_to_ferc.merge_steam_fuel_dfs
_out_ferc714__hourly_demand_matrix
core_demand_side_management_eia861
core_eia861__yearly_balancing_authority
core_eia861__yearly_dynamic_pricing
extract_phmsagas
raw_eia930__balance
raw_eia930__interchange
raw_eia930__subregion
Given CEMS and `_out_ferc714__hourly_demand_matrix` are the only tagged assets, it's unlikely they will be executed at the same time and limit concurrency.
Force-pushed from 6dbaa75 to 53fe6cd.
I ran a bunch of assets in isolation locally, which I thought were likely to be high memory use, and added the memory-use: high tag when appropriate. I used 4GB as my threshold, since the nightly build machine has 4GB per CPU. Unfortunately, there are some graph-backed assets that I could not use the tag on.
I'm going to merge this in just so we have a chance of a successful build before the end of the week/sprint.
Overview
Closes #3533.
Attempts to limit max concurrency to 1 when the `_out_ferc714__hourly_demand_matrix` asset is running, in order to reduce memory usage on the nightly build VM. I'm still a little confused about how dagster's concurrency limiting with tags works; see my comment here.
Also, if we keep needing to limit concurrency when certain assets are run because of memory issues, we should come up with a more generalized method so we can restrict the concurrency for any asset. This PR just adds some hard-coded values to an asset decorator and the ETL scripts.
Testing
How did you make sure this worked? How can a reviewer verify this?
To-do list