Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diffusion des préavis auto [pipeline] - flaguer isBeingSent un préavis hors scope uniquement si des unités le ciblent #3448

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 4 additions & 16 deletions datascience/src/pipeline/flows/distribute_pnos.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
)
from src.pipeline.shared_tasks.dates import get_utcnow, make_timedelta
from src.pipeline.shared_tasks.infrastructure import execute_statement
from src.pipeline.shared_tasks.pnos import (
extract_pno_units_ports_and_segments_subscriptions,
extract_pno_units_targeting_vessels,
)


@task(checkpoint=False)
Expand Down Expand Up @@ -110,22 +114,6 @@ def extract_pnos_to_generate(
return (pnos, generation_needed)


@task(checkpoint=False)
def extract_pno_units_targeting_vessels() -> pd.DataFrame:
    """
    Extract, from the remote Monitorfish database, the control units that
    target specific vessels for PNO distribution.

    Returns:
        pd.DataFrame: one row per targeted vessel.
    """
    query_file = "monitorfish/pno_units_targeting_vessels.sql"
    return extract(db_name="monitorfish_remote", query_filepath=query_file)


@task(checkpoint=False)
def extract_pno_units_ports_and_segments_subscriptions() -> pd.DataFrame:
    """
    Extract, from the remote Monitorfish database, the control units'
    subscriptions to ports and fleet segments for PNO distribution.

    Returns:
        pd.DataFrame: one row per (unit, port) subscription.
    """
    query_file = "monitorfish/pno_units_ports_and_segments_subscriptions.sql"
    return extract(db_name="monitorfish_remote", query_filepath=query_file)


@task(checkpoint=False)
def fetch_control_units_contacts() -> pd.DataFrame:
r = requests.get(MONITORENV_API_ENDPOINT + "control_units")
Expand Down
109 changes: 103 additions & 6 deletions datascience/src/pipeline/flows/enrich_logbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
from src.pipeline.processing import prepare_df_for_loading
from src.pipeline.shared_tasks.control_flow import check_flow_not_running
from src.pipeline.shared_tasks.dates import get_utcnow, make_periods
from src.pipeline.shared_tasks.pnos import (
extract_pno_units_ports_and_segments_subscriptions,
extract_pno_units_targeting_vessels,
)
from src.pipeline.shared_tasks.segments import extract_all_segments
from src.pipeline.utils import psql_insert_copy

Expand Down Expand Up @@ -266,12 +270,15 @@ def compute_pno_types(
- logbook_reports_pno_id `int` `1`
- cfr `str` `FRA000000000`
- `predicted_arrival_datetime_utc` `datetime`
- year `int` `2023`
- species `str` `'COD'`
- trip_gears `List[dict]`
`[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz}, {...}]`
- species `str` `'COD'`
- fao_area `str` `'27.7.d'`
- flag_state `str` `'FRA'`
- weight `float` `150.5`
- flag_state `str` `'FRA'`
- locode `str` `CCXXX`
- facade `str` `NAMO`

pno_types (pd.DataFrame): DataFrame of pno_types definitions. 1 line = 1 rule.
Must have columns :
Expand All @@ -293,6 +300,7 @@ def compute_pno_types(

- logbook_reports_pno_id `int` `1`
- cfr `str` `FRA000000000`
- locode `str` `CCXXX`
- `predicted_arrival_datetime_utc` `datetime`
- trip_gears `List[dict]`
`[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz}, {...}]`
Expand Down Expand Up @@ -320,6 +328,7 @@ def compute_pno_types(
[
"logbook_reports_pno_id",
"cfr",
"locode",
"flag_state",
"predicted_arrival_datetime_utc",
]
Expand Down Expand Up @@ -451,7 +460,10 @@ def compute_pno_risk_factors(


def flag_pnos_to_verify_and_send(
pnos: pd.DataFrame, predicted_arrival_threshold: datetime
pnos: pd.DataFrame,
pno_units_targeting_vessels: pd.DataFrame,
pno_units_ports_and_segments_subscriptions: pd.DataFrame,
predicted_arrival_threshold: datetime,
):
pnos = pnos.copy(deep=True)

Expand All @@ -462,8 +474,76 @@ def flag_pnos_to_verify_and_send(
pnos["is_verified"] = False
pnos["is_sent"] = False

pnos["is_being_sent"] = (~pnos.is_in_verification_scope) * (
pnos.predicted_arrival_datetime_utc >= predicted_arrival_threshold
segment_subscriptions = (
pno_units_ports_and_segments_subscriptions.explode("unit_subscribed_segments")[
["port_locode", "unit_subscribed_segments"]
]
.dropna()
.reset_index(drop=True)
.assign(is_target_segment=True)
)

target_ports = set(
pno_units_ports_and_segments_subscriptions.loc[
pno_units_ports_and_segments_subscriptions.receive_all_pnos_from_port,
"port_locode",
].tolist()
)

target_vessels = set(pno_units_targeting_vessels["cfr"].dropna().tolist())

pnos_with_segments = (
pnos[["logbook_reports_pno_id", "locode", "trip_segments"]]
.explode("trip_segments")
.assign(
trip_segment=lambda x: x.trip_segments.map(
lambda d: d.get("segment"), na_action="ignore"
)
)
.drop(columns=["trip_segments"])
.reset_index(drop=True)
.dropna()
)

if len(pnos_with_segments) > 0 and len(segment_subscriptions) > 0:
segment_matches = pd.merge(
pnos_with_segments,
segment_subscriptions,
how="left",
left_on=["locode", "trip_segment"],
right_on=["port_locode", "unit_subscribed_segments"],
)
logbook_reports_targeting_segment_ids = set(
segment_matches.loc[
segment_matches.is_target_segment.fillna(False),
"logbook_reports_pno_id",
]
)
else:
logbook_reports_targeting_segment_ids = set()

if len(target_ports) > 0:
logbook_reports_targeting_port_ids = set(
pnos.loc[pnos.locode.isin(target_ports), "logbook_reports_pno_id"].tolist()
)
else:
logbook_reports_targeting_port_ids = set()

if len(target_vessels) > 0:
logbook_reports_targeting_vessel_ids = set(
pnos.loc[pnos.cfr.isin(target_vessels), "logbook_reports_pno_id"].tolist()
)
else:
logbook_reports_targeting_vessel_ids = set()

logbook_reports_to_sent_ids = logbook_reports_targeting_segment_ids.union(
logbook_reports_targeting_port_ids
).union(logbook_reports_targeting_vessel_ids)

pnos["is_being_sent"] = (
(~pnos.is_in_verification_scope)
* (pnos.predicted_arrival_datetime_utc >= predicted_arrival_threshold)
* pnos.logbook_reports_pno_id.isin(logbook_reports_to_sent_ids)
)

return pnos
Expand Down Expand Up @@ -574,6 +654,8 @@ def extract_enrich_load_logbook(
pno_types: pd.DataFrame,
control_anteriority: pd.DataFrame,
all_control_priorities: pd.DataFrame,
pno_units_targeting_vessels: pd.DataFrame,
pno_units_ports_and_segments_subscriptions: pd.DataFrame,
utcnow: datetime,
):
"""Extract pnos for the given `Period`, enrich and update the `logbook` table.
Expand Down Expand Up @@ -622,7 +704,10 @@ def extract_enrich_load_logbook(

logger.info("Flagging PNOs to verify_and_distribute...")
pnos = flag_pnos_to_verify_and_send(
pnos=pnos_with_risk_factors, predicted_arrival_threshold=utcnow
pnos=pnos_with_risk_factors,
pno_units_targeting_vessels=pno_units_targeting_vessels,
pno_units_ports_and_segments_subscriptions=pno_units_ports_and_segments_subscriptions,
predicted_arrival_threshold=utcnow,
)

logger.info("Loading")
Expand All @@ -649,6 +734,10 @@ def extract_enrich_load_logbook(
all_control_priorities = extract_all_control_priorities()
pno_types = extract_pno_types()
control_anteriority = extract_control_anteriority()
pno_units_targeting_vessels = extract_pno_units_targeting_vessels()
pno_units_ports_and_segments_subscriptions = (
extract_pno_units_ports_and_segments_subscriptions()
)

with case(recompute_all, True):
reset = reset_pnos.map(periods)
Expand All @@ -658,6 +747,10 @@ def extract_enrich_load_logbook(
pno_types=unmapped(pno_types),
control_anteriority=unmapped(control_anteriority),
all_control_priorities=unmapped(all_control_priorities),
pno_units_targeting_vessels=unmapped(pno_units_targeting_vessels),
pno_units_ports_and_segments_subscriptions=unmapped(
pno_units_ports_and_segments_subscriptions
),
utcnow=unmapped(utcnow),
upstream_tasks=[reset],
)
Expand All @@ -669,6 +762,10 @@ def extract_enrich_load_logbook(
pno_types=unmapped(pno_types),
control_anteriority=unmapped(control_anteriority),
all_control_priorities=unmapped(all_control_priorities),
pno_units_targeting_vessels=unmapped(pno_units_targeting_vessels),
pno_units_ports_and_segments_subscriptions=unmapped(
pno_units_ports_and_segments_subscriptions
),
utcnow=unmapped(utcnow),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pno_species AS (
flag_state,
trip_number,
report_datetime_utc,
p.locode,
p.facade,
(r.value->>'tripStartDate')::TIMESTAMPTZ AS trip_start_date,
(r.value->>'predictedArrivalDatetimeUtc')::TIMESTAMPTZ AS predicted_arrival_datetime_utc,
Expand Down Expand Up @@ -103,6 +104,7 @@ SELECT
s.fao_area,
s.weight,
s.flag_state,
s.locode,
s.facade
FROM pno_species s
LEFT JOIN far_gears fg
Expand Down
20 changes: 20 additions & 0 deletions datascience/src/pipeline/shared_tasks/pnos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pandas as pd
from prefect import task

from src.pipeline.generic_tasks import extract


@task(checkpoint=False)
def extract_pno_units_targeting_vessels() -> pd.DataFrame:
    """
    Shared task: fetch from the remote Monitorfish database the control
    units that target specific vessels for PNO distribution.

    Returns:
        pd.DataFrame: one row per targeted vessel.
    """
    sql_path = "monitorfish/pno_units_targeting_vessels.sql"
    return extract(db_name="monitorfish_remote", query_filepath=sql_path)


@task(checkpoint=False)
def extract_pno_units_ports_and_segments_subscriptions() -> pd.DataFrame:
    """
    Shared task: fetch from the remote Monitorfish database the control
    units' port and fleet-segment subscriptions for PNO distribution.

    Returns:
        pd.DataFrame: one row per (unit, port) subscription.
    """
    sql_path = "monitorfish/pno_units_ports_and_segments_subscriptions.sql"
    return extract(db_name="monitorfish_remote", query_filepath=sql_path)
49 changes: 49 additions & 0 deletions datascience/tests/test_pipeline/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pandas as pd
import pytest


@pytest.fixture
def pno_units_targeting_vessels():
    """Sample dataframe of control units targeting specific vessels."""
    data = {
        "vessel_id": [2, 4, 7],
        "cfr": ["ABC000542519", None, "___TARGET___"],
        "control_unit_ids_targeting_vessel": [[4], [1, 2], [4]],
    }
    return pd.DataFrame(data)


@pytest.fixture
def pno_units_ports_and_segments_subscriptions():
    """
    Sample dataframe of control units' port / fleet-segment subscriptions.

    One row per (port, control unit) pair, with a flag indicating whether
    the unit receives all PNOs from the port, and the list of fleet
    segments the unit subscribed to.
    """
    rows = [
        ("FRCQF", 1, False, ["SWW01/02/03"]),
        ("FRDKK", 2, False, []),
        ("FRDPE", 4, True, []),
        ("FRLEH", 2, False, []),
        ("FRLEH", 3, False, ["SWW01/02/03", "NWW01"]),
        ("FRZJZ", 2, False, []),
        ("FRZJZ", 3, False, ["SWW01/02/03", "NWW01"]),
    ]
    return pd.DataFrame(
        rows,
        columns=[
            "port_locode",
            "control_unit_id",
            "receive_all_pnos_from_port",
            "unit_subscribed_segments",
        ],
    )
63 changes: 0 additions & 63 deletions datascience/tests/test_pipeline/test_flows/test_distribute_pnos.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
create_email,
create_sms,
extract_fishing_gear_names,
extract_pno_units_ports_and_segments_subscriptions,
extract_pno_units_targeting_vessels,
extract_pnos_to_generate,
extract_species_names,
fetch_control_units_contacts,
Expand Down Expand Up @@ -1017,53 +1015,6 @@ def pno_pdf_document_to_distribute_verified_assigned(
)


@pytest.fixture
def pno_units_targeting_vessels():
    """Sample dataframe of control units targeting specific vessels."""
    fixture_data = {
        "vessel_id": [2, 4, 7],
        "cfr": ["ABC000542519", None, "___TARGET___"],
        "control_unit_ids_targeting_vessel": [[4], [1, 2], [4]],
    }
    return pd.DataFrame(fixture_data)


@pytest.fixture
def pno_units_ports_and_segments_subscriptions():
    """
    Sample dataframe of control units' port / fleet-segment subscriptions.

    One row per (port, control unit) pair.
    """
    records = [
        ("FRCQF", 1, False, ["SWW01/02/03"]),
        ("FRDKK", 2, False, []),
        ("FRDPE", 4, True, []),
        ("FRLEH", 2, False, []),
        ("FRLEH", 3, False, ["SWW01/02/03", "NWW01"]),
        ("FRZJZ", 2, False, []),
        ("FRZJZ", 3, False, ["SWW01/02/03", "NWW01"]),
    ]
    return pd.DataFrame(
        records,
        columns=[
            "port_locode",
            "control_unit_id",
            "receive_all_pnos_from_port",
            "unit_subscribed_segments",
        ],
    )


@pytest.fixture
def monitorenv_control_units_api_response() -> list:
return [
Expand Down Expand Up @@ -1530,20 +1481,6 @@ def test_extract_species_names(reset_test_data, species_names):
assert res == species_names


def test_extract_pno_units_targeting_vessels(
    reset_test_data, pno_units_targeting_vessels
):
    """The extraction task returns the expected units-targeting-vessels data."""
    extracted = extract_pno_units_targeting_vessels.run()
    pd.testing.assert_frame_equal(extracted, pno_units_targeting_vessels)


def test_extract_pno_units_ports_and_segments_subscriptions(
    reset_test_data, pno_units_ports_and_segments_subscriptions
):
    """The extraction task returns the expected port/segment subscriptions."""
    extracted = extract_pno_units_ports_and_segments_subscriptions.run()
    pd.testing.assert_frame_equal(
        extracted, pno_units_ports_and_segments_subscriptions
    )


def test_extract_fishing_gear_names(reset_test_data, fishing_gear_names):
res = extract_fishing_gear_names.run()
assert res == fishing_gear_names
Expand Down
Loading