forked from KaitaiD/py-network-rail-feeder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.py
183 lines (158 loc) · 5.64 KB
/
listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import logging
import json
import sqlite3
# Module-level logger used by the listener classes below to report feed
# headers (info level) and error callbacks (error level).
logger = logging.getLogger("Listener Logger")
class MessagerToSQL(object):
    """
    Persist messages received from the feeds into a sqlite database.

    One instance owns one connection, one cursor and one table. The table is
    named after the database file (its base name up to the first dot).
    """

    def __init__(self,
                 fp: str,
                 schema: dict,
                 drop_if_exists=True):
        """
        Open (or create) the database and make sure the target table exists.

        Args:
            fp: Path of the sqlite database file. The table name is derived
                from the file's base name, e.g. "data/mv.db" -> "mv".
            schema: Mapping of column name -> sqlite column type describing
                the fields to collect from the data feed.
            drop_if_exists: If true, drop the table first so it is recreated
                empty. If false, an already existing table is kept and reused.
        """
        import os  # local import: only needed to derive the table name

        # Strip any directory component before cutting at the first dot.
        # Splitting the raw path would turn "data/mv.db" into "data/mv" and
        # "./mv.db" into "", neither of which is a usable table name.
        self.table_name = os.path.basename(fp).split(".")[0]
        self.schema = schema

        # Reconstruct schema to be accepted by sqlite3: "col1 TYPE1, col2 TYPE2"
        self.printable_schema = ", ".join(
            f"{column} {column_type}" for column, column_type in schema.items()
        )

        # check_same_thread=False lets the connection be used from a thread
        # other than the one that created it.
        self.conn = sqlite3.connect(fp, check_same_thread=False)
        self.c = self.conn.cursor()

        # NOTE: table and column names cannot be bound as "?" parameters, so
        # they are interpolated; they must therefore be trusted identifiers.
        if drop_if_exists:
            self.c.execute(f"DROP TABLE IF EXISTS {self.table_name}")
        # IF NOT EXISTS keeps drop_if_exists=False usable on an existing
        # database instead of failing with "table already exists".
        self.c.execute(
            f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.printable_schema})"
        )

    def insert(self, sql: str, msg_list: list):
        """
        Execute a parameterised statement and commit it immediately.

        Args:
            sql: A SQL string of command, using "?" placeholders
            msg_list: The list containing values of fields that need to be
                saved in sql table, one per placeholder
        """
        self.c.execute(sql, msg_list)
        self.conn.commit()

    def close(self):
        """
        Close the connection to the sqlite database.
        """
        self.conn.close()
class BaseListener(object):
    """
    Base class for feed listeners.

    Holds the plumbing shared by every feed (error logging, inserting a
    message as a table row, flattening nested dictionaries) so that a concrete
    listener only has to override ``on_message``.
    """

    def __init__(self, messager):
        """
        Args:
            messager: MessagerToSQL object
        """
        self.msger = messager

    def on_error(self, headers, message):
        """
        Log the headers and message passed to the error callback.
        """
        logger.error(f"ERROR: {headers} {message}")

    def on_message(self, headers, messages):
        """
        Message callback. Does nothing here; subclasses override it.
        """
        pass

    def _insert_message(self, msg: dict):
        """
        An internal function to insert the message to sql table.

        Values are picked from ``msg`` in schema order; a column that is
        missing from ``msg`` is stored as NULL (``dict.get`` returns None).

        Args:
            msg: Dictionary mapping column names to the values to store
        """
        columns = self.msger.schema.keys()
        placeholders = ", ".join("?" * len(columns))
        sql = f"INSERT INTO {self.msger.table_name} VALUES ({placeholders})"
        self.msger.insert(sql, [msg.get(col) for col in columns])

    def _flatten(self, d, parent_key='', sep='_'):
        """
        Flatten a nested dictionary, joining the keys of the different levels
        with ``sep`` (``_`` by default).

        Example: {"a": {"b": 1}, "c": 2} -> {"a_b": 1, "c": 2}

        Args:
            d: Dictionary to flatten
            parent_key: Key prefix accumulated from the enclosing levels
            sep: Separator placed between the keys of two levels

        Returns:
            A new single-level dictionary. Empty nested dictionaries
            contribute no entries.
        """
        flat = {}
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            # Explicit type check instead of try/bare-except: only real
            # dictionaries are descended into, and unrelated errors are no
            # longer silently swallowed.
            if isinstance(v, dict):
                flat.update(self._flatten(v, new_key, sep=sep))
            else:
                flat[new_key] = v
        return flat
class MVListener(BaseListener):
    """
    Make a Listener for Train Movement Feeds.
    """

    def __init__(self, messager, view=False):
        """
        Args:
            messager: MessagerToSQL object
            view: If True, will print all message instead of saving. Default is False
        """
        self.view = view
        super().__init__(messager)

    def on_message(self, headers, messages):
        """
        Handle one frame: log its headers, then print or store the ``body``
        of every message in the decoded JSON list.

        Args:
            headers: Frame headers, logged at info level
            messages: JSON text decoding to a list of objects that each have
                a ``body`` entry
        """
        logger.info(headers)
        for message in json.loads(messages):
            body = message['body']
            # Must read the instance attribute: a bare ``view`` is not defined
            # in this scope and raised NameError on every message.
            if self.view:
                print(body)
            else:
                self._insert_message(body)
class PPMListener(BaseListener):
    """
    Make a Listener for Public Performance Measures (PPM) Feeds.
    """

    def __init__(self, messager, view=False):
        """
        Args:
            messager: MessagerToSQL object
            view: If True, will print all message instead of saving. Default is False
        """
        self.view = view
        super().__init__(messager)

    def on_message(self, headers, messages):
        """
        Handle one frame: log its headers, then print or store the flattened
        ``Operator`` entry of every item under
        ``RTPPMDataMsgV1 -> RTPPMData -> OperatorPage``.

        Args:
            headers: Frame headers, logged at info level
            messages: JSON text of the PPM message
        """
        logger.info(headers)
        data = json.loads(messages)
        for message in data["RTPPMDataMsgV1"]['RTPPMData']['OperatorPage']:
            # The helper on the base class is ``_flatten``; ``self.flatten``
            # does not exist and raised AttributeError.
            operator = self._flatten(message['Operator'])
            if self.view:
                print(operator)
            else:
                self._insert_message(operator)
class VSTPListener(BaseListener):
    """
    Listener for the VSTP (Very Short Term Planning) feed.
    """

    def __init__(self, messager, view=False):
        """
        Args:
            messager: MessagerToSQL object
            view: When True, messages are printed rather than saved.
                Defaults to False
        """
        super().__init__(messager)
        self.view = view

    def on_message(self, headers, messages):
        """
        Log the headers, decode the JSON payload and print or store the
        ``schedule`` entry found under ``VSTPCIFMsgV1``.
        """
        logger.info(headers)
        payload = json.loads(messages)
        schedule = payload['VSTPCIFMsgV1']['schedule']
        if not self.view:
            self._insert_message(schedule)
            return
        print(schedule)
class TDListener(BaseListener):
    """
    Listener for the train describer (TD) feed.
    """

    def __init__(self, messager, view=False):
        """
        Args:
            messager: MessagerToSQL object
            view: When True, messages are printed rather than saved.
                Defaults to False
        """
        super().__init__(messager)
        self.view = view

    def on_message(self, headers, messages):
        """
        Log the headers, then for every key/value pair of every decoded
        message build one record: the pair's key is stored under "MSG" and
        the pair's value (a dictionary) supplies the remaining fields. Each
        record is printed or stored depending on ``view``.
        """
        logger.info(headers)
        frames = json.loads(messages)
        for frame in frames:
            for msg_type, fields in frame.items():
                # Fields are unpacked last, so a "MSG" key inside them wins.
                record = {"MSG": msg_type, **fields}
                if not self.view:
                    self._insert_message(record)
                    continue
                print(record)