From de2e9e56b8a36646ce2fb6fece93e0c6bdef2b56 Mon Sep 17 00:00:00 2001 From: eli knaap Date: Tue, 22 Aug 2023 08:58:57 -0700 Subject: [PATCH] move dask to function-level import --- tobler/area_weighted/area_interpolate_dask.py | 128 +++++++++--------- 1 file changed, 63 insertions(+), 65 deletions(-) diff --git a/tobler/area_weighted/area_interpolate_dask.py b/tobler/area_weighted/area_interpolate_dask.py index 7f3b26e..faf5e02 100755 --- a/tobler/area_weighted/area_interpolate_dask.py +++ b/tobler/area_weighted/area_interpolate_dask.py @@ -1,21 +1,12 @@ -''' +""" Area Weighted Interpolation, out-of-core and parallel through Dask -''' +""" import pandas import geopandas import numpy as np from .area_interpolate import _area_interpolate_binning as area_interpolate -try: - import dask_geopandas - from dask.base import tokenize - from dask.highlevelgraph import HighLevelGraph -except ImportError: - raise ImportError( - "Area interpolation with Dask requires `dask` and " - "`dask_geopandas` installed to run. Please install them " - "before importing this functionality." - ) + def area_interpolate_dask( source_dgdf, @@ -24,11 +15,11 @@ def area_interpolate_dask( extensive_variables=None, intensive_variables=None, categorical_variables=None, - categorical_frequency=True + categorical_frequency=True, ): - ''' + """ Out-of-core and parallel area interpolation for categorical variables. - + Parameters ---------- source_dgdf : dask_geopandas.GeoDataFrame @@ -40,7 +31,7 @@ def area_interpolate_dask( Dask-geopandas GeoDataFrame IMPORTANT: the table needs to be spatially shuffled and with spatial partitions. This is required so only overlapping partitions are checked for interpolation. See - more on spatial shuffling at: https://dask-geopandas.readthedocs.io/en/stable/guide/spatial-partitioning.html + more on spatial shuffling at: https://dask-geopandas.readthedocs.io/en/stable/guide/spatial-partitioning.html id_col : str Name of the column in `target_dgdf` with unique IDs to be used in output table extensive_variables : list @@ -50,7 +41,7 @@ def area_interpolate_dask( [Optional. Default=None] Columns in `source_dgdf` for intensive variables IMPORTANT: currently NOT implemented. categorical_variables : list - [Optional. Default=None] Columns in `source_dgdf` for categorical variables + [Optional. Default=None] Columns in `source_dgdf` for categorical variables IMPORTANT: categorical variables must be of type `'category[known]'`. This is so all categories are known ahead of time and Dask can run lazily. categorical_frequency : Boolean @@ -65,25 +56,40 @@ def area_interpolate_dask( estimates : dask_geopandas.GeoDataFrame new dask-geopandas geodaraframe with interpolated variables and `id_col` as columns and target_df geometry as output geometry - - ''' + + """ + try: + import dask_geopandas + from dask.base import tokenize + from dask.highlevelgraph import HighLevelGraph + except ImportError: + raise ImportError( + "Area interpolation with Dask requires `dask` and " + "`dask_geopandas` installed to run. Please install them " + "before importing this functionality." + ) + if intensive_variables is not None: - raise NotImplementedError(( - "Dask-based interpolation of intensive variables is " - "not implemented yet. Please remove intensive variables to " - "be able to run the rest." - )) + raise NotImplementedError( + ( + "Dask-based interpolation of intensive variables is " + "not implemented yet. Please remove intensive variables to " + "be able to run the rest." + ) + ) if extensive_variables is not None: - raise NotImplementedError(( - "Dask-based interpolation of extensive variables is " - "not implemented yet. Please remove intensive variables to " - "be able to run the rest." - )) + raise NotImplementedError( + ( + "Dask-based interpolation of extensive variables is " + "not implemented yet. Please remove intensive variables to " + "be able to run the rest." + ) + ) # Categoricals must be Dask's known categorical if categorical_variables is not None: category_vars = [] for cat_var in categorical_variables: - var_names = [f'{cat_var}_{c}' for c in source_dgdf[cat_var].cat.categories] + var_names = [f"{cat_var}_{c}" for c in source_dgdf[cat_var].cat.categories] category_vars.extend(var_names) else: category_vars = None @@ -91,16 +97,14 @@ def area_interpolate_dask( dsk = {} new_spatial_partitions = [] parts = geopandas.sjoin( - source_dgdf.spatial_partitions.to_frame('geometry'), - target_dgdf.spatial_partitions.to_frame('geometry'), - how='inner', - predicate='intersects' + source_dgdf.spatial_partitions.to_frame("geometry"), + target_dgdf.spatial_partitions.to_frame("geometry"), + how="inner", + predicate="intersects", ) parts_left = np.asarray(parts.index) - parts_right = np.asarray(parts['index_right'].values) - name = 'area_interpolate-' + tokenize( - target_dgdf, source_dgdf - ) + parts_right = np.asarray(parts["index_right"].values) + name = "area_interpolate-" + tokenize(target_dgdf, source_dgdf) for i, (l, r) in enumerate(zip(parts_left, parts_right)): dsk[(name, i)] = ( id_area_interpolate, @@ -111,7 +115,7 @@ def area_interpolate_dask( intensive_variables, None, True, - 'auto', + "auto", 1, categorical_variables, category_vars, @@ -137,23 +141,19 @@ def area_interpolate_dask( intensive_variables=intensive_variables, table=None, allocate_total=True, - spatial_index='auto', + spatial_index="auto", n_jobs=1, categorical_variables=categorical_variables, category_vars=category_vars, ) # Build output table transferred = dask_geopandas.GeoDataFrame( - graph, - name, - meta, - [None] * (len(dsk) + 1), - new_spatial_partitions + graph, name, meta, [None] * (len(dsk) + 1), new_spatial_partitions ) # Merge chunks - out = target_dgdf[[id_col, 'geometry']] + out = target_dgdf[[id_col, "geometry"]] ## Extensive --> Not implemented (DAB: the below does not match single-core) - ''' + """ if extensive_variables is not None: out_extensive = ( transferred @@ -162,25 +162,23 @@ def area_interpolate_dask( .agg({v: 'sum' for v in extensive_variables}) ) out = out.join(out_extensive, on=id_col) - ''' + """ ## Intensive --> Weight by area of the chunk (Not implemented) ## Categorical --> Add up proportions if categorical_variables is not None: out_categorical = ( - transferred - [category_vars] + transferred[category_vars] .astype(float) .groupby(transferred[id_col]) - .agg({v: 'sum' for v in category_vars}) - ) + .agg({v: "sum" for v in category_vars}) + ) out = out.join(out_categorical, on=id_col) if categorical_frequency is True: cols = out_categorical.columns.tolist() - out[cols] = out[cols].div( - out.area, axis='index' - ) + out[cols] = out[cols].div(out.area, axis="index") return out + def id_area_interpolate( source_df, target_df, @@ -189,20 +187,20 @@ def id_area_interpolate( intensive_variables=None, table=None, allocate_total=True, - spatial_index='auto', + spatial_index="auto", n_jobs=1, categorical_variables=None, - category_vars=None + category_vars=None, ): - ''' + """ Light wrapper around single-core area interpolation to be run on distributed workers - + Parameters ---------- source_df : geopandas.GeoDataFrame target_df : geopandas.GeoDataFrame id_col : str - Name of the column in `target_dgdf` with unique IDs to be used in output table + Name of the column in `target_dgdf` with unique IDs to be used in output table extensive_variables : list [Optional. Default=None] Columns in dataframes for extensive variables intensive_variables : list @@ -241,8 +239,8 @@ def id_area_interpolate( estimates : geopandas.GeoDataFrame new geodaraframe with interpolated variables as columns and target_df geometry as output geometry - - ''' + + """ estimates = area_interpolate( source_df, target_df, @@ -253,10 +251,10 @@ def id_area_interpolate( spatial_index=spatial_index, n_jobs=n_jobs, categorical_variables=categorical_variables, - categorical_frequency=False + categorical_frequency=False, ) estimates[id_col] = target_df[id_col].values - + if categorical_variables is not None: category_vars_to_add = [] for category_var in category_vars: @@ -264,5 +262,5 @@ def id_area_interpolate( category_vars_to_add.append(category_var) estimates = estimates.join( pandas.DataFrame(index=estimates.index, columns=category_vars_to_add) - ) + ) return estimates