Merge pull request #183 from knaaptime/main
knaaptime authored Aug 22, 2023
2 parents 65a72aa + de2e9e5 · commit ce6fcb9
Showing 1 changed file with 63 additions and 65 deletions.

tobler/area_weighted/area_interpolate_dask.py (63 additions, 65 deletions)
@@ -1,21 +1,12 @@
-'''
+"""
 Area Weighted Interpolation, out-of-core and parallel through Dask
-'''
+"""
 
 import pandas
 import geopandas
 import numpy as np
 from .area_interpolate import _area_interpolate_binning as area_interpolate
-try:
-    import dask_geopandas
-    from dask.base import tokenize
-    from dask.highlevelgraph import HighLevelGraph
-except ImportError:
-    raise ImportError(
-        "Area interpolation with Dask requires `dask` and "
-        "`dask_geopandas` installed to run. Please install them "
-        "before importing this functionality."
-    )
 
 
 def area_interpolate_dask(
     source_dgdf,
@@ -24,11 +15,11 @@ def area_interpolate_dask(
     extensive_variables=None,
     intensive_variables=None,
     categorical_variables=None,
-    categorical_frequency=True
+    categorical_frequency=True,
 ):
-    '''
+    """
     Out-of-core and parallel area interpolation for categorical variables.
     Parameters
     ----------
     source_dgdf : dask_geopandas.GeoDataFrame
@@ -40,7 +31,7 @@ def area_interpolate_dask(
         Dask-geopandas GeoDataFrame
         IMPORTANT: the table needs to be spatially shuffled and with spatial partitions.
         This is required so only overlapping partitions are checked for interpolation. See
-        more on spatial shuffling at: https://dask-geopandas.readthedocs.io/en/stable/guide/spatial-partitioning.html
+        more on spatial shuffling at: https://dask-geopandas.readthedocs.io/en/stable/guide/spatial-partitioning.html
     id_col : str
         Name of the column in `target_dgdf` with unique IDs to be used in output table
     extensive_variables : list
@@ -50,7 +41,7 @@ def area_interpolate_dask(
         [Optional. Default=None] Columns in `source_dgdf` for intensive variables
         IMPORTANT: currently NOT implemented.
     categorical_variables : list
-        [Optional. Default=None] Columns in `source_dgdf` for categorical variables
+        [Optional. Default=None] Columns in `source_dgdf` for categorical variables
         IMPORTANT: categorical variables must be of type `'category[known]'`. This is so
         all categories are known ahead of time and Dask can run lazily.
     categorical_frequency : Boolean
@@ -65,42 +56,55 @@ def area_interpolate_dask(
     estimates : dask_geopandas.GeoDataFrame
         new dask-geopandas geodaraframe with interpolated variables and `id_col` as
         columns and target_df geometry as output geometry
-    '''
+    """
+    try:
+        import dask_geopandas
+        from dask.base import tokenize
+        from dask.highlevelgraph import HighLevelGraph
+    except ImportError:
+        raise ImportError(
+            "Area interpolation with Dask requires `dask` and "
+            "`dask_geopandas` installed to run. Please install them "
+            "before importing this functionality."
+        )
+
     if intensive_variables is not None:
-        raise NotImplementedError((
-            "Dask-based interpolation of intensive variables is "
-            "not implemented yet. Please remove intensive variables to "
-            "be able to run the rest."
-        ))
+        raise NotImplementedError(
+            (
+                "Dask-based interpolation of intensive variables is "
+                "not implemented yet. Please remove intensive variables to "
+                "be able to run the rest."
+            )
+        )
     if extensive_variables is not None:
-        raise NotImplementedError((
-            "Dask-based interpolation of extensive variables is "
-            "not implemented yet. Please remove intensive variables to "
-            "be able to run the rest."
-        ))
+        raise NotImplementedError(
+            (
+                "Dask-based interpolation of extensive variables is "
+                "not implemented yet. Please remove intensive variables to "
+                "be able to run the rest."
+            )
+        )
     # Categoricals must be Dask's known categorical
     if categorical_variables is not None:
         category_vars = []
         for cat_var in categorical_variables:
-            var_names = [f'{cat_var}_{c}' for c in source_dgdf[cat_var].cat.categories]
+            var_names = [f"{cat_var}_{c}" for c in source_dgdf[cat_var].cat.categories]
             category_vars.extend(var_names)
     else:
         category_vars = None
     # Build tasks by joining pairs of chunks from left/right
     dsk = {}
     new_spatial_partitions = []
     parts = geopandas.sjoin(
-        source_dgdf.spatial_partitions.to_frame('geometry'),
-        target_dgdf.spatial_partitions.to_frame('geometry'),
-        how='inner',
-        predicate='intersects'
+        source_dgdf.spatial_partitions.to_frame("geometry"),
+        target_dgdf.spatial_partitions.to_frame("geometry"),
+        how="inner",
+        predicate="intersects",
     )
     parts_left = np.asarray(parts.index)
-    parts_right = np.asarray(parts['index_right'].values)
-    name = 'area_interpolate-' + tokenize(
-        target_dgdf, source_dgdf
-    )
+    parts_right = np.asarray(parts["index_right"].values)
+    name = "area_interpolate-" + tokenize(target_dgdf, source_dgdf)
     for i, (l, r) in enumerate(zip(parts_left, parts_right)):
         dsk[(name, i)] = (
             id_area_interpolate,
@@ -111,7 +115,7 @@ def area_interpolate_dask(
             intensive_variables,
             None,
             True,
-            'auto',
+            "auto",
             1,
             categorical_variables,
             category_vars,
@@ -137,23 +141,19 @@ def area_interpolate_dask(
         intensive_variables=intensive_variables,
         table=None,
         allocate_total=True,
-        spatial_index='auto',
+        spatial_index="auto",
         n_jobs=1,
         categorical_variables=categorical_variables,
         category_vars=category_vars,
     )
     # Build output table
     transferred = dask_geopandas.GeoDataFrame(
-        graph,
-        name,
-        meta,
-        [None] * (len(dsk) + 1),
-        new_spatial_partitions
+        graph, name, meta, [None] * (len(dsk) + 1), new_spatial_partitions
     )
     # Merge chunks
-    out = target_dgdf[[id_col, 'geometry']]
+    out = target_dgdf[[id_col, "geometry"]]
     ## Extensive --> Not implemented (DAB: the below does not match single-core)
-    '''
+    """
     if extensive_variables is not None:
         out_extensive = (
             transferred
@@ -162,25 +162,23 @@ def area_interpolate_dask(
             .agg({v: 'sum' for v in extensive_variables})
         )
         out = out.join(out_extensive, on=id_col)
-    '''
+    """
     ## Intensive --> Weight by area of the chunk (Not implemented)
     ## Categorical --> Add up proportions
     if categorical_variables is not None:
         out_categorical = (
-            transferred
-            [category_vars]
+            transferred[category_vars]
             .astype(float)
             .groupby(transferred[id_col])
-            .agg({v: 'sum' for v in category_vars})
-        )
+            .agg({v: "sum" for v in category_vars})
+        )
         out = out.join(out_categorical, on=id_col)
         if categorical_frequency is True:
             cols = out_categorical.columns.tolist()
-            out[cols] = out[cols].div(
-                out.area, axis='index'
-            )
+            out[cols] = out[cols].div(out.area, axis="index")
     return out
 
 
 def id_area_interpolate(
     source_df,
     target_df,
@@ -189,20 +187,20 @@ def id_area_interpolate(
     intensive_variables=None,
     table=None,
     allocate_total=True,
-    spatial_index='auto',
+    spatial_index="auto",
     n_jobs=1,
     categorical_variables=None,
-    category_vars=None
+    category_vars=None,
 ):
-    '''
+    """
     Light wrapper around single-core area interpolation to be run on distributed workers
     Parameters
     ----------
     source_df : geopandas.GeoDataFrame
     target_df : geopandas.GeoDataFrame
     id_col : str
-        Name of the column in `target_dgdf` with unique IDs to be used in output table
+        Name of the column in `target_dgdf` with unique IDs to be used in output table
     extensive_variables : list
         [Optional. Default=None] Columns in dataframes for extensive variables
     intensive_variables : list
@@ -241,8 +239,8 @@ def id_area_interpolate(
     estimates : geopandas.GeoDataFrame
         new geodaraframe with interpolated variables as columns and target_df geometry
         as output geometry
-    '''
+    """
     estimates = area_interpolate(
         source_df,
         target_df,
@@ -253,16 +251,16 @@ def id_area_interpolate(
         spatial_index=spatial_index,
         n_jobs=n_jobs,
         categorical_variables=categorical_variables,
-        categorical_frequency=False
+        categorical_frequency=False,
     )
     estimates[id_col] = target_df[id_col].values
 
     if categorical_variables is not None:
         category_vars_to_add = []
         for category_var in category_vars:
            if category_var not in estimates.columns:
                category_vars_to_add.append(category_var)
         estimates = estimates.join(
             pandas.DataFrame(index=estimates.index, columns=category_vars_to_add)
-            )
+        )
     return estimates
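
Usage note: the two IMPORTANT items in the docstring above (spatially shuffled inputs with spatial partitions, and known categories) translate into a concrete preparation step. Below is a minimal sketch of that step, not taken from the commit: the file names, partition counts, and the `land_use` column are hypothetical, and it relies on dask-geopandas' `spatial_shuffle` and Dask's `categorize` methods.

    import geopandas
    import dask_geopandas

    # Hypothetical inputs: any two overlapping polygon layers.
    source = geopandas.read_file("source_polygons.gpkg")
    target = geopandas.read_file("target_polygons.gpkg")

    # Partition each table, then shuffle it so nearby geometries share a
    # partition. spatial_shuffle also computes the `spatial_partitions`
    # geometries that area_interpolate_dask uses to pair overlapping chunks.
    source_dgdf = dask_geopandas.from_geopandas(source, npartitions=8).spatial_shuffle()
    target_dgdf = dask_geopandas.from_geopandas(target, npartitions=8).spatial_shuffle()

    # Make the categorical column a *known* Dask categorical: categorize()
    # scans the data once so all categories exist before the lazy graph is built.
    source_dgdf = source_dgdf.categorize(columns=["land_use"])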

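With the inputs prepared, the call mirrors the signature shown in the diff. Another sketch under the same assumptions (`uid` is a hypothetical unique-ID column in the target table; the module path matches the file changed in this commit):

    from tobler.area_weighted.area_interpolate_dask import area_interpolate_dask

    estimates = area_interpolate_dask(
        source_dgdf,
        target_dgdf,
        id_col="uid",                        # hypothetical unique-ID column
        categorical_variables=["land_use"],  # must be a known categorical
        categorical_frequency=True,          # per-class shares rather than areas
    )
    result = estimates.compute()  # nothing runs until this point

Only categorical variables are passed: as the diff shows, extensive and intensive variables currently raise NotImplementedError.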
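
For readers unfamiliar with the low-level task-graph API the function uses (the `dsk[(name, i)] = (id_area_interpolate, ...)` loop), here is the same pattern in miniature; again only a sketch, with toy names and data:

    import dask
    from dask.base import tokenize
    from dask.highlevelgraph import HighLevelGraph

    def add(a, b):
        return a + b

    # One key per task; each value is a (function, *args) tuple that the
    # scheduler calls only when the key is requested.
    name = "toy-" + tokenize([1, 2, 3])
    dsk = {(name, i): (add, i, 10) for i in range(3)}
    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[])

    # Any Dask scheduler can materialize the keys:
    print(dask.get(graph, [(name, i) for i in range(3)]))  # [10, 11, 12]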