diff --git a/src/app.py b/src/app.py index 95fb523..18eb3cc 100644 --- a/src/app.py +++ b/src/app.py @@ -68,11 +68,7 @@ qos: int Quality of service of MQTT. ''' - -import json - import can - import auth import stats_service import data_service @@ -81,7 +77,8 @@ import paho.mqtt.client as mqtt from multiprocessing import Process, Queue, Event from threading import Thread - +from mqtt_util import MQTTConf, gcb_init_publisher, gcb_connect, gcb_disconnect +from config_util import ConfFlags, read_conf, get_temp_interval, get_load_interval, get_fuel_level_limit from mqtt_utils import MQTTClient logging.config.fileConfig('logging.conf') @@ -127,26 +124,6 @@ fuel_alarm_topic = "alarms/fuel" -def read_conf(): - ''' - Reads app config file. - - Parameters - ---------- - Returns - ------- - conf: dict - Configuration data parsed from json config file. - ''' - try: - conf_file = open(conf_path) - conf = json.load(conf_file) - return conf - except BaseException: - errorLogger.critical("Cant read app configuration file - ", conf_path, " !") - return None - - def signup_periodically(key, username, password, time_pattern, url, interval): ''' Periodically requests device signup. @@ -180,7 +157,10 @@ def signup_periodically(key, username, password, time_pattern, url, interval): return jwt -def shutdown_controller(temp_handler_flag, load_handler_flag, fuel_handler_flag): +def shutdown_controller( + temp_handler_flag, + load_handler_flag, + fuel_handler_flag): ''' Handles user request for sensor shutdown. @@ -224,12 +204,16 @@ def on_connect_temp_handler(client, userdata, flags, rc, props): ------- ''' if rc == 0: - infoLogger.info("Temperature data handler successfully established connection with MQTT broker!") - customLogger.info("Temperature data handler successfully established connection with MQTT broker!") + infoLogger.info( + "Temperature data handler successfully established connection with MQTT broker!") + customLogger.info( + "Temperature data handler successfully established connection with MQTT broker!") client.subscribe(temp_topic, qos=qos) else: - errorLogger.error("Temperature data handler failed to establish connection with MQTT broker!") - customLogger.critical("Temperature data handler failed to establish connection with MQTT broker!") + errorLogger.error( + "Temperature data handler failed to establish connection with MQTT broker!") + customLogger.critical( + "Temperature data handler failed to establish connection with MQTT broker!") def on_connect_load_handler(client, userdata, flags, rc, props): @@ -248,11 +232,14 @@ def on_connect_load_handler(client, userdata, flags, rc, props): ------- ''' if rc == 0: - infoLogger.info("Arm load data handler successfully established connection with MQTT broker!") + infoLogger.info( + "Arm load data handler successfully established connection with MQTT broker!") client.subscribe(load_topic, qos=qos) else: - errorLogger.error("Arm load data handler failed to establish connection with MQTT broker!") - customLogger.critical("Arm load data handler failed to establish connection with MQTT broker!") + errorLogger.error( + "Arm load data handler failed to establish connection with MQTT broker!") + customLogger.critical( + "Arm load data handler failed to establish connection with MQTT broker!") def on_connect_fuel_handler(client, userdata, flags, rc, props): @@ -271,16 +258,19 @@ def on_connect_fuel_handler(client, userdata, flags, rc, props): ------- ''' if rc == 0: - infoLogger.info("Fuel data handler successfully established connection with MQTT broker!") + infoLogger.info( + "Fuel data handler successfully established connection with MQTT broker!") client.subscribe(fuel_topic, qos=qos) else: - errorLogger.error("Fuel data handler failed to establish connection with MQTT broker!") - customLogger.critical("Fuel data handler failed to establish connection with MQTT broker!") + errorLogger.error( + "Fuel data handler failed to establish connection with MQTT broker!") + customLogger.critical( + "Fuel data handler failed to establish connection with MQTT broker!") # iot data aggregation and forwarding to cloud -def collect_temperature_data(config, url, jwt, flag, stats_queue): +def collect_temperature_data(config, url, jwt, flag, conf_flag, stats_queue): ''' Temperature data handler logic. @@ -315,6 +305,15 @@ def collect_temperature_data(config, url, jwt, flag, stats_queue): new_data = [] old_data = [] + # [REST/MQTT] + interval = get_temp_interval(config) + gcb_conf = MQTTConf.from_app_config(config, "gateway_cloud_broker") + gcb_client = gcb_init_publisher( + "temp_data_publisher_mqtt", + gcb_conf.username, + gcb_conf.password) + gcb_connect(gcb_client, gcb_conf.address, gcb_conf.port) + customLogger.debug("TEMP PUBLISHER ESTABLISHED CONNECTION WITH BROKER.") # called when there is new message in temp_topic topic def on_message_handler(client, userdata, message): @@ -344,37 +343,49 @@ def on_message_handler(client, userdata, message): customLogger.info("Temperature of " + str(data_value) + " C is too high! Sounding the alarm!") client.publish(temp_alarm_topic, True, qos) - client = MQTTClient("temp-data-handler-mqtt-client", transport_protocol=transport_protocol, - protocol_version=mqtt.MQTTv5, - mqtt_username=config[mqtt_broker][user], - mqtt_pass=config[mqtt_broker][password], - broker_address=config[mqtt_broker][address], - broker_port=config[mqtt_broker][port], - keepalive=config[temp_settings][interval] * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="TEMP", - bus=None, - ) + client = MQTTClient( + "temp-data-handler-mqtt-client", + transport_protocol=transport_protocol, + protocol_version=mqtt.MQTTv5, + mqtt_username=config[mqtt_broker][user], + mqtt_pass=config[mqtt_broker][password], + broker_address=config[mqtt_broker][address], + broker_port=config[mqtt_broker][port], + keepalive=config[temp_settings][interval] * 3, + infoLogger=infoLogger, + errorLogger=errorLogger, + flag=flag, + sensor_type="TEMP", + bus=None, + ) # initializing stats object stats = stats_service.Stats() # initializing mqtt client for collecting sensor data from broker client.set_on_connect(on_connect_temp_handler) client.set_on_message(on_message_handler) client.connect() - # periodically processes collected data and forwards result to cloud services + # periodically processes collected data and forwards result to cloud + # services while not flag.is_set(): - # copy data from list that is populated with newly arrived data and clear that list + customLogger.debug(f"INTERVAL: {interval}") + # [REST/MQTT] + if conf_flag.is_set(): + interval = get_temp_interval(read_conf()) + conf_flag.clear() + + # copy data from list that is populated with newly arrived data and + # clear that list data = new_data.copy() new_data.clear() - # append data that is not sent in previous iterations due to connection problem + # append data that is not sent in previous iterations due to connection + # problem for i in old_data: data.append(i) old_data.clear() # send request to Cloud only if there is available data if len(data) > 0: - code = data_service.handle_temperature_data(data, url, jwt, config[time_format], client) + code = data_service.handle_temperature_data( + data, url, jwt, config[user], config[time_format], client, gcb_client) # if data is not sent to cloud, it is returned to queue if code != http_ok: @@ -386,15 +397,20 @@ def on_message_handler(client, userdata, message): customLogger.error("JWT has expired!") break else: - infoLogger.warning("There is no temperature sensor data to handle!") - time.sleep(config[temp_settings][interval]) + infoLogger.warning( + "There is no temperature sensor data to handle!") + time.sleep(interval) # shutting down temperature sensor stats_queue.put(stats) client.disconnect() + + # [REST/MQTT] + gcb_disconnect(gcb_client) + customLogger.debug("Temperature data handler shutdown!") -def collect_load_data(config, url, jwt, flag, stats_queue): +def collect_load_data(config, url, jwt, flag, conf_flag, stats_queue): ''' Load data handler logic. @@ -429,6 +445,17 @@ def collect_load_data(config, url, jwt, flag, stats_queue): ''' new_data = [] old_data = [] + + # [REST/MQTT] + interval = get_load_interval(config) + gcb_conf = MQTTConf.from_app_config(config, "gateway_cloud_broker") + gcb_client = gcb_init_publisher( + "load_data_publisher_mqtt", + gcb_conf.username, + gcb_conf.password) + gcb_connect(gcb_client, gcb_conf.address, gcb_conf.port) + customLogger.debug("LOAD PUBLISHER ESTABLISHED CONNECTION WITH BROKER.") + # called when there is new message in load_topic topic def on_message_handler(client, userdata, message): @@ -450,9 +477,10 @@ def on_message_handler(client, userdata, message): data = message.payload.decode("utf-8") new_data.append(str(data)) customLogger.info("Received load data: " + str(data)) - data_sum, unit = data_service.parse_incoming_data(str(data), "load") - time_value = time.strftime(time_format, - time.localtime()) # ASK this is the time from the gateway, not the sensor + data_sum, unit = data_service.parse_incoming_data( + str(data), "load") + # ASK this is the time from the gateway, not the sensor + time_value = time.strftime(time_format, time.localtime()) if data_sum > 1000: # sound the alarm! ask him what do I send #ASK customLogger.info("Load of " + str(data_sum) + " kg is too high! Sounding the alarm!") @@ -461,37 +489,49 @@ def on_message_handler(client, userdata, message): # initializing stats object stats = stats_service.Stats() # initializing mqtt client for collecting sensor data from broker - client = MQTTClient("load-data-handler-mqtt-client", transport_protocol=transport_protocol, - protocol_version=mqtt.MQTTv5, - mqtt_username=config[mqtt_broker][user], - mqtt_pass=config[mqtt_broker][password], - broker_address=config[mqtt_broker][address], - broker_port=config[mqtt_broker][port], - keepalive=config[temp_settings][interval] * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="LOAD", - bus=None, - ) + client = MQTTClient( + "load-data-handler-mqtt-client", + transport_protocol=transport_protocol, + protocol_version=mqtt.MQTTv5, + mqtt_username=config[mqtt_broker][user], + mqtt_pass=config[mqtt_broker][password], + broker_address=config[mqtt_broker][address], + broker_port=config[mqtt_broker][port], + keepalive=config[temp_settings][interval] * 3, + infoLogger=infoLogger, + errorLogger=errorLogger, + flag=flag, + sensor_type="LOAD", + bus=None, + ) # initializing stats object stats = stats_service.Stats() # initializing mqtt client for collecting sensor data from broker client.set_on_connect(on_connect_load_handler) client.set_on_message(on_message_handler) client.connect() - # periodically processes collected data and forwards result to cloud services + + # periodically processes collected data and forwards result to cloud + # services while not flag.is_set(): - # copy data from list that is populated with newly arrived data and clear that list + # [REST/MQTT] + if conf_flag.is_set(): + interval = get_load_interval(read_conf()) + conf_flag.clear() + + # copy data from list that is populated with newly arrived data and + # clear that list data = new_data.copy() new_data.clear() - # append data that is not sent in previous iterations due to connection problem + # append data that is not sent in previous iterations due to connection + # problem for i in old_data: data.append(i) old_data.clear() # send request to Cloud only if there is available data if len(data) > 0: - code = data_service.handle_load_data(data, url, jwt, config[time_format]) + code = data_service.handle_load_data( + data, url, jwt, config[user], config[time_format], gcb_client) # if data is not sent to cloud, it is returned to queue if code != http_ok: old_data = data.copy() @@ -503,15 +543,19 @@ def on_message_handler(client, userdata, message): break else: infoLogger.warning("There is no arm load sensor data to handle!") - time.sleep(config[load_settings][interval]) + time.sleep(interval) # shutting down load sensor stats_queue.put(stats) client.loop_stop() client.disconnect() + + # [REST/MQTT] + gcb_disconnect(gcb_client) + customLogger.debug("Arm load data handler shutdown!") -def collect_fuel_data(config, url, jwt, flag, stats_queue): +def collect_fuel_data(config, url, jwt, flag, conf_flag, stats_queue): ''' Fuel data handler logic. @@ -545,20 +589,35 @@ def collect_fuel_data(config, url, jwt, flag, stats_queue): ------- ''' # initializing stats object + + stats = stats_service.Stats() + + # [REST/MQTT] + limit = get_fuel_level_limit(config) + gcb_conf = MQTTConf.from_app_config(config, "gateway_cloud_broker") + gcb_client = gcb_init_publisher( + "fuel_data_publisher_mqtt", + gcb_conf.username, + gcb_conf.password) + gcb_connect(gcb_client, gcb_conf.address, gcb_conf.port) + customLogger.debug("FUEL PUBLISHER ESTABLISHED CONNECTION WITH BROKER.") + # called when there is new message in load_topic topic - client = MQTTClient("fuel-data-handler-mqtt-client", transport_protocol=transport_protocol, - protocol_version=mqtt.MQTTv5, - mqtt_username=config[mqtt_broker][user], - mqtt_pass=config[mqtt_broker][password], - broker_address=config[mqtt_broker][address], - broker_port=config[mqtt_broker][port], - keepalive=config[temp_settings][interval] * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="FUEL", - bus=None, - ) + client = MQTTClient( + "fuel-data-handler-mqtt-client", + transport_protocol=transport_protocol, + protocol_version=mqtt.MQTTv5, + mqtt_username=config[mqtt_broker][user], + mqtt_pass=config[mqtt_broker][password], + broker_address=config[mqtt_broker][address], + broker_port=config[mqtt_broker][port], + keepalive=config[temp_settings][interval] * 3, + infoLogger=infoLogger, + errorLogger=errorLogger, + flag=flag, + sensor_type="FUEL", + bus=None, + ) def on_message_handler(client, userdata, message): ''' @@ -577,14 +636,28 @@ def on_message_handler(client, userdata, message): ''' # making sure that flag is not set in meantime if not flag.is_set(): + # [REST/MQTT] + if conf_flag.is_set(): + nonlocal limit + limit = get_fuel_level_limit(read_conf()) + conf_flag.clear() + customLogger.info("Received fuel data: " + str(message.payload.decode("utf-8"))) - code = data_service.handle_fuel_data(str(message.payload.decode( - "utf-8")), config[fuel_settings][level_limit], url, jwt, config[time_format], client) + + code = data_service.handle_fuel_data(str(message.payload.decode("utf-8")), + config[fuel_settings][level_limit], + url, + jwt, + config[user], + config[time_format], + client, + gcb_client) if code == http_ok: stats.update_data(4, 4, 1) elif code == http_no_content: stats.update_data(4, 0, 0) - # jwt has expired - handler will be stopped, and started again after app restart + # jwt has expired - handler will be stopped, and started again + # after app restart elif code == http_unauthorized: customLogger.error("JWT has expired!") flag.set() @@ -597,12 +670,18 @@ def on_message_handler(client, userdata, message): client.set_on_message(on_message_handler) client.connect() - # must do like this to be able to stop thread acquired for incoming messages(on_message) after flag is set + # must do like this to be able to stop thread acquired for incoming + # messages(on_message) after flag is set + while not flag.is_set(): time.sleep(2) # shutting down temperature sensor stats_queue.put(stats) client.disconnect() + + # [REST/MQTT] + gcb_disconnect(gcb_client) + customLogger.debug("Fuel level data handler shutdown!") @@ -625,22 +704,47 @@ def main(): if config is not None: infoLogger.info("IoT Gateway app started!") customLogger.debug("IoT Gateway app started!") + + # [REST/MQTT] + conf_flags = ConfFlags() + conf_observer = start_config_observer(conf_flags) + # iot cloud platform login - jwt = auth.login(config[user], config[password], config[server_url] + "/auth/login") + jwt = auth.login(config[user], + config[password], + config[server_url] + "/auth/login") # if failed, periodically request signup if jwt is None: - customLogger.error("Login failed! Trying to sign up periodically!") - jwt = signup_periodically(config[api_key], config[user], config[password], - config[server_time_format], config[server_url] + "/auth/signup", + customLogger.error( + "Login failed! Trying to sign up periodically!") + jwt = signup_periodically(config[api_key], + config[user], + config[password], + config[server_time_format], + config[server_url] + "/auth/signup", config[auth_interval]) else: customLogger.debug("Login successful!") # now JWT required for Cloud platform auth is stored in jwt var customLogger.info("Received JWT: " + jwt) # starting stats collecting - # using shared memory Queue objects for returning stats data from processes + + # using shared memory Queue objects for returning stats data from + # processes customLogger.debug("Initializing devices stats data!") - stats = stats_service.OverallStats(config[server_url] + "/stats", jwt, config[time_format]) + + # [REST/MQTT] [Publisher client created and passed as new parameter i OverallStats] + gcb_conf = MQTTConf.from_app_config(config, "gateway_cloud_broker") + gcb_client = gcb_init_publisher("stats_data_publisher_mqtt", gcb_conf.username, gcb_conf.password) + gcb_connect(gcb_client, gcb_conf.address, gcb_conf.port) + customLogger.debug( + "STATS PUBLISHER ESTABLISHED CONNECTION WITH BROKER.") + + stats = stats_service.OverallStats(config[server_url] + "/stats", + jwt, + config[user], + config[time_format], + gcb_client) temp_stats_queue = Queue() load_stats_queue = Queue() fuel_stats_queue = Queue() @@ -660,6 +764,7 @@ def main(): config[server_url] + "/data/temp", jwt, temp_handler_flag, + conf_flags.temp_flag, temp_stats_queue)) temperature_data_handler.start() time.sleep(1) @@ -668,6 +773,7 @@ def main(): config[server_url] + "/data/load", jwt, load_handler_flag, + conf_flags.load_flag, load_stats_queue)) load_data_handler.start() time.sleep(1) @@ -676,7 +782,9 @@ def main(): config[server_url] + "/data/fuel", jwt, fuel_handler_flag, - fuel_stats_queue,)) + conf_flags.fuel_flag, + fuel_stats_queue, + )) fuel_data_handler.start() time.sleep(1) # waiting fow workers to stop @@ -686,12 +794,20 @@ def main(): fuel_data_handler.join() customLogger.debug("Workers stopped!") + # [REST/MQTT] + conf_observer.join() + gcb_disconnect(gcb_client) + # finalizing stats - stats.combine_stats(temp_stats_queue.get(), load_stats_queue.get(), fuel_stats_queue.get()) + stats.combine_stats( + temp_stats_queue.get(), + load_stats_queue.get(), + fuel_stats_queue.get()) customLogger.debug("Sending device stats data!") stats.send_stats() # checking jwt, if jwt has expired app will restart - jwt_code = auth.check_jwt(jwt, config[server_url] + "/auth/jwt-check") + jwt_code = auth.check_jwt( + jwt, config[server_url] + "/auth/jwt-check") if jwt_code == http_ok: reset = False infoLogger.info("IoT Gateway app shutdown!")