Automatic archiving of missing-FAR reportings (24h) (#2598)
## Linked issues

- Resolve #2584
----
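This change adds a public `PUT /api/v1/reportings/{reportingId}/archive` endpoint to the backend and extends the `validate_pending_alerts` pipeline flow so that, once the pending alerts of a given type have been validated, the corresponding non-archived reportings are archived through that endpoint.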
louptheron authored Oct 16, 2023
2 parents d874a1f + c536819 commit f936343
Showing 12 changed files with 162 additions and 15 deletions.
PublicReportingController.kt (new file)
@@ -0,0 +1,28 @@
package fr.gouv.cnsp.monitorfish.infrastructure.api.public_api

import fr.gouv.cnsp.monitorfish.domain.use_cases.reporting.ArchiveReporting
import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.tags.Tag
import jakarta.websocket.server.PathParam
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/api/v1/reportings")
@Tag(name = "Public APIs for reporting")
class PublicReportingController(
    private val archiveReporting: ArchiveReporting,
) {

    @PutMapping(value = ["/{reportingId}/archive"])
    @Operation(summary = "Archive a reporting")
    fun archiveReporting(
        @PathParam("Reporting id")
        @PathVariable(name = "reportingId")
        reportingId: Int,
    ) {
        archiveReporting.execute(reportingId)
    }
}
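The endpoint takes no request body and returns 200 OK on success. A minimal sketch of a manual call, assuming a hypothetical local deployment at http://localhost:8880 (host and port are illustrative, not part of this commit):

import requests

# Hypothetical base URL; the pipeline builds the real URL from
# REPORTING_ARCHIVING_ENDPOINT_TEMPLATE (see datascience/config.py below).
response = requests.put("http://localhost:8880/api/v1/reportings/56/archive")
response.raise_for_status()  # raises requests.HTTPError on a 4xx/5xx response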
PublicReportingControllerITests.kt (new file)
@@ -0,0 +1,36 @@
package fr.gouv.cnsp.monitorfish.infrastructure.api.public_api

import fr.gouv.cnsp.monitorfish.config.OIDCProperties
import fr.gouv.cnsp.monitorfish.config.SecurityConfig
import fr.gouv.cnsp.monitorfish.config.SentryConfig
import fr.gouv.cnsp.monitorfish.domain.use_cases.reporting.ArchiveReporting
import org.junit.jupiter.api.Test
import org.mockito.Mockito
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.annotation.Import
import org.springframework.test.web.servlet.MockMvc
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put
import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status

@Import(SecurityConfig::class, OIDCProperties::class, SentryConfig::class)
@WebMvcTest(value = [PublicReportingController::class])
class PublicReportingControllerITests {

    @Autowired
    private lateinit var api: MockMvc

    @MockBean
    private lateinit var archiveReporting: ArchiveReporting

    @Test
    fun `Should archive a reporting`() {
        // When
        api.perform(put("/api/v1/reportings/123/archive"))
            // Then
            .andExpect(status().isOk)

        Mockito.verify(archiveReporting).execute(123)
    }
}
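@WebMvcTest loads only the web layer with PublicReportingController, and @MockBean replaces the ArchiveReporting use case with a mock, so the test checks routing and delegation only: a PUT on /api/v1/reportings/123/archive must return 200 and call ArchiveReporting.execute(123).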
datascience/config.py (3 additions, 0 deletions)
@@ -140,6 +140,9 @@
 PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE = (
     API_ENDPOINT + "operational_alerts/{pending_alert_id}/validate"
 )
+REPORTING_ARCHIVING_ENDPOINT_TEMPLATE = (
+    API_ENDPOINT + "reportings/{reporting_id}/archive"
+)

 # Backend api key
 BACKEND_API_KEY = os.environ.get("MONITORFISH_BACKEND_API_KEY")
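The new template mirrors the alert-validation endpoint above it. A quick sketch of how it expands, assuming API_ENDPOINT is "https://monitor.fish/api/v1/" (the value implied by the URL assertions in the tests at the end of this commit):

from config import REPORTING_ARCHIVING_ENDPOINT_TEMPLATE

# Fills the {reporting_id} placeholder:
url = REPORTING_ARCHIVING_ENDPOINT_TEMPLATE.format(reporting_id=56)
# -> "https://monitor.fish/api/v1/reportings/56/archive"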
datascience/src/pipeline/flows/validate_pending_alerts.py (10 additions, 4 deletions)
@@ -4,7 +4,9 @@
 from prefect.executors import LocalDaskExecutor

 from src.pipeline.shared_tasks.alerts import (
-    extract_pending_alerts_ids_of_config_name,
+    archive_reporting,
+    extract_non_archived_reportings_ids_of_type,
+    extract_pending_alerts_ids_of_type,
     validate_pending_alert,
 )
 from src.pipeline.shared_tasks.control_flow import check_flow_not_running
@@ -13,8 +15,12 @@

     flow_not_running = check_flow_not_running()
     with case(flow_not_running, True):
-        alert_config_name = Parameter("alert_config_name")
-        pending_alert_ids = extract_pending_alerts_ids_of_config_name(alert_config_name)
-        validate_pending_alert.map(pending_alert_ids)
+        alert_type = Parameter("alert_type")
+        pending_alert_ids = extract_pending_alerts_ids_of_type(alert_type)
+        validated_alerts = validate_pending_alert.map(pending_alert_ids)
+        reporting_ids = extract_non_archived_reportings_ids_of_type(
+            alert_type, upstream_tasks=[validated_alerts]
+        )
+        archive_reporting.map(reporting_ids)

 flow.file_name = Path(__file__).name
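Passing upstream_tasks=[validated_alerts] makes the reporting extraction wait for every mapped validate_pending_alert task, so reportings are archived only after their alerts have been validated. A minimal sketch of a local run, assuming the Prefect 1.x API used throughout this pipeline:

from src.pipeline.flows.validate_pending_alerts import flow

# Validates all pending MISSING_FAR_ALERT alerts, then archives the
# matching non-archived reportings through the backend API.
state = flow.run(parameters={"alert_type": "MISSING_FAR_ALERT"})
assert state.is_successful()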
datascience/src/pipeline/flows_config.py (1 addition, 1 deletion)
@@ -245,7 +245,7 @@
         clocks.CronClock(
             "50 6 * * *",
             parameter_defaults={
-                "alert_config_name": "MISSING_FAR_ALERT",
+                "alert_type": "MISSING_FAR_ALERT",
             },
         ),
     ]
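The "50 6 * * *" expression keeps the flow running daily at 06:50 (UTC by default for a Prefect 1.x CronClock); only the parameter name changes here, from alert_config_name to alert_type.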
monitorfish/non_archived_reportings_of_type.sql (new file)
@@ -0,0 +1,3 @@
SELECT id
FROM reportings
WHERE value->>'type' = :reporting_type AND NOT archived
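PostgreSQL's ->> operator extracts a JSONB field as text, so value->>'type' compares each reporting's embedded type with the :reporting_type bind parameter as a plain string, while NOT archived skips reportings that are already archived.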

This file was deleted (the old monitorfish/pending_alerts_of_config_name.sql query, replaced by pending_alerts_of_type.sql below).

monitorfish/pending_alerts_of_type.sql (new file)
@@ -0,0 +1,3 @@
SELECT id
FROM pending_alerts
WHERE value->>'type' = :alert_type
datascience/src/pipeline/shared_tasks/alerts.py (33 additions, 5 deletions)
@@ -6,7 +6,10 @@
 import requests
 from prefect import task

-from config import PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE
+from config import (
+    PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE,
+    REPORTING_ARCHIVING_ENDPOINT_TEMPLATE,
+)
 from src.db_config import create_engine
 from src.pipeline.generic_tasks import extract, load
 from src.pipeline.processing import (
@@ -29,21 +32,46 @@ def extract_silenced_alerts() -> pd.DataFrame:


 @task(checkpoint=False)
-def extract_pending_alerts_ids_of_config_name(alert_config_name: str) -> List[int]:
+def extract_pending_alerts_ids_of_type(alert_type: str) -> List[int]:
     """
-    Return ids of pending alerts corresponding to `alert_config_name`
+    Return ids of pending alerts corresponding to `alert_type`
     """
     logger = prefect.context.get("logger")
     pending_alerts = extract(
         db_name="monitorfish_remote",
-        query_filepath="monitorfish/pending_alerts_of_config_name.sql",
-        params={"alert_config_name": alert_config_name},
+        query_filepath="monitorfish/pending_alerts_of_type.sql",
+        params={"alert_type": alert_type},
     )
     ids = pending_alerts.id.unique().tolist()
     logger.info(f"Returning {len(ids)} pending alerts ids.")
     return ids


+@task(checkpoint=False)
+def extract_non_archived_reportings_ids_of_type(reporting_type: str) -> List[int]:
+    """
+    Return ids of non-archived reportings corresponding to `reporting_type`
+    """
+    logger = prefect.context.get("logger")
+    reportings = extract(
+        db_name="monitorfish_remote",
+        query_filepath="monitorfish/non_archived_reportings_of_type.sql",
+        params={"reporting_type": reporting_type},
+    )
+    ids = reportings.id.unique().tolist()
+    logger.info(f"Returning {len(ids)} reportings ids.")
+    return ids
+
+
+@task(checkpoint=False)
+def archive_reporting(id: int) -> None:
+    """
+    Archive the reporting of the given id through the backend API.
+    """
+    logger = prefect.context.get("logger")
+    url = REPORTING_ARCHIVING_ENDPOINT_TEMPLATE.format(reporting_id=id)
+    logger.info(f"Archiving reporting {id}.")
+    r = requests.put(url)
+    r.raise_for_status()
+
+
 @task(checkpoint=False)
 def validate_pending_alert(id: int) -> pd.DataFrame:
     logger = prefect.context.get("logger")
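archive_reporting delegates the state change to the backend API rather than updating the reportings table directly, so the Kotlin ArchiveReporting use case remains the single writer. A minimal sketch of the failure semantics, assuming the task is exercised outside a flow, as the tests below do:

from src.pipeline.shared_tasks.alerts import archive_reporting

# .run() executes the underlying function directly (Prefect 1.x tasks).
# raise_for_status() turns any 4xx/5xx response into requests.HTTPError,
# which Prefect records as a task failure inside a flow run.
archive_reporting.run(56)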
Test data: reportings table
@@ -1,5 +1,5 @@
 DELETE FROM reportings;

 INSERT INTO reportings (
-    type, vessel_id, internal_reference_number, external_reference_number, ircs, vessel_name, vessel_identifier, creation_date, validation_date, archived, deleted, flag_state, value) VALUES
-    ( 'ALERT', 6, NULL, 'ZZTOPACDC', 'ZZ000000', 'I DO 4H REPORT', 'IRCS', NOW() - ('1 DAY')::interval, NOW(), false, false, 'FR' ,'{"seaFront": "NAMO", "riskFactor": 3.5647, "type": "THREE_MILES_TRAWLING_ALERT", "natinfCode": 7059}'::jsonb);
+    id, type, vessel_id, internal_reference_number, external_reference_number, ircs, vessel_name, vessel_identifier, creation_date, validation_date, archived, deleted, flag_state, value) VALUES
+    ( 56, 'ALERT', 6, NULL, 'ZZTOPACDC', 'ZZ000000', 'I DO 4H REPORT', 'IRCS', NOW() - ('1 DAY')::interval, NOW(), false, false, 'FR' ,'{"seaFront": "NAMO", "riskFactor": 3.5647, "type": "THREE_MILES_TRAWLING_ALERT", "natinfCode": 7059}'::jsonb);
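The explicit id 56 makes the test reporting's primary key deterministic, so test_extract_non_archived_reportings_ids_of_type below can assert on exactly [56].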
Test data: pending_alerts table
@@ -1,12 +1,14 @@
 DELETE FROM pending_alerts;

 INSERT INTO pending_alerts (
+    id,
     vessel_name, internal_reference_number, external_reference_number, ircs, vessel_identifier,
     creation_date, trip_number, flag_state,
     value,
     alert_config_name
 ) VALUES
 (
+    12,
     'L''AMBRE', 'FRA000614250', 'GV614250', 'FUJW', 'INTERNAL_REFERENCE_NUMBER',
     '2021-12-23 16:03:00+00', NULL, 'FR',
     '{"type": "THREE_MILES_TRAWLING_ALERT", "seaFront": "NAMO"}',
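Likewise, the explicit id 12 pins the pending alert's primary key so that test_extract_pending_alerts_ids_of_type below can assert on exactly [12].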
datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py (41 additions, 0 deletions)
@@ -4,10 +4,14 @@
 import pandas as pd

 from src.pipeline.shared_tasks.alerts import (
+    archive_reporting,
+    extract_non_archived_reportings_ids_of_type,
+    extract_pending_alerts_ids_of_type,
     extract_silenced_alerts,
     filter_silenced_alerts,
     load_alerts,
     make_alerts,
+    validate_pending_alert,
 )
 from src.read_query import read_query
 from tests.mocks import mock_datetime_utcnow
@@ -26,6 +30,43 @@ def test_extract_silenced_alerts(reset_test_data):
     pd.testing.assert_frame_equal(silenced_alerts, expected_silenced_alerts)


+def test_extract_pending_alerts_ids_of_type(reset_test_data):
+    assert extract_pending_alerts_ids_of_type.run(
+        alert_type="THREE_MILES_TRAWLING_ALERT"
+    ) == [12]
+
+    assert extract_pending_alerts_ids_of_type.run(alert_type="NON_EXISTING_ALERT") == []
+
+
+def test_extract_non_archived_reportings_ids_of_type(reset_test_data):
+    assert extract_non_archived_reportings_ids_of_type.run(
+        reporting_type="THREE_MILES_TRAWLING_ALERT"
+    ) == [56]
+
+    assert (
+        extract_non_archived_reportings_ids_of_type.run(
+            reporting_type="NON_EXISTING_ALERT"
+        )
+        == []
+    )
+
+
+@patch("src.pipeline.shared_tasks.alerts.requests")
+def test_validate_pending_alert(requests_mock):
+    validate_pending_alert.run(12)
+    requests_mock.put.assert_called_once_with(
+        "https://monitor.fish/api/v1/operational_alerts/12/validate"
+    )
+
+
+@patch("src.pipeline.shared_tasks.alerts.requests")
+def test_archive_reporting(requests_mock):
+    archive_reporting.run(12)
+    requests_mock.put.assert_called_once_with(
+        "https://monitor.fish/api/v1/reportings/12/archive"
+    )
+
+
 @patch(
     "src.pipeline.shared_tasks.alerts.datetime",
     mock_datetime_utcnow(datetime(2020, 5, 3, 8, 0, 0)),
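Patching src.pipeline.shared_tasks.alerts.requests swaps the module-level import for a mock, so requests_mock.put records the exact URL each task builds; the asserted https://monitor.fish/api/v1/ prefix reflects the API_ENDPOINT configured in the test environment.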
