Skip to content

Commit

Permalink
Add tag to limit concurrency for high memory use assets
Browse files Browse the repository at this point in the history
  • Loading branch information
bendnorman committed Apr 4, 2024
1 parent ff78c6f commit 6dbaa75
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 19 deletions.
7 changes: 5 additions & 2 deletions src/pudl/analysis/state_demand.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def melt_ferc714_hourly_demand_matrix(
),
),
},
op_tags={"datasource": "ferc714"},
op_tags={"memory-use": "high"},
)
def _out_ferc714__hourly_demand_matrix(
context, _out_ferc714__hourly_pivoted_demand_matrix: pd.DataFrame
Expand All @@ -473,7 +473,10 @@ def _out_ferc714__hourly_demand_matrix(
return df


@asset(compute_kind="Python")
@asset(
compute_kind="Python",
op_tags={"memory-use": "high"},
)
def _out_ferc714__hourly_imputed_demand(
_out_ferc714__hourly_demand_matrix: pd.DataFrame,
_out_ferc714__utc_offset: pd.DataFrame,
Expand Down
11 changes: 3 additions & 8 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,13 @@ def _get_keys_from_assets(
"epacems_io_manager": epacems_io_manager,
}

# By default, limit CEMS and FERC 714 processing concurrency to prevent memory overload.
# Limit the number of concurrent workers when launching assets that use a lot of memory.
default_tag_concurrency_limits = [
{
"key": "datasource",
"value": "epacems",
"key": "memory-use",
"value": "high",
"limit": 2,
},
{
"key": "datasource",
"value": "ferc714",
"limit": 1,
},
]
default_config = pudl.helpers.get_dagster_execution_config(
tag_concurrency_limits=default_tag_concurrency_limits
Expand Down
11 changes: 3 additions & 8 deletions src/pudl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,13 @@ def pudl_etl(
},
}

# By default, limit CEMS and FERC 714 processing concurrency to prevent memory overload.
# Limit the number of concurrent workers when launching assets that use a lot of memory.
tag_concurrency_limits = [
{
"key": "datasource",
"value": "epacems",
"key": "memory-use",
"value": "high",
"limit": 2,
},
{
"key": "datasource",
"value": "ferc714",
"limit": 1,
},
]

run_config.update(
Expand Down
2 changes: 1 addition & 1 deletion src/pudl/etl/epacems_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def get_years_from_settings(context):

@op(
required_resource_keys={"datastore", "dataset_settings"},
tags={"datasource": "epacems"},
tags={"memory-use": "high"},
)
def process_single_year(
context,
Expand Down

0 comments on commit 6dbaa75

Please sign in to comment.