-
Notifications
You must be signed in to change notification settings - Fork 0
/
parser.py
102 lines (89 loc) · 3.97 KB
/
parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from bs4 import BeautifulSoup
import pickle as pkl
import argparse
import logging
from model import Pollutant, DBClient
import datetime as dt
import requests
logger = logging.getLogger(__name__)
def parse_stations(station_xml):
    """Parse a stations XML document and return each station's id and coordinates.

    Args:
        station_xml (str): XML content describing the stations, one
            ``gml:Point`` element per station.

    Returns:
        list(dict): one dict per station with keys ``"station_ref"`` (str)
        and ``"position"`` (list of float, the ``gml:pos`` coordinate pair).
    """
    soup = BeautifulSoup(station_xml, "xml")
    stations = []
    for point in soup.find_all("gml:Point"):
        # gml:id looks like "...-<ref>"; keep only the trailing reference.
        station_ref = point["gml:id"].split("-")[-1]
        position = [float(coord) for coord in point.find("gml:pos").string.split()]
        stations.append({"station_ref": station_ref, "position": position})
    return stations
def parse_mesures(last_mesures_xml):
    """Parse a measurements XML document into a list of measurement dicts.

    On first use (empty ``stations`` collection) the station metadata is
    downloaded from data.gouv.fr and inserted, so each measurement can be
    geolocated.

    Args:
        last_mesures_xml (str): XML content containing the last measurements,
            one ``om:OM_Observation`` element per observation.

    Returns:
        list(dict): one dict per measurement with its reference, time span,
        value, pollutant name, station id and geographical position.
        Observations that fail to parse are skipped as a whole (best-effort).
    """
    dbclient = DBClient("localhost")
    # Bootstrap the stations collection the first time the parser runs.
    if dbclient.stations.count() == 0:
        logger.info("downloading and parsing stations metadata")
        r = requests.get("https://www.data.gouv.fr/fr/datasets/r/5f112ee8-84fa-4ff7-901c-862a8c0c478b")
        dbclient.insert_stations(parse_stations(r.text))
    soup = BeautifulSoup(last_mesures_xml, "xml")
    mesures = []
    for elt in soup.find_all("om:OM_Observation"):
        mesure_batch = []
        try:
            # Observation-level fields are identical for every value row:
            # resolve them once instead of re-querying the tree and the
            # database on each iteration of the inner loop.
            station_id = elt.find(
                "om:name",
                {"xlink:href": "http://dd.eionet.europa.eu/vocabulary/aq/processparameter/SamplingPoint"},
            ).find_next()["xlink:href"]
            pollutant_idx = int(elt.find("om:observedProperty")["xlink:href"].split("/")[-1])
            pollutant = Pollutant(pollutant_idx).name
            station_ref = station_id.split("/")[-1].split("-")[-1]
            position = get_station_coords(station_ref, dbclient)
            # swe:values rows are "@@"-separated; the trailing separator
            # yields one empty row, dropped with [:-1].
            values = elt.find("swe:values").string.split("@@")[:-1]
            for i, value in enumerate(values):
                fields = value.split(",")
                mesure_batch.append({
                    "mesure_ref": elt["gml:id"] + "_" + str(i),
                    "start_mesure": dt.datetime.fromisoformat(fields[0]).astimezone(),
                    "end_mesure": dt.datetime.fromisoformat(fields[1]).astimezone(),
                    "value": float(fields[4]),
                    "station_id": station_id,
                    "pollutant": pollutant,
                    "position": {"x": position[0], "y": position[1]},
                })
            # All-or-nothing per observation: only commit the batch once
            # every row parsed cleanly.
            mesures += mesure_batch
        except Exception:
            # Narrowed from a bare except (which also caught SystemExit /
            # KeyboardInterrupt); keep the parser best-effort but log what
            # was dropped, with lazy %-formatting.
            logger.debug("unable to parse the following element :\n %s", elt.prettify())
    return mesures
def get_station_coords(station_id, dbclient):
    """Find a station's geographical coordinates using its reference id.

    Args:
        station_id (str): reference id of the station (the ``station_ref``
            field of the stations collection).
        dbclient (DBClient): database client holding the stations collection.

    Returns:
        list(float): the station's stored ``position`` coordinate pair.

    Raises:
        TypeError: if no station matches ``station_id`` (``find_one``
            returns ``None``, which is then subscripted).
    """
    station = dbclient.stations.find_one({"station_ref": station_id})
    return station["position"]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("path", help="path of the document to parse")
parser.add_argument("--parse-stations", action="store_true", help="if set the parsed document is a stations xml, else it is a mesure xml")
args = parser.parse_args()
client = DBClient("localhost")
if args.parse_stations :
with open(args.path,"r") as f:
stations_xml = f.read()
stations = parse_stations(stations_xml)
client.insert_stations(stations)
else :
with open(args.path,"r") as f:
mesures_xml = f.read()
mesures = parse_mesures(mesures_xml)
client.insert_mesures(mesures)