Skip to content

Commit

Permalink
Merge pull request #11 from IFRCGo/feat/pdc-data-update
Browse files Browse the repository at this point in the history
Feat/pdc data update
  • Loading branch information
thenav56 authored Oct 9, 2024
2 parents 35045b5 + c4eec19 commit ecf0f0a
Show file tree
Hide file tree
Showing 21 changed files with 716 additions and 360 deletions.
5 changes: 2 additions & 3 deletions imminent/management/commands/check_pdc_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ class Command(BaseCommand):
@monitor(monitor_slug=SentryMonitor.CHECK_PDC_STATUS)
def handle(self, *args, **options):
today_date = timezone.now().date()
resp = Pdc.objects.filter(
queryset = Pdc.objects.filter(
status=Pdc.Status.ACTIVE,
end_date__lt=today_date,
).update(
status=Pdc.Status.EXPIRED,
)
resp = queryset.update(status=Pdc.Status.EXPIRED)
print(f"Updated: {resp}")
55 changes: 11 additions & 44 deletions imminent/management/commands/create_pdc_daily.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,26 @@
import logging

import requests
from django.conf import settings
from django.core.management.base import BaseCommand
from sentry_sdk.crons import monitor

from common.models import HazardType
from common.utils import logging_response_context
from imminent.models import Pdc
from imminent.sources.pdc import SentryPdcSource
from risk_module.sentry import SentryMonitor

from .create_pdc_data import Command as CreatePdcDataCommand

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Import Active Hazards"

def save_pdc_data(self, hazard_type: HazardType, data):
pdc_updated_at = CreatePdcDataCommand.parse_timestamp(data["last_Update"])

# XXX: This was only done for WILDFIRE before??
existing_qs = Pdc.objects.filter(
uuid=data["uuid"],
hazard_type=hazard_type,
pdc_updated_at=pdc_updated_at,
)
if existing_qs.exists():
return

pdc_data = {
"hazard_id": data["hazard_ID"],
"hazard_name": data["hazard_Name"],
"latitude": data["latitude"],
"longitude": data["longitude"],
"description": data["description"],
"hazard_type": hazard_type,
"uuid": data["uuid"],
"start_date": CreatePdcDataCommand.parse_timestamp(data["start_Date"]),
"end_date": CreatePdcDataCommand.parse_timestamp(data["end_Date"]),
"status": Pdc.Status.ACTIVE,
"pdc_created_at": CreatePdcDataCommand.parse_timestamp(data["create_Date"]),
"pdc_updated_at": pdc_updated_at,
# XXX: Severity was not saved here compare to create_pdc_data
}
Pdc.objects.get_or_create(**pdc_data)

@monitor(monitor_slug=SentryMonitor.CREATE_PDC_DAILY)
def handle(self, **_):
# NOTE: Use the search hazard API to download the information,
# and make sure to filter the data
url = "https://sentry.pdc.org/hp_srv/services/hazards/t/json/get_active_hazards"
headers = {"Authorization": "Bearer {}".format(settings.PDC_ACCESS_TOKEN)}
response = requests.get(url, headers=headers)
response = requests.get(
SentryPdcSource.URL.PDC_ACTIVE_HAZARD,
headers=SentryPdcSource.authorization_headers(),
)
if response.status_code != 200:
logger.error(
"Error querying PDC data",
Expand All @@ -63,13 +30,13 @@ def handle(self, **_):

response_data = response.json()
for data in response_data:
# NOTE: Filter the active hazard only
# Update the hazard if it has expired
hazard_status = data["status"]
hazard_status = data["status"].upper()

uuid = data["uuid"]
# Tag expired hazards
if hazard_status == "E":
Pdc.objects.filter(uuid=data["uuid"]).update(status=Pdc.Status.EXPIRED)
Pdc.objects.filter(uuid=uuid).update(status=Pdc.Status.EXPIRED)

# Process active hazards
elif hazard_status == "A":
if hazard_type := CreatePdcDataCommand.HAZARD_TYPE_MAP.get(data["type_ID"].upper()):
self.save_pdc_data(hazard_type, data)
SentryPdcSource.save_pdc_data(uuid, SentryPdcSource.parse_response_data(data))
84 changes: 14 additions & 70 deletions imminent/management/commands/create_pdc_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
import logging

import requests
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import timezone
from sentry_sdk.crons import monitor

from common.models import HazardType
from common.utils import logging_response_context
from imminent.models import Pdc
from imminent.sources.pdc import SentryPdcSource
from risk_module.sentry import SentryMonitor

logger = logging.getLogger(__name__)
Expand All @@ -18,76 +16,22 @@
class Command(BaseCommand):
help = "Import Active Hazards"

SEVERITY_MAP = {
"WARNING": Pdc.Severity.WARNING,
"WATCH": Pdc.Severity.WATCH,
"ADVISORY": Pdc.Severity.ADVISORY,
"INFORMATION": Pdc.Severity.INFORMATION,
}

HAZARD_TYPE_MAP = {
"FLOOD": HazardType.FLOOD,
"CYCLONE": HazardType.CYCLONE,
"STORM": HazardType.STORM,
"DROUGHT": HazardType.DROUGHT,
"WIND": HazardType.WIND,
"TSUNAMI": HazardType.TSUNAMI,
"EARTHQUAKE": HazardType.EARTHQUAKE,
"WILDFIRE": HazardType.WILDFIRE,
}

@staticmethod
def parse_timestamp(timestamp):
# NOTE: all timestamp are in millisecond and with timezone `utc`
return timezone.make_aware(
# FIXME: Using deprecated function
datetime.datetime.utcfromtimestamp(int(timestamp) / 1000)
)

def save_pdc_data(self, hazard_type: HazardType, data):
pdc_updated_at = self.parse_timestamp(data["last_Update"])

existing_qs = Pdc.objects.filter(
uuid=data["uuid"],
hazard_type=hazard_type,
pdc_updated_at=pdc_updated_at,
)
if existing_qs.exists():
return

pdc_data = {
"hazard_id": data["hazard_ID"],
"hazard_name": data["hazard_Name"],
"latitude": data["latitude"],
"longitude": data["longitude"],
"description": data["description"],
"hazard_type": hazard_type,
"uuid": data["uuid"],
"start_date": self.parse_timestamp(data["start_Date"]),
"end_date": self.parse_timestamp(data["end_Date"]),
"status": Pdc.Status.ACTIVE,
"pdc_created_at": self.parse_timestamp(data["create_Date"]),
"pdc_updated_at": pdc_updated_at,
"severity": self.SEVERITY_MAP.get(data["severity_ID"].upper()),
}
Pdc.objects.get_or_create(**pdc_data)

@monitor(monitor_slug=SentryMonitor.CREATE_PDC_DATA)
def handle(self, **_):
# NOTE: Use the search hazard API to download the information,
# and make sure to filter the data
url = "https://sentry.pdc.org/hp_srv/services/hazards/t/json/search_hazard"
headers = {"Authorization": "Bearer {}".format(settings.PDC_ACCESS_TOKEN)}
# make sure to use the datetime now and timestamp for the post data
# current date and time
# NOTE: Make sure to use the datetime now and timestamp for the post data, current date and time
now = datetime.datetime.now()
today_timestmap = str(datetime.datetime.timestamp(now)).replace(".", "")
data = {
"pagination": {"page": 1, "pagesize": 100},
"order": {"orderlist": {"updateDate": "DESC"}},
"restrictions": [[{"searchType": "LESS_THAN", "updateDate": today_timestmap}]],
}
response = requests.post(url, headers=headers, json=data)
response = requests.post(
SentryPdcSource.URL.PDC_SEARCH_HAZARD,
headers=SentryPdcSource.authorization_headers(),
json=data,
timeout=10 * 60,
)
if response.status_code != 200:
logger.error(
"Error querying PDC data",
Expand All @@ -97,13 +41,13 @@ def handle(self, **_):

response_data = response.json()
for data in response_data:
# NOTE: Filter the active hazard only
# Update the hazard if it has expired
hazard_status = data["status"]
hazard_status = data["status"].upper()

uuid = data["uuid"]
# Tag expired hazards
if hazard_status == "E":
Pdc.objects.filter(uuid=data["uuid"]).update(status=Pdc.Status.EXPIRED)
Pdc.objects.filter(uuid=uuid).update(status=Pdc.Status.EXPIRED)

# Process active hazards
elif hazard_status == "A":
if hazard_type := self.HAZARD_TYPE_MAP.get(data["type_ID"].upper()):
self.save_pdc_data(hazard_type, data)
SentryPdcSource.save_pdc_data(uuid, SentryPdcSource.parse_response_data(data))
48 changes: 23 additions & 25 deletions imminent/management/commands/create_pdc_displacement.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging

import requests
from django.conf import settings
from django.core.management.base import BaseCommand
from sentry_sdk.crons import monitor

from common.models import Country
from common.utils import logging_response_context
from imminent.models import Pdc, PdcDisplacement
from imminent.sources.pdc import SentryPdcSource
from imminent.sources.utils import CountryQuery
from risk_module.sentry import SentryMonitor

logger = logging.getLogger(__name__)
Expand All @@ -18,9 +18,10 @@ class Command(BaseCommand):

@staticmethod
def fetch_pdc_data(uuid):
url = f"https://sentry.pdc.org/hp_srv/services/hazard/{uuid}/exposure/latest/"
headers = {"Authorization": f"Bearer {settings.PDC_ACCESS_TOKEN}"}
response = requests.get(url, headers=headers)
response = requests.get(
SentryPdcSource.URL.get_hazard_exposure_latest(uuid),
headers=SentryPdcSource.authorization_headers(),
)

if response.status_code != 200:
logger.error(
Expand All @@ -32,20 +33,18 @@ def fetch_pdc_data(uuid):
return response.json()

@staticmethod
def create_pdc_displacement(pdc, hazard_type, data):
def create_pdc_displacement(country_query: CountryQuery, pdc: Pdc, total_by_country_data):
pdc_displacement_list = []

for d in data:
for d in total_by_country_data:
iso3 = d["country"].lower()
country = Country.objects.filter(iso3=iso3).first()

if country:
if country := country_query.get_by_iso3(iso3):
c_data = {
"country": country,
"hazard_type": hazard_type,
"hazard_type": pdc.hazard_type,
"pdc": pdc,
"population_exposure": d["population"],
"capital_exposure": d["capital"],
"pdc": pdc,
}
pdc_displacement_list.append(PdcDisplacement(**c_data))

Expand All @@ -54,16 +53,15 @@ def create_pdc_displacement(pdc, hazard_type, data):
@monitor(monitor_slug=SentryMonitor.CREATE_PDC_DISPLACEMENT)
def handle(self, **_):

uuids = Pdc.objects.filter(status=Pdc.Status.ACTIVE).values_list("uuid", "hazard_type", "pdc_updated_at")

for uuid, hazard_type, pdc_updated_at in uuids:
if not PdcDisplacement.objects.filter(
pdc__uuid=uuid,
pdc__hazard_type=hazard_type,
pdc__pdc_updated_at=pdc_updated_at,
).exists():
pdc = Pdc.objects.filter(uuid=uuid).last()
response_data = self.fetch_pdc_data(uuid)

if response_data:
self.create_pdc_displacement(pdc, hazard_type, response_data.get("totalByCountry"))
country_query = CountryQuery()
pdc_qs = Pdc.objects.filter(
status=Pdc.Status.ACTIVE,
stale_displacement=True,
)

for pdc in pdc_qs:
PdcDisplacement.objects.filter(pdc=pdc).delete()
if response_data := self.fetch_pdc_data(pdc.uuid):
self.create_pdc_displacement(country_query, pdc, response_data.get("totalByCountry"))
pdc.stale_displacement = False
pdc.save(update_fields=("stale_displacement",))
63 changes: 63 additions & 0 deletions imminent/management/commands/create_pdc_five_days_cou.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import typing
from collections import defaultdict

from django.core.management.base import BaseCommand
from django.db import models
from sentry_sdk.crons import monitor

from common.models import HazardType
from imminent.models import Pdc
from imminent.sources.pdc import ArcGisPdcSource
from risk_module.sentry import SentryMonitor


class Command(ArcGisPdcSource.CommandMixin, BaseCommand):
    """Management command that stores PDC 5-day Cone of Uncertainty features.

    ``handle`` delegates to ``self.process()``, which is not defined in this
    class (presumably provided by ``ArcGisPdcSource.CommandMixin`` -- confirm).
    This class only narrows the queryset to cyclones and implements the
    per-batch fetch-and-save step.
    """

    help = "Get PDC 5 days Cone of Uncertainty data"

    def get_pdc_queryset(self) -> models.QuerySet[Pdc]:
        """Return the parent queryset restricted to cyclone hazards."""
        base_qs = super().get_pdc_queryset()
        return base_qs.filter(hazard_type=HazardType.CYCLONE)

    def save_pdc_using_uuid(self, session, uuid_list: typing.List[str]) -> bool:
        """
        Fetch Arc GIS features for the given uuids and store them on matching Pdc rows.

        :param session: object exposing ``post(url=..., data=...)`` whose result has ``.json()``
            (looks like a ``requests.Session`` -- confirm against the mixin)
        :param uuid_list: hazard uuids to query in a single request
        :return: ``False`` when ``is_valid_arc_response`` rejects the payload, ``True`` otherwise
        """
        endpoint = (
            "https://partners.pdc.org/arcgis/rest/services/partners/pdc_active_hazards_partners/MapServer/13/query"  # noqa: E501
        )
        payload = {
            **ArcGisPdcSource.ARC_GIS_DEFAULT_PARAMS,
            # NOTE: Every new request has a 6s+ response time, so all uuids are
            # batched into one "where" clause to avoid paying that latency per uuid
            "where": self.generate_uuid_where_statement("uuid", uuid_list),
            "outFields": (
                "uuid,"
                "hazard_name,storm_name,advisory_number,severity,objectid,shape,ESRI_OID,category_id,source,uuid,hazard_id"
            ),
        }
        arc_payload = session.post(url=endpoint, data=payload).json()

        if not self.is_valid_arc_response(arc_payload):
            return False

        # NOTE: Several features are returned per uuid; they are kept together as
        # a list because go-web-app uses them as a FeatureCollection
        features_by_uuid = {}
        for feature in arc_payload["features"]:
            features_by_uuid.setdefault(feature["properties"]["uuid"], []).append(feature)

        for hazard_uuid, hazard_features in features_by_uuid.items():
            # XXX: Multiple Pdc rows can share the same uuid, hence filter().update()
            Pdc.objects.filter(uuid=hazard_uuid).update(cyclone_five_days_cou=hazard_features)
        return True

    @monitor(monitor_slug=SentryMonitor.CREATE_PDC_FIVE_DAYS_COU)
    def handle(self, **_):
        """Command entry point, wrapped in the Sentry cron monitor; runs ``self.process()``."""
        self.process()
Loading

0 comments on commit ecf0f0a

Please sign in to comment.