Add enriched_logbook flow
VincentAntoine committed Mar 2, 2024
1 parent d593368 commit 38fecc1
Showing 2 changed files with 684 additions and 61 deletions.
296 changes: 247 additions & 49 deletions datascience/src/pipeline/flows/enrich_logbook.py
@@ -1,6 +1,7 @@
from logging import Logger
from pathlib import Path

+import duckdb
import pandas as pd
import prefect
from prefect import Flow, Parameter, case, task, unmapped
@@ -42,9 +43,10 @@ def reset_pnos(period: Period):
" enriched = false,"
" trip_gears = NULL,"
" pno_types = NULL,"
-" trip_segments = NULL"
-"WHERE p.date_time >= :start "
-"AND p.date_time <= :end;"
+" trip_segments = NULL "
+"WHERE p.operation_datetime_utc >= :start "
+"AND p.operation_datetime_utc <= :end "
+"AND log_type = 'PNO';"
),
{
"start": period.start,
@@ -97,13 +99,221 @@ def extract_pno_species_and_gears(
def compute_pno_segments(
pno_species_and_gears: pd.DataFrame, segments: pd.DataFrame
) -> pd.DataFrame:
"""
Computes the segments of the input PNO species and gears.
Args:
pno_species_and_gears (pd.DataFrame): DataFrame of PNO species and gears. 1 line = 1 catch.
Must have columns:
- logbook_reports_pno_id `int` `1`
- trip_gears `List[dict]`
`[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {...}]`
- species `str` `'COD'`
- fao_area `str` `'27.7.d'`
- year `int` `2022`
segments (pd.DataFrame): DataFrame of segment definitions. 1 line = 1 segment.
Must have columns:
- year `int` `2022`
- segment `str` `SWW1`
- segment_name `str` `Nom du segment`
- gears `List[str]` `["OTB", ...]`
- fao_areas `List[str]` `["27.8", ...]`
- species `List[str]` `["COD", ...]`
Returns:
pd.DataFrame: DataFrame of PNOs with their attributed segments. 1 line = 1 PNO.
Has columns:
- logbook_reports_pno_id `int` `1`
- trip_segments `List[dict]`
```[
{
"segment": "SWW1",
"segment_name": "Nom du segment"
},
{...}
]```
"""

db = duckdb.connect()

res = db.sql(
"""
WITH trip_gear_codes AS (
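-- One row per PNO listing the distinct gear codes used during the trip.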
SELECT logbook_reports_pno_id, ARRAY_AGG(DISTINCT gear->>'gear') AS trip_gear_codes
FROM pno_species_and_gears, unnest(trip_gears) AS t(gear)
GROUP BY logbook_reports_pno_id
),
trip_ids AS (
SELECT DISTINCT logbook_reports_pno_id
FROM pno_species_and_gears
),
pnos_with_segments AS (
SELECT
sg.logbook_reports_pno_id,
LIST_SORT(ARRAY_AGG(DISTINCT {
'segment': s.segment,
'segment_name': s.segment_name
})) AS trip_segments
FROM pno_species_and_gears sg
LEFT JOIN trip_gear_codes tgc
ON tgc.logbook_reports_pno_id = sg.logbook_reports_pno_id
JOIN segments s
ON
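-- Empty lists in a segment definition act as wildcards and match any value.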
(sg.species = ANY(s.species) OR s.species = '[]'::VARCHAR[]) AND
(list_has_any(tgc.trip_gear_codes, s.gears) OR s.gears = '[]'::VARCHAR[]) AND
(length(filter(s.fao_areas, a -> sg.fao_area LIKE a || '%')) > 0 OR s.fao_areas = '[]'::VARCHAR[]) AND
s.year = sg.year
GROUP BY 1
)
SELECT t.logbook_reports_pno_id, s.trip_segments
FROM trip_ids t
LEFT JOIN pnos_with_segments s
ON t.logbook_reports_pno_id = s.logbook_reports_pno_id
ORDER BY 1
"""
).to_df()

return res
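
A minimal, self-contained sketch of the mechanism `compute_pno_segments` relies on: `duckdb.sql()` resolves table names in a query against pandas DataFrames in the enclosing Python scope (DuckDB "replacement scans"). The DataFrame contents below are hypothetical.

```
import duckdb
import pandas as pd

# Hypothetical catches: two PNOs, one row per declared catch.
catches = pd.DataFrame(
    {
        "logbook_reports_pno_id": [1, 1, 2],
        "species": ["COD", "HKE", "COD"],
        "fao_area": ["27.7.d", "27.8.a", "27.7.d"],
    }
)

# DuckDB finds `catches` by variable name, just as the query above finds
# `pno_species_and_gears` and `segments`.
res = duckdb.sql(
    """
    SELECT logbook_reports_pno_id, ARRAY_AGG(DISTINCT species) AS species
    FROM catches
    GROUP BY 1
    ORDER BY 1
    """
).to_df()
# res: one row per PNO with its list of distinct species.
```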


def compute_pno_types(
pno_species_and_gears: pd.DataFrame, pno_types: pd.DataFrame
) -> pd.DataFrame:
"""
Computes the PNO types of the input PNO species and gears.
Args:
pno_species_and_gears (pd.DataFrame): DataFrame of PNO species and gears. 1 line = 1 catch.
Must have columns:
- logbook_reports_pno_id `int` `1`
- trip_gears `List[dict]`
`[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {...}]`
- species `str` `'COD'`
- fao_area `str` `'27.7.d'`
- flag_state `str` `'FRA'`
- weight `float` `150.5`
pno_types (pd.DataFrame): DataFrame of PNO type definitions. 1 line = 1 rule.
Must have columns:
- pno_type_id `int` `1`
- pno_type_name `str` `"Ports désignés thon rouge"`
- minimum_notification_period `float` `4.0`
- has_designated_ports `bool` `True`
- pno_type_rule_id `int` `1`
- species `List[str]` `["COD", ...]`
- gears `List[str]` `["OTB", ...]`
- fao_areas `List[str]` `["27.8", ...]`
- flag_states `List[str]` `["GBR", ...]`
- minimum_quantity_kg `float` `2500.0`
Returns:
pd.DataFrame: DataFrame of PNOs with attributed PNO types. 1 line = 1 PNO.
Has columns:
- logbook_reports_pno_id `int` `1`
- trip_gears `List[dict]`
`[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz"}, {...}]`
- pno_types `List[dict]`
```[
{
"pno_type_name": "Type 1",
"minimum_notification_period": 4.0,
"has_designated_ports": True
},
{...}
]```
"""
db = duckdb.connect()

res = db.sql(
"""
WITH trip_gear_codes AS (
SELECT logbook_reports_pno_id, ARRAY_AGG(DISTINCT gear->>'gear') AS trip_gear_codes
FROM pno_species_and_gears, unnest(trip_gears) AS t(gear)
GROUP BY logbook_reports_pno_id
),
pnos_pno_types_tmp AS (
SELECT
sg.logbook_reports_pno_id,
t.pno_type_name,
t.minimum_notification_period,
t.has_designated_ports,
t.minimum_quantity_kg,
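-- Total declared weight over all catches of the PNO matching this rule: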
SUM(COALESCE(weight, 0)) OVER (PARTITION BY sg.logbook_reports_pno_id, pno_type_rule_id) AS pno_quantity_kg
FROM pno_species_and_gears sg
LEFT JOIN trip_gear_codes tgc
ON tgc.logbook_reports_pno_id = sg.logbook_reports_pno_id
JOIN pno_types t
ON
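-- As in compute_pno_segments, empty lists in a rule act as wildcards.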
(sg.species = ANY(t.species) OR t.species = '[]'::VARCHAR[]) AND
(list_has_any(tgc.trip_gear_codes, t.gears) OR t.gears = '[]'::VARCHAR[]) AND
(length(filter(t.fao_areas, a -> sg.fao_area LIKE a || '%')) > 0 OR t.fao_areas = '[]'::VARCHAR[]) AND
(sg.flag_state = ANY(t.flag_states) OR t.flag_states = '[]'::VARCHAR[])
),
pnos_pno_types AS (
SELECT
logbook_reports_pno_id,
LIST_SORT(ARRAY_AGG(DISTINCT {
'pno_type_name': pno_type_name,
'minimum_notification_period': minimum_notification_period,
'has_designated_ports': has_designated_ports
})) AS pno_types
FROM pnos_pno_types_tmp
WHERE pno_quantity_kg >= minimum_quantity_kg
GROUP BY logbook_reports_pno_id
),
pnos_trip_gears AS (
SELECT DISTINCT ON (logbook_reports_pno_id)
logbook_reports_pno_id,
LIST_SORT(trip_gears) AS trip_gears
FROM pno_species_and_gears
ORDER BY logbook_reports_pno_id
)
SELECT
g.logbook_reports_pno_id,
g.trip_gears,
t.pno_types
FROM pnos_trip_gears g
LEFT JOIN pnos_pno_types t
ON g.logbook_reports_pno_id = t.logbook_reports_pno_id
ORDER BY g.logbook_reports_pno_id
"""
).to_df()

return res
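
To make the minimum-quantity rule concrete: weights are summed per (PNO, rule), and the rule's PNO type is attributed only when that sum reaches `minimum_quantity_kg`. A small worked example with hypothetical numbers, reproducing the window sum in pandas:

```
import pandas as pd

# Hypothetical: one PNO whose catches all match one rule requiring
# minimum_quantity_kg = 2500.
catches = pd.DataFrame(
    {
        "logbook_reports_pno_id": [1, 1, 1],
        "pno_type_rule_id": [1, 1, 1],
        "weight": [1200.0, 900.0, 500.0],  # kg per catch line
    }
)

# Equivalent of SUM(COALESCE(weight, 0)) OVER (PARTITION BY pno_id, rule_id):
catches["pno_quantity_kg"] = catches.groupby(
    ["logbook_reports_pno_id", "pno_type_rule_id"]
)["weight"].transform("sum")

# 1200 + 900 + 500 = 2600 kg >= 2500 kg: the rule's PNO type is attributed.
assert (catches["pno_quantity_kg"] >= 2500.0).all()
```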


def merge_segments_and_types(
pnos_with_types: pd.DataFrame, pnos_with_segments: pd.DataFrame
) -> pd.DataFrame:
"""
Merges the input DataFrames on `logbook_reports_pno_id`
Args:
pnos_with_types (pd.DataFrame): DataFrame of PNOs with their types
pnos_with_segments (pd.DataFrame): DataFrame of PNOs with their segments
Returns:
pd.DataFrame: DataFrame of PNOs with their types and segments
"""

return pd.merge(pnos_with_types, pnos_with_segments, on="logbook_reports_pno_id")
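
Note that `pd.merge` defaults to an inner join; this is safe here because both inputs are computed from the same `pno_species_and_gears` DataFrame and therefore cover the same set of PNO ids. A hypothetical call:

```
import pandas as pd

merged = merge_segments_and_types(
    pnos_with_types=pd.DataFrame(
        {"logbook_reports_pno_id": [1], "trip_gears": [[]], "pno_types": [[]]}
    ),
    pnos_with_segments=pd.DataFrame(
        {"logbook_reports_pno_id": [1], "trip_segments": [[]]}
    ),
)
# merged columns: logbook_reports_pno_id, trip_gears, pno_types, trip_segments
```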


def load_enriched_pnos(enriched_pnos: pd.DataFrame, period: Period, logger: Logger):
@@ -114,13 +324,11 @@ def load_enriched_pnos(enriched_pnos: pd.DataFrame, period: Period, logger: Logger):
connection.execute(
text(
"CREATE TEMP TABLE tmp_enriched_pnos("
-" id INTEGER PRIMARY KEY,"
-" is_at_port BOOLEAN,"
-" meters_from_previous_position REAL,"
-" time_since_previous_position INTERVAL,"
-" average_speed REAL,"
-" is_fishing BOOLEAN,"
-" time_emitting_at_sea INTERVAL"
+" logbook_reports_pno_id INTEGER PRIMARY KEY,"
+" enriched BOOLEAN,"
+" trip_gears JSONB,"
+" pno_types JSONB,"
+" trip_segments JSONB"
")"
"ON COMMIT DROP;"
)
@@ -129,16 +337,14 @@ def load_enriched_pnos(enriched_pnos: pd.DataFrame, period: Period, logger: Logger):
enriched_pnos = prepare_df_for_loading(
enriched_pnos,
logger,
+jsonb_columns=["trip_gears", "pno_types", "trip_segments"],
)

columns_to_load = [
-"id",
-"is_at_port",
-"meters_from_previous_position",
-"time_since_previous_position",
-"average_speed",
-"is_fishing",
-"time_emitting_at_sea",
+"logbook_reports_pno_id",
+"trip_gears",
+"pno_types",
+"trip_segments",
]

logger.info("Loading to temporary table")
@@ -155,33 +361,17 @@ def load_enriched_pnos(enriched_pnos: pd.DataFrame, period: Period, logger: Logger):

connection.execute(
text(
-"UPDATE public.logbook p "
-"SET "
-" is_at_port = ep.is_at_port, "
-" meters_from_previous_position = COALESCE( "
-" ep.meters_from_previous_position, "
-" p.meters_from_previous_position "
-" ), "
-" time_since_previous_position = COALESCE( "
-" ep.time_since_previous_position, "
-" p.time_since_previous_position "
-" ), "
-" average_speed = COALESCE( "
-" ep.average_speed, "
-" p.average_speed "
-" ), "
-" is_fishing = COALESCE( "
-" ep.is_fishing, "
-" p.is_fishing "
-" ),"
-" time_emitting_at_sea = COALESCE( "
-" ep.time_emitting_at_sea, "
-" p.time_emitting_at_sea "
-" )"
+"UPDATE public.logbook_reports r "
+"SET"
+" enriched = true,"
+" trip_gears = COALESCE(ep.trip_gears, '[]'::jsonb),"
+" pno_types = COALESCE(ep.pno_types, '[]'::jsonb),"
+" trip_segments = COALESCE(ep.trip_segments, '[]'::jsonb) "
"FROM tmp_enriched_pnos ep "
-"WHERE p.id = ep.id "
-"AND p.date_time >= :start "
-"AND p.date_time <= :end;"
+"WHERE r.id = ep.logbook_reports_pno_id "
+"AND r.operation_datetime_utc >= :start "
+"AND r.operation_datetime_utc <= :end "
+"AND log_type = 'PNO';"
),
{
"start": period.start,
@@ -205,21 +395,29 @@ def extract_enrich_load_logbook(
logger = prefect.context.get("logger")
logger.info(f"Processing pnos for period {period.start} - {period.end}.")

-trips_period = extract_pno_trips_period()
+trips_period = extract_pno_trips_period(period)

-logger.info("Extracting PNO...")
+logger.info("Extracting PNOs...")
pnos_species_and_gears = extract_pno_species_and_gears(
pno_emission_period=period, trips_period=trips_period
)
logger.info(
-f"Extracted {len(pnos_species_and_gears)} PNO species from {pnos_species_and_gears.id.nunique()} PNOs."
+f"Extracted {len(pnos_species_and_gears)} PNO species from {pnos_species_and_gears.logbook_reports_pno_id.nunique()} PNOs."
)

logger.info("Computing PNO segments...")
-pnos = compute_pno_segments(
+pnos_with_segments = compute_pno_segments(
pno_species_and_gears=pnos_species_and_gears, segments=segments
)

+logger.info("Computing PNO types...")
+pnos_with_types = compute_pno_types(
+pno_species_and_gears=pnos_species_and_gears, pno_types=pno_types
+)

+logger.info("Merging PNO types and segments...")
+pnos = merge_segments_and_types(pnos_with_types, pnos_with_segments)

logger.info("Loading")
load_enriched_pnos(pnos, period, logger)
