Skip to content

Commit

Permalink
Merge pull request #282 from hotosm/feature/post-processing
Browse files Browse the repository at this point in the history
Feature/post processing
  • Loading branch information
kshitijrajsharma authored Dec 1, 2024
2 parents bfe193f + a635537 commit 7d699b0
Show file tree
Hide file tree
Showing 17 changed files with 1,045 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ Pipfile.lock
#backend
backend/data
backend/.env

.DS_Store
85 changes: 79 additions & 6 deletions API/api_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

# Reader imports
from src.app import CustomExport, PolygonStats, RawData, S3FileTransfer
from src.post_processing.processor import PostProcessor
from src.config import ALLOW_BIND_ZIP_FILTER
from src.config import CELERY_BROKER_URL as celery_broker_uri
from src.config import CELERY_RESULT_BACKEND as celery_backend
Expand All @@ -39,6 +40,7 @@
RawDataCurrentParams,
RawDataOutputType,
)
from src.post_processing.processor import PostProcessor

if ENABLE_SOZIP:
# Third party imports
Expand Down Expand Up @@ -75,7 +77,12 @@ def create_readme_content(default_readme, polygon_stats):


def zip_binding(
working_dir, exportname_parts, geom_dump, polygon_stats, default_readme
working_dir,
exportname_parts,
geom_dump,
polygon_stats,
geojson_stats,
default_readme,
):
logging.debug("Zip Binding Started!")
upload_file_path = os.path.join(
Expand All @@ -88,6 +95,9 @@ def zip_binding(
),
}

if geojson_stats:
additional_files["stats.json"] = geojson_stats

for name, content in additional_files.items():
temp_path = os.path.join(working_dir, name)
with open(temp_path, "w") as f:
Expand Down Expand Up @@ -209,11 +219,60 @@ def process_raw_data(self, params, user=None):
file_parts,
)

geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)
inside_file_size = 0
        # Post-processing: Generate GeoJSON/HTML stats and transliterations
polygon_stats = None
geojson_stats_html = None
geojson_stats_json = None
download_html_url = None
if "include_stats" or "include_translit" in params.dict():
post_processor = PostProcessor(
{
"include_stats": params.include_stats,
"include_translit": params.include_translit,
}
)

if params.include_stats:
post_processor.filters = params.filters

post_processor.init()

geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts, post_processor.post_process_line)

if params.include_stats:
geojson_stats_json = json.dumps(post_processor.geoJSONStats.dict())

# Create a HTML summary of stats
if params.include_stats_html:
tpl = "stats"
if "waterway" in post_processor.geoJSONStats.config.keys:
tpl = "stats_waterway"
elif "highway" in post_processor.geoJSONStats.config.keys:
tpl = "stats_highway"
elif "building" in post_processor.geoJSONStats.config.keys:
tpl = "stats_building"
project_root = pathlib.Path(__file__).resolve().parent
tpl_path = os.path.join(
project_root,
"../src/post_processing/{tpl}_tpl.html".format(tpl=tpl),
)
geojson_stats_html = post_processor.geoJSONStats.html(
tpl_path
).build()
upload_html_path = os.path.join(
working_dir, os.pardir, f"{exportname_parts[-1]}.html"
)
with open(upload_html_path, "w") as f:
f.write(geojson_stats_html)

else:
geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)

inside_file_size = 0
if "include_stats" in params.dict():
if params.include_stats:
feature = {
Expand All @@ -222,12 +281,14 @@ def process_raw_data(self, params, user=None):
"properties": {},
}
polygon_stats = PolygonStats(feature).get_summary_stats()

if bind_zip:
upload_file_path, inside_file_size = zip_binding(
working_dir=working_dir,
exportname_parts=exportname_parts,
geom_dump=geom_dump,
polygon_stats=polygon_stats,
geojson_stats=geojson_stats_json,
default_readme=DEFAULT_README_TEXT,
)

Expand All @@ -240,6 +301,7 @@ def process_raw_data(self, params, user=None):
upload_file_path = file_path
inside_file_size += os.path.getsize(file_path)
break # only take one file inside dir , if contains many it should be inside zip

# check if download url will be generated from s3 or not from config
if use_s3_to_upload:
file_transfer_obj = S3FileTransfer()
Expand All @@ -253,7 +315,6 @@ def process_raw_data(self, params, user=None):
pattern = r"(hotosm_project_)(\d+)"
match = re.match(pattern, exportname)
if match:
prefix = match.group(1)
project_number = match.group(2)
if project_number:
upload_name = f"TM/{project_number}/{exportname}"
Expand All @@ -272,6 +333,15 @@ def process_raw_data(self, params, user=None):
upload_name,
file_suffix="zip" if bind_zip else params.output_type.lower(),
)

# If there's an HTML file, upload it too
if geojson_stats_html:
download_html_url = file_transfer_obj.upload(
upload_html_path,
upload_name,
file_suffix="html",
)

else:
# give the static file download url back to user served from fastapi static export path
download_url = str(upload_file_path)
Expand All @@ -297,6 +367,9 @@ def process_raw_data(self, params, user=None):
}
if polygon_stats:
final_response["stats"] = polygon_stats
if download_html_url:
final_response["download_html_url"] = download_html_url

return final_response

except Exception as ex:
Expand Down
1 change: 0 additions & 1 deletion backend/field_update
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class Database:
try:
self.cursor.execute(query)
self.conn.commit()
# print(query)
try:
result = self.cursor.fetchall()

Expand Down
2 changes: 1 addition & 1 deletion backend/raw_backend
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ if __name__ == "__main__":

if not args.replication:
osm2pgsql.append("--drop")
print(osm2pgsql)

run_subprocess_cmd(osm2pgsql)

basic_index_cmd = [
Expand Down
7 changes: 7 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,10 @@ psutil==5.9.8

## logging
tqdm==4.66.2

# stats for geojson data
geojson-stats==0.2.4

# transliterations
transliterate==1.10.2

33 changes: 29 additions & 4 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from psycopg2.extras import DictCursor
from slugify import slugify
from tqdm import tqdm
from .post_processing.processor import PostProcessor

# Reader imports
from src.config import (
Expand Down Expand Up @@ -640,7 +641,7 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params):
os.remove(query_path)

@staticmethod
def query2geojson(con, extraction_query, dump_temp_file_path):
def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn=None):
"""Function written from scratch without being dependent on any library, Provides better performance for geojson binding"""
# creating geojson file
pre_geojson = """{"type": "FeatureCollection","features": ["""
Expand All @@ -660,10 +661,12 @@ def query2geojson(con, extraction_query, dump_temp_file_path):
for row in cursor:
if first:
first = False
f.write(row[0])
else:
f.write(",")
f.write(row[0])
if plugin_fn:
f.write(plugin_fn(row[0]))
else:
f.write((row[0]))
cursor.close() # closing connection to avoid memory issues
# close the writing geojson with last part
f.write(post_geojson)
Expand Down Expand Up @@ -711,7 +714,7 @@ def get_grid_id(geom, cur):
country_export,
)

def extract_current_data(self, exportname):
def extract_current_data(self, exportname, plugin_fn=None):
"""Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump
Args:
exportname: takes filename as argument to create geojson file passed from routers
Expand Down Expand Up @@ -777,6 +780,7 @@ def extract_current_data(self, exportname):
country_export=country_export,
),
dump_temp_file_path,
plugin_fn,
) # uses own conversion class
if output_type == RawDataOutputType.SHAPEFILE.value:
(
Expand Down Expand Up @@ -1488,8 +1492,29 @@ def process_export_format(export_format):
layer_creation_options=layer_creation_options_str,
query_dump_path=export_format_path,
)

run_ogr2ogr_cmd(ogr2ogr_cmd)

# Post-processing GeoJSON files
# Adds: stats, HTML stats summary and transliterations
if export_format.driver_name == "GeoJSON" and (
self.params.include_stats or self.params.include_translit
):
post_processor = PostProcessor(
{
"include_stats": self.params.include_stats,
"include_translit": self.params.include_translit,
"include_stats_html": self.params.include_stats_html,
}
)
post_processor.init()
post_processor.custom(
categories=self.params.categories,
export_format_path=export_format_path,
export_filename=export_filename,
file_export_path=file_export_path,
)

zip_file_path = os.path.join(file_export_path, f"{export_filename}.zip")
zip_path = self.file_to_zip(export_format_path, zip_file_path)

Expand Down
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def not_raises(func, *args, **kwargs):
logging.error(
"Error creating HDX configuration: %s, Disabling the hdx exports feature", e
)

ENABLE_HDX_EXPORTS = False

if ENABLE_HDX_EXPORTS:
Expand Down
Empty file added src/post_processing/__init__.py
Empty file.
61 changes: 61 additions & 0 deletions src/post_processing/geojson_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from geojson_stats.stats import Stats
from geojson_stats.html import Html

# Tags whose stats are aggregated by area (polygon features) vs. by
# length (line features).
CONFIG_AREA = ["building"]
CONFIG_LENGTH = ["highway", "waterway"]


class GeoJSONStats(Stats):
    """Collects stats while a raw-data GeoJSON export is processed line by line.

    Extends ``geojson_stats.stats.Stats``: every configured tag found in the
    request's tag filters is registered as a stats key, and area/length
    aggregation is enabled for polygon/line tags respectively.
    """

    def __init__(self, filters, *args, **kwargs):
        """
        Args:
            filters: raw-data request filters (may be None); only
                ``filters.tags`` is inspected here.
        """
        super().__init__(*args, **kwargs)

        self.config.clean = True
        # Raw-data exports nest OSM tags under properties.tags
        self.config.properties_prop = "properties.tags"

        if filters and filters.tags:
            # Register each configured tag present in the filters and switch
            # on the matching aggregation mode (area for polygon tags,
            # length for line tags).
            for config_tags, aggregation in (
                (CONFIG_AREA, "area"),
                (CONFIG_LENGTH, "length"),
            ):
                for tag in config_tags:
                    if self.check_filter(filters.tags, tag):
                        self.config.keys.append(tag)
                        self.config.value_keys.append(tag)
                        setattr(self.config, aggregation, True)

    def check_filter(self, tags, tag):
        """Return True if *tag* is present in any tag filter.

        Checks the ``all_geometry``, ``polygon`` and ``line`` filter groups,
        each in both its ``join_or`` and ``join_and`` lists.
        """
        if tags.all_geometry:
            if tags.all_geometry.join_or and tag in tags.all_geometry.join_or:
                return True
            if tags.all_geometry.join_and and tag in tags.all_geometry.join_and:
                return True
        if tags.polygon:
            if tags.polygon.join_or and tag in tags.polygon.join_or:
                return True
            if tags.polygon.join_and and tag in tags.polygon.join_and:
                return True
        if tags.line:
            if tags.line.join_or and tag in tags.line.join_or:
                return True
            if tags.line.join_and and tag in tags.line.join_and:
                return True
        # Previously fell through and returned None implicitly; make the
        # miss explicit (both are falsy, so callers are unaffected).
        return False

    def raw_data_line_stats(self, json_object: dict):
        """Accumulate stats from a single parsed GeoJSON feature.

        Only updates the internal counters; it does not return the line
        (serialization back to text is handled by the caller).
        """
        self.get_object_stats(json_object)

    def html(self, tpl):
        """Return an ``Html`` report object built from template *tpl* and
        the collected stats data."""
        return Html(tpl, self)
Loading

0 comments on commit 7d699b0

Please sign in to comment.