Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/pdc data update #11

Merged
merged 15 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions imminent/management/commands/check_pdc_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ class Command(BaseCommand):
@monitor(monitor_slug=SentryMonitor.CHECK_PDC_STATUS)
def handle(self, *args, **options):
today_date = timezone.now().date()
resp = Pdc.objects.filter(
queryset = Pdc.objects.filter(
status=Pdc.Status.ACTIVE,
end_date__lt=today_date,
).update(
status=Pdc.Status.EXPIRED,
)
resp = queryset.update(status=Pdc.Status.EXPIRED)
print(f"Updated: {resp}")
55 changes: 11 additions & 44 deletions imminent/management/commands/create_pdc_daily.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,26 @@
import logging

import requests
from django.conf import settings
from django.core.management.base import BaseCommand
from sentry_sdk.crons import monitor

from common.models import HazardType
from common.utils import logging_response_context
from imminent.models import Pdc
from imminent.sources.pdc import SentryPdcSource
from risk_module.sentry import SentryMonitor

from .create_pdc_data import Command as CreatePdcDataCommand

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Import Active Hazards"

def save_pdc_data(self, hazard_type: HazardType, data):
pdc_updated_at = CreatePdcDataCommand.parse_timestamp(data["last_Update"])

# XXX: This was only done for WILDFIRE before??
existing_qs = Pdc.objects.filter(
uuid=data["uuid"],
hazard_type=hazard_type,
pdc_updated_at=pdc_updated_at,
)
if existing_qs.exists():
return

pdc_data = {
"hazard_id": data["hazard_ID"],
"hazard_name": data["hazard_Name"],
"latitude": data["latitude"],
"longitude": data["longitude"],
"description": data["description"],
"hazard_type": hazard_type,
"uuid": data["uuid"],
"start_date": CreatePdcDataCommand.parse_timestamp(data["start_Date"]),
"end_date": CreatePdcDataCommand.parse_timestamp(data["end_Date"]),
"status": Pdc.Status.ACTIVE,
"pdc_created_at": CreatePdcDataCommand.parse_timestamp(data["create_Date"]),
"pdc_updated_at": pdc_updated_at,
# XXX: Severity was not saved here compare to create_pdc_data
}
Pdc.objects.get_or_create(**pdc_data)

@monitor(monitor_slug=SentryMonitor.CREATE_PDC_DAILY)
def handle(self, **_):
# NOTE: Use the search hazard api for the information download
# make sure to use filter the data
url = "https://sentry.pdc.org/hp_srv/services/hazards/t/json/get_active_hazards"
headers = {"Authorization": "Bearer {}".format(settings.PDC_ACCESS_TOKEN)}
response = requests.get(url, headers=headers)
response = requests.get(
SentryPdcSource.URL.PDC_ACTIVE_HAZARD,
headers=SentryPdcSource.authorization_headers(),
)
if response.status_code != 200:
logger.error(
"Error querying PDC data",
Expand All @@ -63,13 +30,13 @@ def handle(self, **_):

response_data = response.json()
for data in response_data:
# NOTE: Filter the active hazard only
# Update the hazard if it has expired
hazard_status = data["status"]
hazard_status = data["status"].upper()

uuid = data["uuid"]
# Tag expired hazards
if hazard_status == "E":
Pdc.objects.filter(uuid=data["uuid"]).update(status=Pdc.Status.EXPIRED)
Pdc.objects.filter(uuid=uuid).update(status=Pdc.Status.EXPIRED)

# Process active hazards
elif hazard_status == "A":
if hazard_type := CreatePdcDataCommand.HAZARD_TYPE_MAP.get(data["type_ID"].upper()):
self.save_pdc_data(hazard_type, data)
SentryPdcSource.save_pdc_data(uuid, SentryPdcSource.parse_response_data(data))
84 changes: 14 additions & 70 deletions imminent/management/commands/create_pdc_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
import logging

import requests
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import timezone
from sentry_sdk.crons import monitor

from common.models import HazardType
from common.utils import logging_response_context
from imminent.models import Pdc
from imminent.sources.pdc import SentryPdcSource
from risk_module.sentry import SentryMonitor

logger = logging.getLogger(__name__)
Expand All @@ -18,76 +16,22 @@
class Command(BaseCommand):
help = "Import Active Hazards"

SEVERITY_MAP = {
"WARNING": Pdc.Severity.WARNING,
"WATCH": Pdc.Severity.WATCH,
"ADVISORY": Pdc.Severity.ADVISORY,
"INFORMATION": Pdc.Severity.INFORMATION,
}

HAZARD_TYPE_MAP = {
"FLOOD": HazardType.FLOOD,
"CYCLONE": HazardType.CYCLONE,
"STORM": HazardType.STORM,
"DROUGHT": HazardType.DROUGHT,
"WIND": HazardType.WIND,
"TSUNAMI": HazardType.TSUNAMI,
"EARTHQUAKE": HazardType.EARTHQUAKE,
"WILDFIRE": HazardType.WILDFIRE,
}

@staticmethod
def parse_timestamp(timestamp):
# NOTE: all timestamp are in millisecond and with timezone `utc`
return timezone.make_aware(
# FIXME: Using deprecated function
datetime.datetime.utcfromtimestamp(int(timestamp) / 1000)
)

def save_pdc_data(self, hazard_type: HazardType, data):
pdc_updated_at = self.parse_timestamp(data["last_Update"])

existing_qs = Pdc.objects.filter(
uuid=data["uuid"],
hazard_type=hazard_type,
pdc_updated_at=pdc_updated_at,
)
if existing_qs.exists():
return

pdc_data = {
"hazard_id": data["hazard_ID"],
"hazard_name": data["hazard_Name"],
"latitude": data["latitude"],
"longitude": data["longitude"],
"description": data["description"],
"hazard_type": hazard_type,
"uuid": data["uuid"],
"start_date": self.parse_timestamp(data["start_Date"]),
"end_date": self.parse_timestamp(data["end_Date"]),
"status": Pdc.Status.ACTIVE,
"pdc_created_at": self.parse_timestamp(data["create_Date"]),
"pdc_updated_at": pdc_updated_at,
"severity": self.SEVERITY_MAP.get(data["severity_ID"].upper()),
}
Pdc.objects.get_or_create(**pdc_data)

@monitor(monitor_slug=SentryMonitor.CREATE_PDC_DATA)
def handle(self, **_):
# NOTE: Use the search hazard api for the information download
# make sure to use filter the data
url = "https://sentry.pdc.org/hp_srv/services/hazards/t/json/search_hazard"
headers = {"Authorization": "Bearer {}".format(settings.PDC_ACCESS_TOKEN)}
# make sure to use the datetime now and timestamp for the post data
# current date and time
# NOTE: Make sure to use the datetime now and timestamp for the post data, current date and time
now = datetime.datetime.now()
today_timestmap = str(datetime.datetime.timestamp(now)).replace(".", "")
data = {
"pagination": {"page": 1, "pagesize": 100},
"order": {"orderlist": {"updateDate": "DESC"}},
"restrictions": [[{"searchType": "LESS_THAN", "updateDate": today_timestmap}]],
}
response = requests.post(url, headers=headers, json=data)
response = requests.post(
SentryPdcSource.URL.PDC_SEARCH_HAZARD,
headers=SentryPdcSource.authorization_headers(),
json=data,
timeout=10 * 60,
)
if response.status_code != 200:
logger.error(
"Error querying PDC data",
Expand All @@ -97,13 +41,13 @@ def handle(self, **_):

response_data = response.json()
for data in response_data:
# NOTE: Filter the active hazard only
# Update the hazard if it has expired
hazard_status = data["status"]
hazard_status = data["status"].upper()

uuid = data["uuid"]
# Tag expired hazards
if hazard_status == "E":
Pdc.objects.filter(uuid=data["uuid"]).update(status=Pdc.Status.EXPIRED)
Pdc.objects.filter(uuid=uuid).update(status=Pdc.Status.EXPIRED)

# Process active hazards
elif hazard_status == "A":
if hazard_type := self.HAZARD_TYPE_MAP.get(data["type_ID"].upper()):
self.save_pdc_data(hazard_type, data)
SentryPdcSource.save_pdc_data(uuid, SentryPdcSource.parse_response_data(data))
48 changes: 23 additions & 25 deletions imminent/management/commands/create_pdc_displacement.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging

import requests
from django.conf import settings
from django.core.management.base import BaseCommand
from sentry_sdk.crons import monitor

from common.models import Country
from common.utils import logging_response_context
from imminent.models import Pdc, PdcDisplacement
from imminent.sources.pdc import SentryPdcSource
from imminent.sources.utils import CountryQuery
from risk_module.sentry import SentryMonitor

logger = logging.getLogger(__name__)
Expand All @@ -18,9 +18,10 @@ class Command(BaseCommand):

@staticmethod
def fetch_pdc_data(uuid):
url = f"https://sentry.pdc.org/hp_srv/services/hazard/{uuid}/exposure/latest/"
headers = {"Authorization": f"Bearer {settings.PDC_ACCESS_TOKEN}"}
response = requests.get(url, headers=headers)
response = requests.get(
SentryPdcSource.URL.get_hazard_exposure_latest(uuid),
headers=SentryPdcSource.authorization_headers(),
)

if response.status_code != 200:
logger.error(
Expand All @@ -32,20 +33,18 @@ def fetch_pdc_data(uuid):
return response.json()

@staticmethod
def create_pdc_displacement(pdc, hazard_type, data):
def create_pdc_displacement(country_query: CountryQuery, pdc: Pdc, total_by_country_data):
pdc_displacement_list = []

for d in data:
for d in total_by_country_data:
iso3 = d["country"].lower()
country = Country.objects.filter(iso3=iso3).first()

if country:
if country := country_query.get_by_iso3(iso3):
c_data = {
"country": country,
"hazard_type": hazard_type,
"hazard_type": pdc.hazard_type,
"pdc": pdc,
"population_exposure": d["population"],
"capital_exposure": d["capital"],
"pdc": pdc,
}
pdc_displacement_list.append(PdcDisplacement(**c_data))

Expand All @@ -54,16 +53,15 @@ def create_pdc_displacement(pdc, hazard_type, data):
@monitor(monitor_slug=SentryMonitor.CREATE_PDC_DISPLACEMENT)
def handle(self, **_):

uuids = Pdc.objects.filter(status=Pdc.Status.ACTIVE).values_list("uuid", "hazard_type", "pdc_updated_at")

for uuid, hazard_type, pdc_updated_at in uuids:
if not PdcDisplacement.objects.filter(
pdc__uuid=uuid,
pdc__hazard_type=hazard_type,
pdc__pdc_updated_at=pdc_updated_at,
).exists():
pdc = Pdc.objects.filter(uuid=uuid).last()
response_data = self.fetch_pdc_data(uuid)

if response_data:
self.create_pdc_displacement(pdc, hazard_type, response_data.get("totalByCountry"))
country_query = CountryQuery()
pdc_qs = Pdc.objects.filter(
status=Pdc.Status.ACTIVE,
stale_displacement=True,
)

for pdc in pdc_qs:
PdcDisplacement.objects.filter(pdc=pdc).delete()
if response_data := self.fetch_pdc_data(pdc.uuid):
self.create_pdc_displacement(country_query, pdc, response_data.get("totalByCountry"))
pdc.stale_displacement = False
pdc.save(update_fields=("stale_displacement",))
63 changes: 63 additions & 0 deletions imminent/management/commands/create_pdc_five_days_cou.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import typing
from collections import defaultdict

from django.core.management.base import BaseCommand
from django.db import models
from sentry_sdk.crons import monitor

from common.models import HazardType
from imminent.models import Pdc
from imminent.sources.pdc import ArcGisPdcSource
from risk_module.sentry import SentryMonitor


class Command(ArcGisPdcSource.CommandMixin, BaseCommand):
help = "Get PDC 5 days Cone of Uncertainty data"

def get_pdc_queryset(self) -> models.QuerySet[Pdc]:
return super().get_pdc_queryset().filter(hazard_type=HazardType.CYCLONE)

def save_pdc_using_uuid(self, session, uuid_list: typing.List[str]) -> bool:
"""
Query and save pdc polygon data from Arc GIS for given uuids
"""
# Fetch data for multiple uuids
url = (
"https://partners.pdc.org/arcgis/rest/services/partners/pdc_active_hazards_partners/MapServer/13/query" # noqa: E501
)
response = session.post(
url=url,
data={
**ArcGisPdcSource.ARC_GIS_DEFAULT_PARAMS,
# NOTE: Each new request has 6s+ response time, using where in query we reduce that latency
"where": self.generate_uuid_where_statement("uuid", uuid_list),
"outFields": (
"uuid,"
"hazard_name,storm_name,advisory_number,severity,objectid,shape,ESRI_OID,category_id,source,uuid,hazard_id"
),
},
)

# Save data for multiple uuids
response_data = response.json()

is_valid = self.is_valid_arc_response(response_data)
if not is_valid:
return False

# NOTE: Here multiple features are return per uuid as which are used as FeatureCollection in go-web-app
feature_by_uuid = defaultdict(list)
for feature in response_data["features"]:
_uuid = feature["properties"]["uuid"]
feature_by_uuid[_uuid].append(feature)

for _uuid, features in feature_by_uuid.items():
# XXX: Multiple row has same uuid
Pdc.objects.filter(uuid=_uuid).update(
cyclone_five_days_cou=features,
)
return True

@monitor(monitor_slug=SentryMonitor.CREATE_PDC_FIVE_DAYS_COU)
def handle(self, **_):
self.process()
Loading
Loading