Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
floriscalkoen committed Aug 1, 2024
1 parent 2557eef commit 40ff5d7
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions scripts/python/make_gcts.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,34 +458,42 @@ def generate_filtered_transects(

logging.info(f"Transects written to {TMP_BASE_URI}")

# logging.info("Restarting client...")
# client.restart()
# logging.info(f"Client dashboard link: {client.dashboard_link}")

transects = dask_geopandas.read_parquet(
TMP_BASE_URI, storage_options=storage_options
).compute()
)
zoom = 5
quadkey_grouper = f"quadkey_{zoom}"
transects[quadkey_grouper] = transects.quadkey.str[:zoom]

logging.info(
"Part 2: Adding Overture divisions (countries and regions) to transects..."
)

def wrapper(transects, countries_uri, regions_uri, max_distance):
def process(transects_group):
with fsspec.open(countries_uri, **storage_options) as f:
countries = gpd.read_parquet(
f, columns=["country", "common_country_name", "continent", "geometry"]
)
with fsspec.open(regions_uri, **storage_options) as f:
regions = gpd.read_parquet(f, columns=["common_region_name", "geometry"])

r = add_attributes_from_gdfs(
transects, [countries, regions], max_distance=max_distance
result = add_attributes_from_gdfs(
transects_group, [countries, regions], max_distance=20000
)
return result

logging.info("Part 2: adding attributes to transects...")
logging.info(f"Grouping the transects by quadkey zoom level {zoom}")

return r
# def wrapper(transects, countries_uri, regions_uri, max_distance):
# with fsspec.open(countries_uri, **storage_options) as f:
# countries = gpd.read_parquet(
# f, columns=["country", "common_country_name", "continent", "geometry"]
# )
# with fsspec.open(regions_uri, **storage_options) as f:
# regions = gpd.read_parquet(f, columns=["common_region_name", "geometry"])

# r = add_attributes_from_gdfs(
# transects, [countries, regions], max_distance=max_distance
# )

# return r

# with fsspec.open(countries_uri, **storage_options) as f:
# countries = gpd.read_parquet(
Expand All @@ -500,14 +508,17 @@ def wrapper(transects, countries_uri, regions_uri, max_distance):
# scattered_regions = client.scatter(regions, broadcast=True)

tasks = []
for _, tr in transects.groupby(quadkey_grouper):
t = dask.delayed(wrapper)(tr, countries_uri, regions_uri, max_distance=20000)
for _, group in transects.groupby(quadkey_grouper):
t = dask.delayed(process)(group, countries_uri, regions_uri, max_distance=20000)
tasks.append(t)

logging.info("Adding attributes to transects...")
logging.info("Computing the submitted tasks..")
transects = pd.concat(dask.compute(*tasks))
transects = transects.drop(columns=[quadkey_grouper])

logging.info(
f"Partitioning into equal partitions by quadkey at zoom level {MIN_ZOOM_QUADKEY}"
)
partitioner = QuadKeyEqualSizePartitioner(
transects,
out_dir=OUT_BASE_URI,
Expand All @@ -522,6 +533,9 @@ def wrapper(transects, countries_uri, regions_uri, max_distance):
)
partitioner.process()

logging.info("Closing client.")
client.close()

logging.info("Done!")
elapsed_time = time.time() - start_time
logging.info(
Expand Down

0 comments on commit 40ff5d7

Please sign in to comment.