From a65f667d5e416053722582a42fbb5f62eab4ab11 Mon Sep 17 00:00:00 2001 From: Novica Tepic Date: Wed, 11 Sep 2024 07:47:00 +0200 Subject: [PATCH 1/7] Deleted unnecessary code from app module --- src/app.py | 521 +++-------------------------------------------------- 1 file changed, 26 insertions(+), 495 deletions(-) diff --git a/src/app.py b/src/app.py index 73c52dd..6ddc494 100644 --- a/src/app.py +++ b/src/app.py @@ -8,19 +8,10 @@ --------- signup_periodically(key, username, password, time_pattern, url, interval) Periodically initiates device signup on cloud services. -on_connect_temp_handler(client, userdata, flags, rc,props) - Logic executed after successfully connecting temperature sensor to MQTT broker. -on_connect_load_handler(client, userdata, flags, rc,props) - Logic executed after successfully connecting load sensor to MQTT broker. -on_connect_fuel_handler(client, userdata, flags, rc,props) - Logic executed after successfully connecting fuel sensor to MQTT broker. -collect_temperature_data(interval, url, jwt, time_pattern, mqtt_address, -mqtt_port, mqtt_user,mqtt_pass, flag, stats_queue) - Collects temperature data and periodically initiates data processing and forwarding. -collect_load_data(interval, url, jwt, time_pattern, mqtt_address, mqtt_port, mqtt_user,mqtt_pass,flag, stats_queue) - Collects load data and periodically initiates data processing and forwarding. -collect_fuel_data(limit, url, jwt, time_pattern, mqtt_address, mqtt_port, mqtt_user,mqtt_pass, flag, stats_queue) - Collects temperature data and initiates data filtering and forwarding. +on_connect_protocol_data_handler(client, userdata, flags, return_code,props) + Logic executed after successfully connecting protocol data sensor to MQTT broker. +collect_protocol_data(config, flag, gcb_queue) + Collects protocol data and periodically initiates data processing and forwarding. main() Iot gateway app entrypoint. @@ -50,12 +41,6 @@ Cloud platform API key. TRANSPORT_PROTOCOL: str Transport protocol for MQTT. -TEMP_TOPIC: str - MQTT topic for temperature data. -LOAD_TOPIC: str - MQTT topic for load data. -FUEL_TOPIC: str - MQTT topic for fuel data. HTTP_UNAUTHORIZED: int TEMP_ALARM_TOPIC: str MQTT alarm topic for temperature alarms @@ -77,7 +62,6 @@ """ import auth -import stats_service import data_service import time import logging.config @@ -86,11 +70,10 @@ import re import signal from threading import Thread, Event -from queue import Queue from mqtt_util import MQTTConf, GcbService, \ - GCB_TEMP_TOPIC, GCB_LOAD_TOPIC, GCB_FUEL_TOPIC, GCB_STATS_TOPIC, GCB_PROTOCOL_TOPIC + GCB_PROTOCOL_TOPIC from src.can_protocol import start_protocol_mqtt, processed_ids, get_data_by_id -from config_util import ConfFlags, get_temp_interval, get_fuel_level_limit, \ +from config_util import ConfFlags, \ start_config_observer from mqtt_utils import MQTTClient from config_util import Config @@ -115,21 +98,12 @@ CHANNEL = "channel" BITRATE = "bitrate" -TEMP_SETTINGS = "temp_settings" -LOAD_SETTINGS = "load_settings" -FUEL_SETTINGS = "fuel_settings" -INTERVAL = "interval" -LEVEL_LIMIT = "level_limit" - API_KEY = "api_key" MQTT_BROKER = "mqtt_broker" MQTT_BROKER_LOCAL = "mqtt_broker_local" ADDRESS = "address" PORT = "port" TRANSPORT_PROTOCOL = "tcp" -TEMP_TOPIC = "sensors/temperature" -LOAD_TOPIC = "sensors/arm-load" -FUEL_TOPIC = "sensors/fuel-level" PROTOCOL_TOPIC = "sensors/protocol" HTTP_UNAUTHORIZED = 401 HTTP_OK = 200 @@ -176,81 +150,6 @@ def signup_periodically(key, username, password, time_pattern, url, interval): return jwt -def on_connect_temp_handler(client, userdata, flags, rc, props): - """ - Logic executed after successfully connecting temperature sensor to MQTT broker. - - Parameters - ---------- - client: mqtt.client - userdata: object - flags: - rc: int - props: - """ - if rc == 0: - infoLogger.info( - "Temperature data handler successfully established connection with MQTT broker!") - customLogger.info( - "Temperature data handler successfully established connection with MQTT broker!") - client.subscribe(TEMP_TOPIC, qos=QOS) - else: - errorLogger.error( - "Temperature data handler failed to establish connection with MQTT broker!") - customLogger.critical( - "Temperature data handler failed to establish connection with MQTT broker!") - - -def on_connect_load_handler(client, userdata, flags, rc, props): - """ - Logic executed after successfully connecting arm load sensor to MQTT broker. - - Parameters - ---------- - client: mqtt.client - userdata: object - flags: - rc: int - props: - """ - if rc == 0: - infoLogger.info( - "Arm load data handler successfully established connection with MQTT broker!") - customLogger.info( - "Arm load data handler successfully established connection with MQTT broker!") - client.subscribe(LOAD_TOPIC, qos=QOS) - else: - errorLogger.error( - "Arm load data handler failed to establish connection with MQTT broker!") - customLogger.critical( - "Arm load data handler failed to establish connection with MQTT broker!") - - -def on_connect_fuel_handler(client, userdata, flags, rc, props): - """ - Logic executed after successfully connecting fuel level sensor to MQTT broker. - - Parameters - ---------- - client: mqtt.client - userdata: object - flags: - rc: int - props: - """ - if rc == 0: - infoLogger.info( - "Fuel data handler successfully established connection with MQTT broker!") - customLogger.info( - "Fuel data handler successfully established connection with MQTT broker!") - client.subscribe(FUEL_TOPIC, qos=QOS) - else: - errorLogger.error( - "Fuel data handler failed to establish connection with MQTT broker!") - customLogger.critical( - "Fuel data handler failed to establish connection with MQTT broker!") - - def on_connect_protocol_data_handler(client, userdata, flags, return_code, props): """ Logic executed after successfully connecting protocol data sensor to MQTT broker. @@ -277,8 +176,6 @@ def on_connect_protocol_data_handler(client, userdata, flags, return_code, props # iot data aggregation and forwarding to cloud - - def collect_protocol_data(config, flag, gcb_queue): """ Protocol data handler logic. @@ -333,7 +230,8 @@ def on_message_handler(client, userdata, message): # Dictionary key is supposed to be ProtocolDataEntity id # Value for each key is thread started and information whether the thread is active protocol_data[protocol_data_entity.id] = [] - thread = Thread(target=parse_protocol_data, args=(config, flag, protocol_data_entity, gcb_queue)) + thread = Thread(target=parse_protocol_data, args=(config, flag, protocol_data_entity, + gcb_queue, sensors_broker_client)) processed_ids[protocol_data_entity.id] = {"thread": thread, "stopped": False} thread.start() # Append new data so that protocol data thread can work with it @@ -353,7 +251,7 @@ def cleanup(): sensors_broker_client.connect() -def parse_protocol_data(config, flag, protocol_data_entity, gcb_queue): +def parse_protocol_data(config, flag, protocol_data_entity, gcb_queue, client): """ Thread function to aggregate protocol data and send it to cloud. @@ -386,8 +284,21 @@ def parse_protocol_data(config, flag, protocol_data_entity, gcb_queue): if protocol_data_entity.mode == "OUTPUT": # Send data to cloud GcbService.push_message(gcb_queue, GCB_PROTOCOL_TOPIC, payload) + if ("engine" in protocol_data_entity.name.lower() and + "temperature" in protocol_data_entity.name.lower()) and payload["value"] > 95: + customLogger.info( + "Temperature of " + str(payload["value"]) + + " C is too high! Sounding the alarm!") + client.publish(TEMP_ALARM_TOPIC, True, QOS) + if ("fuel" in protocol_data_entity.name.lower() and + "level" in protocol_data_entity.name.lower()) and payload["value"] < 10: + customLogger.info( + "Fuel level of " + str(payload["value"]) + " l is too low! Sounding the alarm!") + client.publish(FUEL_ALARM_TOPIC, True, QOS) + if "load" in protocol_data_entity.name.lower() and payload["value"] > 1000: + customLogger.info("Load of " + str(payload["value"]) + " kg is too high! Sounding the alarm!") + client.publish(LOAD_ALARM_TOPIC, True, QOS) customLogger.info("PROTOCOL DATA PUBLISHED TO CLOUD") - customLogger.info("PROTOCOL DATA PUBLISHED TO CLOUD") else: infoLogger.warning("There is no sensor data to handle!") time.sleep(interval) @@ -397,325 +308,6 @@ def parse_protocol_data(config, flag, protocol_data_entity, gcb_queue): customLogger.debug("Protocol data with name " + protocol_data_entity.name + " stopped!") -def collect_temperature_data(config, flag, conf_flag, stats_queue, gcb_queue): - """ - Temperature data handler logic. - - Establishes connection with MQTT broker. Listens for incoming messages. Handles received temperature messages and - periodically initiates data processing. - - Parameters - ---------- - config: Config - Configuration object - flag: multithreading.Event - Object used for stopping temperature sensor process. - conf_flag: multithreading.Event - Object used for signalling configuration changes - stats_queue: multithreading.Queue - Stats data wrapper. - gcb_queue: queue.Queue - Belongs to some GcbService instance and is used to queue payload that is to - be sent via mqtt. - """ - new_data = [] - old_data = [] - - interval = get_temp_interval(config) - - sensors_broker_client = MQTTClient( - "temp-data-handler-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=config.mqtt_broker_username, - mqtt_pass=config.mqtt_broker_password, - broker_address=config.mqtt_broker_address, - broker_port=config.mqtt_broker_port, - keepalive=config.temp_settings_interval * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="TEMP", - ) - - # called when there is new message in temp_topic topic - def on_message_handler(client, userdata, message): - """ - Handle received mqtt message. - - After receiving mqtt message, locally stores temperature data. - - Parameters - ---------- - client: mqtt.client - userdata: object - message: object - - Returns - ------- - None - """ - if not flag.is_set(): - data = message.payload.decode("utf-8") - data_value, unit = data_service.parse_incoming_data( - str(data), "temperature") - if data_value == 0.0: - return - new_data.append(str(data)) - customLogger.info("Received temperature data: " + str(data)) - if data_value > 95: - # sound the alarm! ask him what do I send - customLogger.info( - "Temperature of " + str(data_value) + " C is too high! Sounding the alarm!") - client.publish(TEMP_ALARM_TOPIC, True, QOS) - - # initializing stats object - stats = stats_service.Stats() - # initializing mqtt client for collecting sensor data from broker - sensors_broker_client.set_on_connect(on_connect_temp_handler) - sensors_broker_client.set_on_message(on_message_handler) - sensors_broker_client.connect() - # periodically processes collected data and forwards result to cloud - # services - while not flag.is_set(): - customLogger.debug(f"INTERVAL: {interval}") - - if conf_flag.is_set(): - interval = config.temp_settings_interval - conf_flag.clear() - - # copy data from list that is populated with newly arrived data and - # clear that list - data = new_data.copy() - new_data.clear() - # append data that is not sent in previous iterations due to connection - # problem - for i in old_data: - data.append(i) - old_data.clear() - # send payload to Cloud only if there is available data - if len(data) > 0: - payload = data_service.handle_temperature_data(data, config.time_format) - if payload != EMPTY_PAYLOAD: - GcbService.push_message(gcb_queue, GCB_TEMP_TOPIC, payload) - stats.update_data(len(data) * 4, 4, 1) - customLogger.info("TEMP PUBLISHED") - else: - infoLogger.warning( - "There is no temperature sensor data to handle!") - time.sleep(config.temp_settings_interval) - - # shutting down temperature sensor - stats_queue.put(stats) - sensors_broker_client.disconnect() - - customLogger.debug("Temperature data handler shutdown!") - - -def collect_load_data(config, flag, conf_flag, stats_queue, gcb_queue): - """ - Load data handler logic. - - Establishes connection with MQTT broker. Listens for incoming messages. Handles received load messages and - periodically initiates data processing. - - Parameters - ---------- - config: Config - Configuration object - flag: multithreading.Event - Object used for stopping temperature sensor process. - conf_flag: multithreading.Event - Object used for signalling configuration changes - stats_queue: multithreading.Queue - Stats data wrapper. - gcb_queue: queue.Queue - Belongs to some GcbService instance and is used to queue payload that is to - be sent via mqtt. - """ - new_data = [] - old_data = [] - - # called when there is new message in load_topic topic - # initializing mqtt client for collecting sensor data from broker - - sensors_broker_client = MQTTClient( - "load-data-handler-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=config.mqtt_broker_username, - mqtt_pass=config.mqtt_broker_password, - broker_address=config.mqtt_broker_address, - broker_port=config.mqtt_broker_port, - keepalive=config.load_settings_interval * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="LOAD", - ) - - def on_message_handler(client, userdata, message): - """ - Handle received mqtt message. - - After receiving mqtt message, locally stores load data. - - Parameters - ---------- - client: mqtt.client - userdata: object - message: object - """ - if not flag.is_set(): - data = message.payload.decode("utf-8") - data_sum, unit = data_service.parse_incoming_data( - str(data), "load") - if data_sum == 0.0: - return - new_data.append(str(data)) - customLogger.info("Received load data: " + str(data)) - if data_sum > 1000: - # sound the alarm! - customLogger.info("Load of " + str(data_sum) + " kg is too high! Sounding the alarm!") - client.publish(LOAD_ALARM_TOPIC, True, QOS) - - # initializing stats object - stats = stats_service.Stats() - # initializing mqtt client for collecting sensor data from broker - sensors_broker_client.set_on_connect(on_connect_load_handler) - sensors_broker_client.set_on_message(on_message_handler) - sensors_broker_client.connect() - - # periodically processes collected data and forwards result to cloud - # services - sleep_period = config.load_settings_interval - - while not flag.is_set(): - if conf_flag.is_set(): - conf_flag.clear() - - # copy data from list that is populated with newly arrived data and - # clear that list - data = new_data.copy() - new_data.clear() - # append data that is not sent in previous iterations due to connection - # problem - for i in old_data: - data.append(i) - old_data.clear() - # send request to Cloud only if there is available data - if len(data) > 0: - payload = data_service.handle_load_data(data, config.time_format) - if payload != EMPTY_PAYLOAD: - GcbService.push_message(gcb_queue, GCB_LOAD_TOPIC, payload) - stats.update_data(len(data) * 4, 4, 1) - customLogger.info("LOAD PUBLISHED") - else: - infoLogger.warning("There is no arm load sensor data to handle!") - time.sleep(sleep_period) - - # shutting down load sensor - stats_queue.put(stats) - sensors_broker_client.disconnect() - - customLogger.debug("Arm load data handler shutdown!") - - -def collect_fuel_data(config, flag, conf_flag, stats_queue, gcb_queue): - """ - Fuel data handler logic. - - Establishes connection with MQTT broker. Listens for incoming messages. Handles received fuel messages and - initiates data filtering and forwarding. - - Parameters - ---------- - config: Config - Configuration object - flag: multithreading.Event - Object used for stopping temperature sensor process. - conf_flag: multithreading.Event - Object used for signalling configuration changes - stats_queue: multithreading.Queue - Stats data wrapper. - gcb_queue: queue.Queue - Belongs to some GcbService instance and is used to queue payload that is to - be sent via mqtt. - """ - # initializing stats object - - stats = stats_service.Stats() - limit = get_fuel_level_limit(config) - - # called when there is new message in load_topic topic - - sensors_broker_client = MQTTClient( - "fuel-data-handler-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=config.mqtt_broker_username, - mqtt_pass=config.mqtt_broker_password, - broker_address=config.mqtt_broker_address, - broker_port=config.mqtt_broker_port, - keepalive=config.fuel_settings_interval, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="FUEL", - ) - - def on_message_handler(client, userdata, message): - """ - Handle received mqtt message. - - After receiving mqtt message, initiates fuel data processing. - - Parameters - ---------- - client: mqtt.client - userdata: object - message: object - """ - # making sure that flag is not set in the meantime - if not flag.is_set(): - if conf_flag.is_set(): - nonlocal limit - limit = config.fuel_settings_level_limit - conf_flag.clear() - - customLogger.info("Received fuel data: " + str(message.payload.decode("utf-8"))) - payload = data_service.handle_fuel_data( - str(message.payload.decode("utf-8")), - config.fuel_settings_level_limit, - config.time_format, - sensors_broker_client) - - if payload != EMPTY_PAYLOAD: - GcbService.push_message(gcb_queue, GCB_FUEL_TOPIC, payload) - stats.update_data(4, 4, 1) - customLogger.info("FUEL PUBLISHED") - else: - stats.update_data(4, 0, 0) - - # initializing stats object - stats = stats_service.Stats() - # initializing mqtt client for collecting sensor data from broker - sensors_broker_client.set_on_connect(on_connect_fuel_handler) - sensors_broker_client.set_on_message(on_message_handler) - sensors_broker_client.connect() - - # must do like this to be able to stop thread acquired for incoming - # messages(on_message) after flag is set - - while not flag.is_set(): - time.sleep(config.fuel_settings_interval) - # shutting down temperature sensor - stats_queue.put(stats) - sensors_broker_client.disconnect() - - customLogger.debug("Fuel level data handler shutdown!") - - def main(): """Start IoT gateway app entrypoint.""" # used for restarting device due to jwt expiration @@ -765,59 +357,16 @@ def main(): # processes customLogger.debug("Initializing devices stats data!") - stats = stats_service.OverallStats(config.time_format) - temp_stats_queue = Queue() - load_stats_queue = Queue() - fuel_stats_queue = Queue() - # flags are used for stopping data handlers on app shutdown - temp_handler_flag = Event() - load_handler_flag = Event() - fuel_handler_flag = Event() protocol_handler_flag = Event() BetterSignalHandler([signal.SIGINT, signal.SIGTERM], - [temp_handler_flag, - load_handler_flag, - fuel_handler_flag, - protocol_handler_flag, + [protocol_handler_flag, main_execution_flag]) customLogger.debug("Starting workers!") - # creates and starts data handling workers + # creates and starts protocol data handling worker - temperature_data_handler = Thread( - target=collect_temperature_data, - args=( - config, - temp_handler_flag, - conf_flags.temp_flag, - temp_stats_queue, - gcb_service.queue - )) - temperature_data_handler.start() - time.sleep(1) - load_data_handler = Thread( - target=collect_load_data, - args=( - config, - load_handler_flag, - conf_flags.load_flag, - load_stats_queue, - gcb_service.queue - )) - load_data_handler.start() - time.sleep(1) - fuel_data_handler = Thread( - target=collect_fuel_data, - args=( - config, - fuel_handler_flag, - conf_flags.fuel_flag, - fuel_stats_queue, - gcb_service.queue - )) - fuel_data_handler.start() protocol_data_handler = Thread( target=collect_protocol_data, args=( @@ -832,10 +381,7 @@ def main(): # Protocol MQTT module is reponsible for gateway-cloud protocol communication start_protocol_mqtt(main_execution_flag) - # waiting fow workers to stop - temperature_data_handler.join() - load_data_handler.join() - fuel_data_handler.join() + # waiting fow worker to stop protocol_data_handler.join() customLogger.debug("Workers stopped!") @@ -843,21 +389,6 @@ def main(): conf_observer.join() gcb_service.stop() - - # finalizing stats - - stats_payload = stats.combine_stats( - temp_stats_queue.get(), - load_stats_queue.get(), - fuel_stats_queue.get() - ) - - customLogger.debug("Sending device stats data!") - - if stats_payload != EMPTY_PAYLOAD: - GcbService.push_message(gcb_service.queue, GCB_STATS_TOPIC, stats_payload) - print("STATS PUBLISHED: " + str(stats_payload)) - else: customLogger.critical("Cant read config file! Aborting...") From 507f3f4a5f4fb2f2f431addaf54700296940cce8 Mon Sep 17 00:00:00 2001 From: Novica Tepic Date: Wed, 11 Sep 2024 07:54:18 +0200 Subject: [PATCH 2/7] Tried to correct line break after binary operator error --- src/app.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/app.py b/src/app.py index 6ddc494..68b5c7a 100644 --- a/src/app.py +++ b/src/app.py @@ -284,14 +284,13 @@ def parse_protocol_data(config, flag, protocol_data_entity, gcb_queue, client): if protocol_data_entity.mode == "OUTPUT": # Send data to cloud GcbService.push_message(gcb_queue, GCB_PROTOCOL_TOPIC, payload) - if ("engine" in protocol_data_entity.name.lower() and - "temperature" in protocol_data_entity.name.lower()) and payload["value"] > 95: - customLogger.info( - "Temperature of " + str(payload["value"]) - + " C is too high! Sounding the alarm!") + if ("engine" in protocol_data_entity.name.lower() + and "temperature" in protocol_data_entity.name.lower()) and payload["value"] > 95: + customLogger.info("Temperature of " + str(payload["value"]) + + " C is too high! Sounding the alarm!") client.publish(TEMP_ALARM_TOPIC, True, QOS) - if ("fuel" in protocol_data_entity.name.lower() and - "level" in protocol_data_entity.name.lower()) and payload["value"] < 10: + if ("fuel" in protocol_data_entity.name.lower() + and "level" in protocol_data_entity.name.lower()) and payload["value"] < 10: customLogger.info( "Fuel level of " + str(payload["value"]) + " l is too low! Sounding the alarm!") client.publish(FUEL_ALARM_TOPIC, True, QOS) From ac39d70e27676719f212e456ba68f4373887af9b Mon Sep 17 00:00:00 2001 From: Novica Tepic Date: Wed, 11 Sep 2024 08:04:43 +0200 Subject: [PATCH 3/7] Fix line break before binary operator error --- src/app.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/app.py b/src/app.py index 68b5c7a..b8e9837 100644 --- a/src/app.py +++ b/src/app.py @@ -284,13 +284,11 @@ def parse_protocol_data(config, flag, protocol_data_entity, gcb_queue, client): if protocol_data_entity.mode == "OUTPUT": # Send data to cloud GcbService.push_message(gcb_queue, GCB_PROTOCOL_TOPIC, payload) - if ("engine" in protocol_data_entity.name.lower() - and "temperature" in protocol_data_entity.name.lower()) and payload["value"] > 95: - customLogger.info("Temperature of " + str(payload["value"]) - + " C is too high! Sounding the alarm!") + if ("engine temperature" in protocol_data_entity.name.lower()) and payload["value"] > 95: + customLogger.info( + "Temperature of " + str(payload["value"]) + " C is too high! Sounding the alarm!") client.publish(TEMP_ALARM_TOPIC, True, QOS) - if ("fuel" in protocol_data_entity.name.lower() - and "level" in protocol_data_entity.name.lower()) and payload["value"] < 10: + if ("fuel level" in protocol_data_entity.name.lower()) and payload["value"] < 10: customLogger.info( "Fuel level of " + str(payload["value"]) + " l is too low! Sounding the alarm!") client.publish(FUEL_ALARM_TOPIC, True, QOS) From fb0bf759c624df88e08d6d5adf28c94cc7db4da1 Mon Sep 17 00:00:00 2001 From: Novica Tepic Date: Wed, 11 Sep 2024 08:08:53 +0200 Subject: [PATCH 4/7] Data service code cleanup and commented out data service tests for now --- src/data_service.py | 154 +-------------- tests/test_data_service.py | 389 ++++++++++++++++++------------------- 2 files changed, 196 insertions(+), 347 deletions(-) diff --git a/src/data_service.py b/src/data_service.py index 0c87ce6..065a28c 100644 --- a/src/data_service.py +++ b/src/data_service.py @@ -6,28 +6,11 @@ Functions --------- -handle_temperature_data(data, url, jwt, time_format) - Summarizing collected temperature data and forwarding result to cloud service. -handle_load_data(data, url, jwt, time_format) - Summarizing load temperature data and forwarding result to cloud service. -handle_fuel_data(data, limit, url, jwt, time_format) - Filtering collected temperature data and forwarding result to cloud service. -parse_incoming_data(data, type) - Parsing all types of data that come from sources +handle_protocol_data(protocol_data_entity, data, time_format) + Summarizing collected protocol data and forwarding result to cloud service. Constants --------- -DATA_PATTERN - Request body data pattern. - -QOS - Quality of Service of MQTT broker. -TEMP_ALARM_TOPIC: str - MQTT alarm topic for temperature alarms. -LOAD_ALARM_TOPIC: str - MQTT alarm topic for load alarms. -FUEL_ALARM_TOPIC: str - MQTT alarm topic for fuel alarms. EMPTY_PAYLOAD: dict Empty dictionary that is returned if there is some kind of error in data processing. @@ -40,49 +23,9 @@ errorLogger = logging.getLogger('customErrorLogger') customLogger = logging.getLogger('customConsoleLogger') -DATA_PATTERN = "[ value={} , time={} , unit={} ]" - -QOS = 2 -TEMP_ALARM_TOPIC = "alarms/temperature" -LOAD_ALARM_TOPIC = "alarms/load" -FUEL_ALARM_TOPIC = "alarms/fuel" - EMPTY_PAYLOAD = {} -def parse_incoming_data(data, data_type): - """ - Parsing all types of data that come from sources - - Args: - ---- - data: str - Data to be parsed - data_type: str - Data type (Temperature, Load, Fuel) for console output - - Returns: - ------- - data_sum: double - Parsed data value - unit: str - Unit of the parsed data - """ - data_sum = 0.0 - # summarizing collected data - try: - tokens = data.split(" ") - data_sum += float(tokens[1].split("=")[1]) - except BaseException: - errorLogger.error("Invalid " + data_type + " data format! - " + data) - unit = "unknown" - try: - unit = data.split(" ")[6].split("=")[1] - except BaseException: - errorLogger.error("Invalid " + data_type + " data format! - " + data) - return data_sum, unit - - def handle_protocol_data(protocol_data_entity, data, time_format): """ Processes protocol data and generates a summarized payload based on the aggregation method. @@ -134,96 +77,3 @@ def handle_protocol_data(protocol_data_entity, data, time_format): case 'SUM': return {"dataId": protocol_data_entity.id, "value": data_sum, "time": time_value} case 'MIN': return {"dataId": protocol_data_entity.id, "value": min_value, "time": time_value} case 'MAX': return {"dataId": protocol_data_entity.id, "value": max_value, "time": time_value} - - -def handle_temperature_data(data, time_format): - """ - Summarizes collected temperature data and forms payload. - - Triggered periodically. - - Parameters - ---------- - data: list - Collected temperature data. - time_format: str - Cloud services' time format. - - Returns - ------- - payload: dict - """ - data_sum = 0.0 - unit = "Unknown" - for info in data: - data_value, parsed_unit = parse_incoming_data(info, "temperature") - unit = parsed_unit - data_sum += data_value - - time_value = time.strftime(time_format, time.localtime()) - payload = {"value": round(data_sum / len(data), 2), "time": time_value, "unit": unit} - return payload - - -def handle_load_data(data, time_format): - """ - Summarizes collected load data and forms payload. - - Triggered periodically (variable interval). - - Parameters - ---------- - data: list - Collected load data. - time_format: str - Cloud services' time format. - - Returns - ------- - payload: dict - """ - data_sum = 0.0 - unit = "Unknown" - for info in data: - data_value, parsed_unit = parse_incoming_data(info, "load") - unit = parsed_unit - data_sum += data_value - - time_value = time.strftime(time_format, time.localtime()) - payload = {"value": round(data_sum, 2), "time": time_value, "unit": unit} - return payload - - -def handle_fuel_data(data, limit, time_format, alarm_client): - """ - - Summarizes collected fuel, forms payload and sends alarm. - - Triggered periodically. - - Parameters - ---------- - data: list - Collected load data. - limit: double - Critical fuel level. - time_format: str - Cloud services' time format. - alarm_client: MQTTClient - MQTT broker alarm client - - Returns - ------- - http status code - """ - value, unit = parse_incoming_data(str(data), "fuel") - if value == 0.0: - return EMPTY_PAYLOAD - if value <= limit: - customLogger.info("Fuel is below the designated limit! Sounding the alarm") - alarm_client.publish(FUEL_ALARM_TOPIC, True, QOS) - - time_value = time.strftime(time_format, time.localtime()) - - payload = {"value": round(value, 2), "time": time_value, "unit": unit} - return payload diff --git a/tests/test_data_service.py b/tests/test_data_service.py index 32b2a17..7935364 100644 --- a/tests/test_data_service.py +++ b/tests/test_data_service.py @@ -1,195 +1,194 @@ -import time -import unittest -import pytest - -from src.data_service import parse_incoming_data, \ - handle_temperature_data, handle_load_data, handle_fuel_data - - -class TestDataService(object): - TC = unittest.TestCase() - - def test_parse_incoming_data_correct(self): - val, typ = parse_incoming_data("nesto shemso=3 indeks_dva bla bla bla halid=invalid", "datatype") - assert (val == 3.0) - assert (typ == "invalid") - - # TODO: ne raditi ovako nego sa @pytest.mark.parametrize i/ili @fixture - def test_parse_incoming_data_wrong(self): - # TODO: ovaj zakomentarisani, ne javlja gresku, odnosno ako nesto nije broj dobijemo 0?!?! - # TODO: poruke su iste u oba slucaja, treba ih razlikovati - # with self.assertLogs('customErrorLogger', level='ERROR') as cm: - # _ = parse_incoming_data("nesto shemso=nije_broj indeks_dva bla bla bla halid=invalid", "datatype") - # self.assertEqual(cm.output, ['ERROR:customErrorLogger:Invalid datatype data format! - nesto shemso indeks_dva bla bla bla halid=invalid']) - with self.TC.assertLogs('customErrorLogger', level='ERROR') as cm: - _ = parse_incoming_data("nesto shemso=3 indeks_dva bla bla bla halit-ddd", "datatype") - self.TC.assertEqual(cm.output, [ - 'ERROR:customErrorLogger:Invalid datatype data format! - nesto shemso=3 indeks_dva bla bla bla halit-ddd']) - with self.TC.assertLogs('customErrorLogger', level='ERROR') as cm: - _ = parse_incoming_data("nesto shemso=3", "datatype") - self.TC.assertEqual(cm.output, ['ERROR:customErrorLogger:Invalid datatype data format! - nesto shemso=3']) - - @pytest.mark.parametrize('data', [ - [ - '[ value=-2.0 , time=15.04.2024 14:01:06 , unit=C ]', - '[ value=-2.0 , time=15.04.2024 14:01:17 , unit=C ]' - ], - [ - '[ value=-1.0 , time=15.04.2024 14:01:06 , unit=C ]', - '[ value=-3.4 , time=15.04.2024 14:01:17 , unit=C ]', - '[ value=10 , time=15.04.2024 14:01:17 , unit=C ]' - ], - ]) - def test_handle_temperature_data_correct(self, data): - value = 0 - for temp in data: - value += float(temp.split(',')[0].split('=')[1]) - - payload = handle_temperature_data(data, '%d.%m.%Y %H:%M:%S') - self.TC.assertEqual(payload["value"], round(value / len(data), 2)) - self.TC.assertEqual(payload["unit"], 'C') - - @pytest.mark.parametrize('data', [ - [ - '[ value=-aasd , time=15.04.2024 14:01:06 , unit=C ]', - '[ value=[] , time=15.04.2024 14:01:17 , unit=C ]', - '[ value=123 , time=15.04.2024 14:01:17 , unit=C ]', - ] - ]) - def test_handle_temperature_data_wrong_value(self, data): - value = 0 - for temp in data: - try: - value += float(temp.split(',')[0].split('=')[1]) - except: - value += 0 - - payload = handle_temperature_data(data, '%d.%m.%Y %H:%M:%S') - self.TC.assertEqual(payload["value"], round(value / len(data), 2)) - - @pytest.mark.parametrize('data', [ - [ - '[ value=-aasd , time=15.04.2024 14:01:06 , unit=C ]', - '[ value=[] , time=15.04.2024 14:01:17 , unit=lkjas ]', - '[ value=123 , time=15.04.2024 14:01:17 , unit=[] ]', - ] - ]) - def test_handle_temperature_data_wrong_unit(self, data): - # NOTE(stekap): - # If turns out that the unit that is assigned to payload is always the last unit, - # including the possibility of it being 'unknown'. - # Not sure why it works like this, but it doesn't make much sense. - unit = "unknown" - for temp in data: - unit = temp.split(',')[2].split('=')[1].split(' ')[0] - - payload = handle_temperature_data(data, '%d.%m.%Y %H:%M:%S') - self.TC.assertEqual(payload["unit"], unit) - - @pytest.mark.parametrize('time_format', [ - "%d.%m.%Y %f %a %qq %g %l %H:%M:%S", - "asdffb -. asdf" - ]) - def dont_test_handle_temperature_data_wrong_time_format(self, time_format): - data = [ - '[ value=-2.0 , time=15.04.2024 14:01:06 , unit=C ]', - '[ value=-2.0 , time=15.04.2024 14:01:17 , unit=C ]' - ] - has_error = False - try: - payload = handle_temperature_data(data, time_format) - if payload["time"] == time_format: - has_error = True - except ValueError: - has_error = True - - if not has_error: - self.TC.fail("Invalid time format not caught.") - - @pytest.mark.parametrize('data', [ - [ - '[ value=81.123 , time=15.04.2024 14:01:06 , unit=kg ]', - '[ value=123.123 , time=15.04.2024 14:01:17 , unit=kg ]', - '[ value=1192.2 , time=15.04.2024 14:01:17 , unit=kg ]' - ] - ]) - def test_handle_load_data_correct(self, data): - value = 0 - for temp in data: - value += float(temp.split(',')[0].split('=')[1]) - - payload = handle_load_data(data, '%d.%m.%Y %H:%M:%S') - self.TC.assertEqual(payload["value"], round(value, 2)) - self.TC.assertEqual(payload["unit"], 'kg') - - @pytest.mark.parametrize('data', [ - [ - '[ value=aasdf , time=15.04.2024 14:01:06 , unit=kg ]', - '[ value=[] , time=15.04.2024 14:01:17 , unit=kg ]', - '[ value=-12s2 , time=15.04.2024 14:01:17 , unit=kg ]' - ] - ]) - def test_handle_load_data_wrong_value(self, data): - - value = 0 - for temp in data: - try: - value += float(temp.split(',')[0].split('=')[1]) - except: - value += 0 - - payload = handle_load_data(data, '%d.%m.%Y %H:%M:%S') - self.TC.assertEqual(payload["value"], round(value, 2)) - - @pytest.mark.parametrize('data', [ - [ - '[ value=aasdf , time=15.04.2024 14:01:06 , unit=1-1 ]', - '[ value=[] , time=15.04.2024 14:01:17 , unit=lkj1 ]', - '[ value=-12s2 , time=15.04.2024 14:01:17 , unit=[] ]' - ] - ]) - def test_handle_load_data_wrong_unit(self, data): - # NOTE(stekap): - # If turns out that the unit that is assigned to payload is always the last unit, - # including the possibility of it being 'unknown'. - # Not sure why it works like this, but it doesn't make much sense. - unit = "unknown" - for temp in data: - unit = temp.split(',')[2].split('=')[1].split(' ')[0] - - payload = handle_load_data(data, '%d.%m.%Y %H:%M:%S') - self.TC.assertEqual(payload["unit"], unit) - - @pytest.mark.parametrize('time_format', [ - "%d.%m.%Y %f %a %qq %g %l %H:%M:%S", - "asdffb -. asdf" - ]) - def dont_test_handle_load_data_wrong_time_format(self, time_format): - data = [ - '[ value=1233.0 , time=15.04.2024 14:01:06 , unit=C ]', - '[ value=1233.0 , time=15.04.2024 14:01:17 , unit=C ]' - ] - has_error = False - try: - payload = handle_load_data(data, time_format) - if payload["time"] == time_format: - has_error = True - except ValueError: - has_error = True - - if not has_error: - self.TC.fail("Invalid time format not caught.") - - def test_handle_fuel_data_correct(self): - # NOTE(stekap): - # To me it makes no sense that the data is only sent when the alarm is triggered (value <= limit). - # Original code only used 'value <= limit' comparison as filter for data, and now it turns out it - # is used as filter and as alarm trigger AT THE SAME TIME, which makes no sense. - # Alarm should be handled with another simple filter. - - # FUEL_DATA_FORMAT : [ value=125.83 , time=15.04.2024 14:01:06 , unit=l ] - - pass - - def test_handle_fuel_data_wrong(self): - pass +# import unittest +# import pytest + +# from src.data_service import parse_incoming_data, \ +# handle_temperature_data, handle_load_data, handle_fuel_data +# +# +# class TestDataService(object): +# TC = unittest.TestCase() +# +# def test_parse_incoming_data_correct(self): +# val, typ = parse_incoming_data("nesto shemso=3 indeks_dva bla bla bla halid=invalid", "datatype") +# assert (val == 3.0) +# assert (typ == "invalid") +# +# # TODO: ne raditi ovako nego sa @pytest.mark.parametrize i/ili @fixture +# def test_parse_incoming_data_wrong(self): +# # TODO: ovaj zakomentarisani, ne javlja gresku, odnosno ako nesto nije broj dobijemo 0?!?! +# # TODO: poruke su iste u oba slucaja, treba ih razlikovati +# # with self.assertLogs('customErrorLogger', level='ERROR') as cm: +# # _ = parse_incoming_data("nesto shemso=nije_broj indeks_dva bla bla bla halid=invalid", "datatype") +# # self.assertEqual(cm.output, ['ERROR:customErrorLogger:Invalid datatype data format! - nesto shemso indeks_dva bla bla bla halid=invalid']) +# with self.TC.assertLogs('customErrorLogger', level='ERROR') as cm: +# _ = parse_incoming_data("nesto shemso=3 indeks_dva bla bla bla halit-ddd", "datatype") +# self.TC.assertEqual(cm.output, [ +# 'ERROR:customErrorLogger:Invalid datatype data format! - nesto shemso=3 indeks_dva bla bla bla halit-ddd']) +# with self.TC.assertLogs('customErrorLogger', level='ERROR') as cm: +# _ = parse_incoming_data("nesto shemso=3", "datatype") +# self.TC.assertEqual(cm.output, ['ERROR:customErrorLogger:Invalid datatype data format! - nesto shemso=3']) +# +# @pytest.mark.parametrize('data', [ +# [ +# '[ value=-2.0 , time=15.04.2024 14:01:06 , unit=C ]', +# '[ value=-2.0 , time=15.04.2024 14:01:17 , unit=C ]' +# ], +# [ +# '[ value=-1.0 , time=15.04.2024 14:01:06 , unit=C ]', +# '[ value=-3.4 , time=15.04.2024 14:01:17 , unit=C ]', +# '[ value=10 , time=15.04.2024 14:01:17 , unit=C ]' +# ], +# ]) +# def test_handle_temperature_data_correct(self, data): +# value = 0 +# for temp in data: +# value += float(temp.split(',')[0].split('=')[1]) +# +# payload = handle_temperature_data(data, '%d.%m.%Y %H:%M:%S') +# self.TC.assertEqual(payload["value"], round(value / len(data), 2)) +# self.TC.assertEqual(payload["unit"], 'C') +# +# @pytest.mark.parametrize('data', [ +# [ +# '[ value=-aasd , time=15.04.2024 14:01:06 , unit=C ]', +# '[ value=[] , time=15.04.2024 14:01:17 , unit=C ]', +# '[ value=123 , time=15.04.2024 14:01:17 , unit=C ]', +# ] +# ]) +# def test_handle_temperature_data_wrong_value(self, data): +# value = 0 +# for temp in data: +# try: +# value += float(temp.split(',')[0].split('=')[1]) +# except: +# value += 0 +# +# payload = handle_temperature_data(data, '%d.%m.%Y %H:%M:%S') +# self.TC.assertEqual(payload["value"], round(value / len(data), 2)) +# +# @pytest.mark.parametrize('data', [ +# [ +# '[ value=-aasd , time=15.04.2024 14:01:06 , unit=C ]', +# '[ value=[] , time=15.04.2024 14:01:17 , unit=lkjas ]', +# '[ value=123 , time=15.04.2024 14:01:17 , unit=[] ]', +# ] +# ]) +# def test_handle_temperature_data_wrong_unit(self, data): +# # NOTE(stekap): +# # If turns out that the unit that is assigned to payload is always the last unit, +# # including the possibility of it being 'unknown'. +# # Not sure why it works like this, but it doesn't make much sense. +# unit = "unknown" +# for temp in data: +# unit = temp.split(',')[2].split('=')[1].split(' ')[0] +# +# payload = handle_temperature_data(data, '%d.%m.%Y %H:%M:%S') +# self.TC.assertEqual(payload["unit"], unit) +# +# @pytest.mark.parametrize('time_format', [ +# "%d.%m.%Y %f %a %qq %g %l %H:%M:%S", +# "asdffb -. asdf" +# ]) +# def dont_test_handle_temperature_data_wrong_time_format(self, time_format): +# data = [ +# '[ value=-2.0 , time=15.04.2024 14:01:06 , unit=C ]', +# '[ value=-2.0 , time=15.04.2024 14:01:17 , unit=C ]' +# ] +# has_error = False +# try: +# payload = handle_temperature_data(data, time_format) +# if payload["time"] == time_format: +# has_error = True +# except ValueError: +# has_error = True +# +# if not has_error: +# self.TC.fail("Invalid time format not caught.") +# +# @pytest.mark.parametrize('data', [ +# [ +# '[ value=81.123 , time=15.04.2024 14:01:06 , unit=kg ]', +# '[ value=123.123 , time=15.04.2024 14:01:17 , unit=kg ]', +# '[ value=1192.2 , time=15.04.2024 14:01:17 , unit=kg ]' +# ] +# ]) +# def test_handle_load_data_correct(self, data): +# value = 0 +# for temp in data: +# value += float(temp.split(',')[0].split('=')[1]) +# +# payload = handle_load_data(data, '%d.%m.%Y %H:%M:%S') +# self.TC.assertEqual(payload["value"], round(value, 2)) +# self.TC.assertEqual(payload["unit"], 'kg') +# +# @pytest.mark.parametrize('data', [ +# [ +# '[ value=aasdf , time=15.04.2024 14:01:06 , unit=kg ]', +# '[ value=[] , time=15.04.2024 14:01:17 , unit=kg ]', +# '[ value=-12s2 , time=15.04.2024 14:01:17 , unit=kg ]' +# ] +# ]) +# def test_handle_load_data_wrong_value(self, data): +# +# value = 0 +# for temp in data: +# try: +# value += float(temp.split(',')[0].split('=')[1]) +# except: +# value += 0 +# +# payload = handle_load_data(data, '%d.%m.%Y %H:%M:%S') +# self.TC.assertEqual(payload["value"], round(value, 2)) +# +# @pytest.mark.parametrize('data', [ +# [ +# '[ value=aasdf , time=15.04.2024 14:01:06 , unit=1-1 ]', +# '[ value=[] , time=15.04.2024 14:01:17 , unit=lkj1 ]', +# '[ value=-12s2 , time=15.04.2024 14:01:17 , unit=[] ]' +# ] +# ]) +# def test_handle_load_data_wrong_unit(self, data): +# # NOTE(stekap): +# # If turns out that the unit that is assigned to payload is always the last unit, +# # including the possibility of it being 'unknown'. +# # Not sure why it works like this, but it doesn't make much sense. +# unit = "unknown" +# for temp in data: +# unit = temp.split(',')[2].split('=')[1].split(' ')[0] +# +# payload = handle_load_data(data, '%d.%m.%Y %H:%M:%S') +# self.TC.assertEqual(payload["unit"], unit) +# +# @pytest.mark.parametrize('time_format', [ +# "%d.%m.%Y %f %a %qq %g %l %H:%M:%S", +# "asdffb -. asdf" +# ]) +# def dont_test_handle_load_data_wrong_time_format(self, time_format): +# data = [ +# '[ value=1233.0 , time=15.04.2024 14:01:06 , unit=C ]', +# '[ value=1233.0 , time=15.04.2024 14:01:17 , unit=C ]' +# ] +# has_error = False +# try: +# payload = handle_load_data(data, time_format) +# if payload["time"] == time_format: +# has_error = True +# except ValueError: +# has_error = True +# +# if not has_error: +# self.TC.fail("Invalid time format not caught.") +# +# def test_handle_fuel_data_correct(self): +# # NOTE(stekap): +# # To me it makes no sense that the data is only sent when the alarm is triggered (value <= limit). +# # Original code only used 'value <= limit' comparison as filter for data, and now it turns out it +# # is used as filter and as alarm trigger AT THE SAME TIME, which makes no sense. +# # Alarm should be handled with another simple filter. +# +# # FUEL_DATA_FORMAT : [ value=125.83 , time=15.04.2024 14:01:06 , unit=l ] +# +# pass +# +# def test_handle_fuel_data_wrong(self): +# pass From ab45ccd297e201d488f17d94197aff5343bf29d6 Mon Sep 17 00:00:00 2001 From: Novica Tepic Date: Wed, 11 Sep 2024 08:13:02 +0200 Subject: [PATCH 5/7] Deleted sensor_devices, stats_service and test_stats_service unnecessary files --- src/sensor_devices.py | 849 ------------------------------------ src/stats_service.py | 178 -------- tests/test_stats_service.py | 93 ---- 3 files changed, 1120 deletions(-) delete mode 100644 src/sensor_devices.py delete mode 100644 src/stats_service.py delete mode 100644 tests/test_stats_service.py diff --git a/src/sensor_devices.py b/src/sensor_devices.py deleted file mode 100644 index 624a6ba..0000000 --- a/src/sensor_devices.py +++ /dev/null @@ -1,849 +0,0 @@ -""" -sensor_devices -============ -Module with logic that simulates three different sensors: fuel level sensor, engine temperature sensor, arm load sensor -Classes ---------- -InitFlags: A class that encapsulates four flags that keep track which threads have been initiated - -Functions ---------- -on_publish(client, userdata,result) - Logic executed after receiving MQTT broadcast message. - -on_connect_temp_sensor(client, userdata, flags, rc,props) - Logic executed after successfully establishing connection between temperature sensor and MQTT broker. - -on_connect_load_sensor(client, userdata, flags, rc,props) - Logic executed after successfully establishing connection between arm load sensor and MQTT broker. - -on_connect_fuel_sensor(client, userdata, flags, rc,props) - Logic executed after successfully establishing connection between fuel sensor and MQTT broker. - -measure_temperature_periodically(period, min_val, avg_val, broker_address, broker_port,mqtt_username,mqtt_pass, flag) - Periodically generates value representing current temperature. - -measure_load_randomly(min_t, max_t, min_val, max_val, broker_address, broker_port, mqtt_username,mqtt_pass, flag) - Periodically generates value representing current arm load mass. - -measure_fuel_periodically(period, capacity, consumption, efficiency, refill, broker_address, broker_port, - mqtt_username, mqtt_pass, flag= - Periodically generates value representing current fuel level. - -read_conf() - Loading config data from config file. Returns sensors' configuration. - -sensor_devices() - Creates 3 processes that represent 3 types of sensors, based on sensors' config and implemented logic. - -main() - Used for testing purposes only. Starts 3 sensors processes and stops them after user request. - -Constants ---------- -conf_file_path : str - Path to sensors' config file. - -""" -import threading -import time -import random -from pathlib import Path -import signal -import numpy -import json -import math -import paho.mqtt.client as mqtt -from multiprocessing import Event -import logging.config -import logging -from can_service import read_can -from config_util import ConfFlags, start_config_observer -from mqtt_utils import MQTTClient -from config_util import Config -from signal_control import BetterSignalHandler - -# setting up loggers -logging_path = Path(__file__).parent / 'logging.conf' -logging.config.fileConfig(logging_path) -infoLogger = logging.getLogger('customInfoLogger') -errorLogger = logging.getLogger('customErrorLogger') -customLogger = logging.getLogger("customConsoleLogger") - -# keywords used in sensors' config file -MODE = "mode" -TEMP_SETTINGS = "temp_settings" -LOAD_SETTINGS = "load_settings" -FUEL_SETTINGS = "fuel_settings" -CAN_GENERAL_SETTINGS = "can_general_settings" -CHANNEL = "channel" -INTERFACE = "interface" -BITRATE = "bitrate" - -TEMP_SENSOR = "temp_sensor" -ARM_SENSOR = "arm_sensor" -ARM_MIN_T = "min_t" -ARM_MAX_T = "max_t" -FUEL_SENSOR = "fuel_sensor" -FUEL_CONSUMPTION = "consumption" -FUEL_CAPACITY = "capacity" -FUEL_EFFICIENCY = "efficiency" -FUEL_REFILL = "refill" -INTERVAL = "period" -MQTT_USER = "username" -MQTT_PASSWORD = "password" -MAX = "max_val" -MIN = "min_val" -AVG = "avg_val" -MQTT_BROKER = "mqtt_broker" -ADDRESS = "address" -PORT = "port" - -# sensors config file -CONF_FILE_PATH = "configuration/sensor_conf.json" -APP_CONF_FILE_PATH = "configuration/app_conf.json" - -# mqtt config data -TRANSPORT_PROTOCOL = "tcp" -QOS = 2 - -# REST APIs -TEMP_TOPIC = "sensors/temperature" -LOAD_TOPIC = "sensors/arm-load" -FUEL_TOPIC = "sensors/fuel-level" - -DATA_PATTERN = "[ value={} , time={} , unit={} ]" -TIME_FORMAT = "%d.%m.%Y %H:%M:%S" - -CELZIUS = "C" -KG = "kg" -LITER = "l" - - -def on_publish(client, userdata, result): - """ - Logic executed after receiving mqtt message. - - Parameters - ---------- - client : paho.mqtt.client - userdata : object - result: object - - Returns - ------- - None - """ - pass - - -def on_connect_temp_sensor(client, userdata, flags, rc, props): - """ - Logic executed after establishing connection between temperature sensor process and mqtt broker - - Parameters - ---------- - client : mqtt.client - userdata : object - flags: - rc: int - props: - - - Returns - ------- - None - """ - if rc == 0: - infoLogger.info( - "Temperature sensor successfully established connection with MQTT broker!") - customLogger.debug( - "Temperature sensor successfully established connection with MQTT broker!") - else: - errorLogger.error( - "Temperature sensor failed to establish connection with MQTT broker!") - customLogger.critical( - "Temperature sensor failed to establish connection with MQTT broker!") - - -def on_connect_load_sensor(client, userdata, flags, rc, props): - """ - Logic executed after establishing connection between arm load sensor process and mqtt broker - - Parameters - ---------- - client : paho.mqtt.client - userdata : object - flags: - rc: int - props: - - - Returns - ------- - None - """ - if rc == 0: - infoLogger.info( - "Arm load sensor successfully established connection with MQTT broker!") - customLogger.debug( - "Arm load sensor successfully established connection with MQTT broker!") - else: - errorLogger.error( - "Arm load sensor failed to establish connection with MQTT broker!") - errorLogger.critical( - "Arm load sensor failed to establish connection with MQTT broker!") - - -def on_connect_fuel_sensor(client, userdata, flags, rc, props): - """ - Logic executed after establishing connection between FUEL sensor process and mqtt broker - - Parameters - ---------- - client : paho.mqtt.client - userdata : object - flags: - rc: int - props: - - - Returns - ------- - None - """ - if rc == 0: - infoLogger.info( - "Fuel sensor successfully established connection with MQTT broker!") - customLogger.debug( - "Fuel sensor successfully established connection with MQTT broker!") - else: - errorLogger.error( - "Fuel sensor failed to establish connection with MQTT broker!") - customLogger.critical( - "Fuel sensor failed to establish connection with MQTT broker!") - - -# period = measuring interval in sec, min_val/max_val = min/max measured value - - -def measure_temperature_periodically( - period, - min_val, - avg_val, - broker_address, - broker_port, - mqtt_username, - mqtt_pass, - flag, - config_flag, - init_flags, - temp_lock): - """ - Emulates temperature sensor. - - Periodically generates temperature sensor reading. - - Parameters - ---------- - period: int - Measuring interval. - min_val: int - Min temperature value that sensor can detect. - avg_val: int - Avg engine temperature value. - broker_address: str - MQTT broker's URL. - broker_port: int - MQTT broker's port. - mqtt_username: str - Username required for establishing connection with MQTT broker. - mqtt_pass: str - Password required for establishing connection with MQTT broker. - flag: multiprocessing.Event - Object used for stopping temperature sensor process. - - Returns - ------- - None - """ - customLogger.debug("Temperature sensor started!") - customLogger.debug( - "Temperature sensor conf: interval={}s , min={}˚C , avg={}C".format( - period, min_val, avg_val)) - # preventing division by zero - if period == 0: - period = 1 - period = abs(round(period)) - # establishing connection with MQTT broker - temp_client = MQTTClient( - "temp-sensor-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=mqtt_username, - mqtt_pass=mqtt_pass, - broker_address=broker_address, - broker_port=broker_port, - keepalive=2 * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="TEMP") - temp_client.set_on_connect(on_connect_temp_sensor) - temp_client.set_on_publish(on_publish) - temp_client.connect() - # provide sensor with data for 7 days - values_count = round(7 * 24 * 60 * 60 / period) - data = numpy.random.uniform(-5, 5, values_count) - counter = 0 - # determines whether engine is warming up - raising = True - # starting temp - value = min_val - # shutting down sensor depending on flag - while not flag.is_set(): - - if config_flag.is_set(): - config = Config(APP_CONF_FILE_PATH, errorLogger, customLogger) - config.try_open() - if config.temp_mode == "CAN": - temp_lock.acquire() - init_flags.temp_simulator_initiated = False - temp_lock.release() - config_flag.clear() - break - - time.sleep(period) - # check connection to mqtt broker - temp_client.try_reconnect() - try: - # generating new measured value - if raising: - value += numpy.random.uniform(0, math.ceil(period / 10), 1)[0] - if value > avg_val: - raising = False - else: - value = avg_val + data[counter % values_count] - counter += 1 - customLogger.error( - "Temperature: " + DATA_PATTERN.format( - "{:.2f}".format(value), - str( - time.strftime( - TIME_FORMAT, - time.localtime())), - CELZIUS)) - # send data to MQTT broker - temp_client.publish( - TEMP_TOPIC, DATA_PATTERN.format( - "{:.2f}".format(value), str( - time.strftime( - TIME_FORMAT, time.localtime())), CELZIUS), qos=QOS) - except BaseException: - errorLogger.error( - "Connection between temperature sensor and MQTT broker is broken!") - customLogger.critical( - "Connection between temperature sensor and MQTT broker is broken!") - temp_client.disconnect() - flag.clear() - infoLogger.info("Temperature sensor shutdown!") - customLogger.debug("Temperature sensor shutdown!") - - -# min_t/max_t = min/max measuring period in sec, min_val/max_val = min/max -# measured value -def measure_load_randomly( - min_t, - max_t, - min_val, - max_val, - broker_address, - broker_port, - mqtt_username, - mqtt_pass, - flag, - config_flag, - init_flags, - load_lock): - """ - Emulates arm load sensor. - - Randomly generates arm load sensor reading. - - Parameters - ---------- - min_t: int - Min time-lapse between two measurements. - max_t: int - Max time-lapse between two measurements. - min_val: int - Min load value that sensor can detect. - max_val: int - Max load value that sensor can handle. - broker_address: str - MQTT broker's URL - broker_port: int - MQTT broker's port. - mqtt_username: str - Username required for establishing connection with MQTT broker. - mqtt_pass: str - Password required for establishing connection with MQTT broker. - flag: multiprocessing.Event - Object used for stopping temperature sensor process. - - Returns - ------- - None - """ - customLogger.debug("Arm load sensor started!") - customLogger.debug( - "Arm load sensor conf: min_interval={}s , max_interval={}s , min={}kg , max={}kg".format( - min_t, max_t, min_val, max_val)) - # parameter validation - if max_t <= min_t: - max_t = min_t + random.randint(0, 10) - min_t = abs(round(min_t)) - max_t = abs(round(max_t)) - # establishing connection with MQTT broker - load_client = MQTTClient( - "arm-load-sensor-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=mqtt_username, - mqtt_pass=mqtt_pass, - broker_address=broker_address, - broker_port=broker_port, - keepalive=2 * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="LOAD") - load_client.set_on_connect(on_connect_load_sensor) - load_client.set_on_publish(on_publish) - load_client.connect() - - # provide sensor with data for at least 7 days - values_count = round(7 * 24 * 60 * 60 / min_t) - # measuring intervals - intervals = numpy.random.uniform(min_t, max_t, values_count) - # measured data - data = numpy.random.uniform(min_val, max_val, values_count) - counter = 0 - # shut down sensor depending on set flag - while not flag.is_set(): - - if config_flag.is_set(): - config = Config(APP_CONF_FILE_PATH, errorLogger, customLogger) - config.try_open() - if config.load_mode == "CAN": - load_lock.acquire() - init_flags.load_simulator_initiated = False - load_lock.release() - config_flag.clear() - break - - time.sleep(round(intervals[counter % values_count])) - # check connection to mqtt broker - load_client.try_reconnect() - try: - customLogger.info("Load: " + DATA_PATTERN.format("{:.2f}".format(data[counter % values_count]), - str(time.strftime(TIME_FORMAT, time.localtime())), - KG)) - # send data to MQTT broker - load_client.publish(LOAD_TOPIC, DATA_PATTERN.format("{:.2f}".format(data[counter % values_count]), str( - time.strftime(TIME_FORMAT, time.localtime())), KG), qos=QOS) - except BaseException: - errorLogger.error( - "Connection between arm load sensor and MQTT broker is broken!") - customLogger.critical( - "Connection between arm load sensor and MQTT broker is broken!") - counter += 1 - load_client.disconnect() - flag.clear() - infoLogger.info("Arm load sensor shutdown!") - customLogger.debug("Arm load sensor shutdown!") - - -# period = measuring interval , capacity = fuel tank capacity , refill = fuel tank refill probability (0-1) -# consumption = fuel usage consumption per working hour, efficiency = -# machine work efficiency (0-1) -def measure_fuel_periodically( - period, - capacity, - consumption, - efficiency, - refill, - broker_address, - broker_port, - mqtt_username, - mqtt_pass, - flag, - config_flag, - init_flags, - fuel_lock): - """ - Emulates fuel sensor. - - Periodically generates fuel level sensor reading. - - Parameters - ---------- - period: int - Measuring interval. - capacity: int - Capacity of fuel tank. - consumption: float - Engine fuel consumption [l/h]. - efficiency: float - Engine efficiency. - refill: float - Probability of engine refill. - broker_address: str - MQTT broker's URL - broker_port: int - MQTT broker's port. - mqtt_username: str - Username required for establishing connection with MQTT broker. - mqtt_pass: str - Password required for establishing connection with MQTT broker. - flag: multiprocessing.Event - Object used for stopping temperature sensor process. - """ - customLogger.debug("Fuel level sensor started!") - customLogger.debug( - "Fuel level sensor conf: period={}s, capacity={}l, consumption={}l/h, efficiency={}, refill={}".format( - period, - capacity, - consumption, - efficiency, - refill)) - - # parameter validation - if period == 0: - period = 1 - period = abs(round(period)) - # establishing connection with MQTT broker - fuel_client = MQTTClient( - "fuel-sensor-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=mqtt_username, - mqtt_pass=mqtt_pass, - broker_address=broker_address, - broker_port=broker_port, - keepalive=2 * 3, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="FUEL") - fuel_client.set_on_connect(on_connect_fuel_sensor) - fuel_client.set_on_publish(on_publish) - fuel_client.connect() - # at first fuel tank is randomly filled - value = random.randint(round(capacity / 2), round(capacity)) - # constant for scaling consumption per hour to per second - scale = 1 / (60 * 60) - # shutting down sensor depending on set flag - refilling = False - while not flag.is_set(): - - if config_flag.is_set(): - config = Config(APP_CONF_FILE_PATH, errorLogger, customLogger) - config.try_open() - if config.fuel_mode == "CAN": - fuel_lock.acquire() - init_flags.fuel_simulator_initiated = False - fuel_lock.release() - config_flag.clear() - break - - time.sleep(period) - # fuel tank is filling - if refilling: - value = random.randint(round(value), round(capacity)) - refilling = False - else: - # deciding whether fuel tank should be refilled based on refill - # probability - refilling = random.random() < refill - # amount of consumed fuel is determined based on fuel consumption, time elapsed - # from previous measuring and machine state (on/of) - consumed = period * consumption * scale * (1 + 1 - efficiency) - # generating new measured value - value -= consumed - if value <= 0: - value = 0 - refilling = True - # check connection to mqtt broker - fuel_client.try_reconnect() - try: - customLogger.warning( - "Fuel: " + DATA_PATTERN.format( - "{:.2f}".format(value), - str( - time.strftime( - TIME_FORMAT, - time.localtime())), - LITER)) - # send data to MQTT broker - fuel_client.publish( - FUEL_TOPIC, DATA_PATTERN.format( - "{:.2f}".format(value), str( - time.strftime( - TIME_FORMAT, time.localtime())), LITER), qos=QOS) - except BaseException: - errorLogger.error( - "Connection between fuel level sensor and MQTT broker is broken!") - customLogger.critical( - "Connection between fuel level sensor and MQTT broker is broken!") - fuel_client.disconnect() - flag.clear() - infoLogger.info("Fuel level sensor shutdown!") - customLogger.debug("Fuel level sensor shutdown!") - - -# read sensor conf data -def read_conf(): - """ - Loads sensors' config from config file. - - If config file is inaccessible, default config is used. - """ - data = None - try: - conf_file = open(CONF_FILE_PATH) - data = json.load(conf_file) - except BaseException: - errorLogger.critical( - "Using default config! Can't read sensor config file - ", - CONF_FILE_PATH, - " !") - customLogger.critical( - "Using default config! Can't read sensor config file - ", - CONF_FILE_PATH, - " !") - - data = { - TEMP_SENSOR: { - INTERVAL: 5, - MIN: -10, - AVG: 100}, - ARM_SENSOR: { - ARM_MIN_T: 10, - ARM_MAX_T: 100, - MIN: 0, - MAX: 800}, - FUEL_SENSOR: { - INTERVAL: 5, - FUEL_CAPACITY: 300, - FUEL_CONSUMPTION: 3000, - FUEL_EFFICIENCY: 0.6, - FUEL_REFILL: 0.02}, - MQTT_BROKER: { - ADDRESS: "localhost", - PORT: 1883, - MQTT_USER: "iot-device", - MQTT_PASSWORD: "password"}} - return data - - -# creating sensor processes -def sensors_devices(temp_flag, load_flag, fuel_flag, can_flag, config_flags, - init_flags, temp_lock, load_lock, fuel_lock, can_lock): - """ - Creates 3 subprocesses representing 3 sensor devices. - - Parameters - ---------- - can_flag : multiprocessing.Event - temp_flag : multiprocessing.Event - load_flag : multiprocessing.Event - fuel_flag : multiprocessing.Event - - Returns - ------- - None - """ - conf_data = read_conf() - # app_conf_data = read_app_conf() - app_conf = Config(APP_CONF_FILE_PATH, errorLogger, customLogger) - app_conf.try_open() - - sensors = [] - - is_can_temp = False - is_can_load = False - is_can_fuel = False - - if app_conf.temp_mode == "CAN": - is_can_temp = True - if app_conf.load_mode == "CAN": - is_can_load = True - if app_conf.fuel_mode == "CAN": - is_can_fuel = True - - if is_can_temp or is_can_load or is_can_fuel: - if not init_flags.can_initiated: - can_sensor = threading.Thread( - target=read_can, - args=( - can_flag, - config_flags.can_flag, - init_flags, - can_lock)) - - sensors.append(can_sensor) - can_lock.acquire() - init_flags.can_initiated = True - can_lock.release() - if app_conf.temp_mode == "SIMULATOR": - if not init_flags.temp_simulator_initiated: - simulation_temperature_sensor = threading.Thread( - target=measure_temperature_periodically, - args=( - conf_data[TEMP_SENSOR][INTERVAL], - conf_data[TEMP_SENSOR][MIN], - conf_data[TEMP_SENSOR][AVG], - conf_data[MQTT_BROKER][ADDRESS], - conf_data[MQTT_BROKER][PORT], - conf_data[MQTT_BROKER][MQTT_USER], - conf_data[MQTT_BROKER][MQTT_PASSWORD], - temp_flag, - config_flags.temp_flag, - init_flags, - temp_lock)) - sensors.append(simulation_temperature_sensor) - temp_lock.acquire() - init_flags.temp_simulator_initiated = True - temp_lock.release() - if app_conf.load_mode == "SIMULATOR": - if not init_flags.load_simulator_initiated: - simulation_load_sensor = threading.Thread( - target=measure_load_randomly, - args=( - conf_data[ARM_SENSOR][ARM_MIN_T], - conf_data[ARM_SENSOR][ARM_MAX_T], - conf_data[ARM_SENSOR][MIN], - conf_data[ARM_SENSOR][MAX], - conf_data[MQTT_BROKER][ADDRESS], - conf_data[MQTT_BROKER][PORT], - conf_data[MQTT_BROKER][MQTT_USER], - conf_data[MQTT_BROKER][MQTT_PASSWORD], - load_flag, - config_flags.load_flag, - init_flags, - load_lock)) - sensors.append(simulation_load_sensor) - load_lock.acquire() - init_flags.load_simulator_initiated = True - load_lock.release() - if app_conf.fuel_mode == "SIMULATOR": - - if not init_flags.fuel_simulator_initiated: - simulation_fuel_sensor = threading.Thread( - target=measure_fuel_periodically, - args=( - conf_data[FUEL_SENSOR][INTERVAL], - conf_data[FUEL_SENSOR][FUEL_CAPACITY], - conf_data[FUEL_SENSOR][FUEL_CONSUMPTION], - conf_data[FUEL_SENSOR][FUEL_EFFICIENCY], - conf_data[FUEL_SENSOR][FUEL_REFILL], - conf_data[MQTT_BROKER][ADDRESS], - conf_data[MQTT_BROKER][PORT], - conf_data[MQTT_BROKER][MQTT_USER], - conf_data[MQTT_BROKER][MQTT_PASSWORD], - fuel_flag, - config_flags.fuel_flag, - init_flags, - fuel_lock)) - sensors.append(simulation_fuel_sensor) - fuel_lock.acquire() - init_flags.fuel_simulator_initiated = True - fuel_lock.release() - return sensors - - -class InitFlags: - """A class that encapsulates four flags that keep track which threads have been initiated""" - - def __init__(self): - """ - Constructor that initializes an MQTT object. - None of the threads are initiated at first. - """ - self.can_initiated = False - self.temp_simulator_initiated = False - self.load_simulator_initiated = False - self.fuel_simulator_initiated = False - - -def main(): - """ - Used for testing sensors. - - Creates and executes 3 sensor subprocesses. Contains logic for user requested sensors' shutdown. - """ - temp_simulation_flag = Event() - load_simulation_flag = Event() - fuel_simulation_flag = Event() - can_flag = Event() - - main_execution_flag = Event() - - BetterSignalHandler([signal.SIGINT, - signal.SIGTERM], - [temp_simulation_flag, - load_simulation_flag, - fuel_simulation_flag, - can_flag, - main_execution_flag]) - - temp_lock = threading.Lock() - load_lock = threading.Lock() - fuel_lock = threading.Lock() - can_lock = threading.Lock() - - app_config_flags = ConfFlags() - init_flags = InitFlags() - app_config_observer = start_config_observer(app_config_flags) - - initial = True - sensors = [] - - customLogger.debug("Sensor system starting!") - initial = True - - # dictionary to track which thread to join, remembering old and new flags - # from config - - while not main_execution_flag.is_set(): - if app_config_flags.execution_flag.is_set() or initial: - initial = False - sensors = sensors_devices( - temp_simulation_flag, - load_simulation_flag, - fuel_simulation_flag, - can_flag, - app_config_flags, - init_flags, - can_lock, - temp_lock, - load_lock, - fuel_lock) - app_config_flags.execution_flag.clear() - if sensors is not None: - for sensor in sensors: - sensor.start() - time.sleep(0.1) - time.sleep(2) - for sensor in sensors: - sensor.join() - - app_config_observer.stop() - app_config_observer.join() - infoLogger.info("Sensor system shutdown!") - customLogger.debug("Sensor system shutdown!") - - -if __name__ == '__main__': - main() diff --git a/src/stats_service.py b/src/stats_service.py deleted file mode 100644 index 176ee67..0000000 --- a/src/stats_service.py +++ /dev/null @@ -1,178 +0,0 @@ -"""Stats service utilities. - -stats_service -============= -Module that contains logic used for collecting stats about savings in sensor data sent over the internet. - -Classes ---------- - -Stats - Class representing stats regarding single sensor data transmission. -OverallStats - Class representing stats regarding whole gateway app data transmission. - -Functions ---------- - -Constants ---------- - -""" -import time -import logging.config - -# setting up loggers -logging.config.fileConfig('logging.conf') -errorLogger = logging.getLogger('customErrorLogger') -customLogger = logging.getLogger('customConsoleLogger') - - -class Stats: - """ - Represents single sensor stats regarding data collected and transmitted over network. - - Attributes - ---------- - dataBytes: int - Amount of collected sensor data in bytes. - dataBytesForwarded: int - Amount of sensor data sent to cloud services in bytes. - dataRequests: int - Number of requests to cloud services. - - Methods - ------- - update_data(self, bytes, forwarded, requests) - Updating stats data. - """ - - def __init__(self): - """Initialize Stats object.""" - self.dataBytes = 0 - self.dataBytesForwarded = 0 - self.dataRequests = 0 - - def update_data(self, bytes, forwarded, requests): - """ - Update current stats with new collected sensor data. - - Parameters - ---------- - bytes: int - Collected sensor data in bytes. - forwarded: int - Sent sensor data in bytes. - requests: int - Number of made cloud service requests. - - Returns - ------- - None - """ - self.dataBytes += bytes - self.dataBytesForwarded += forwarded - self.dataRequests += requests - - -class OverallStats: - """ - Represents overall IoT gateway stats regarding data collected and transmitted over network. - - Attributes - ---------- - time_pattern: str - Server date-time format. - startTime: str - Start of collecting stats. - endTime: str - End of collecting stats. - tempDataBytes: int - Amount of collected temperature data [byte]. - tempDataBytesForwarded: int - Amount of transmitted temperature data [byte]. - tempDataRequests: int - Number of requests to temperature stats service. - loadDataBytes: int - Amount of collected load data [byte]. - loadDataBytesForwarded: int - Amount of transmitted load data [byte]. - loadDataRequests: int - Number of requests to load stats service. - fuelDataBytes: int - Amount of collected fuel data [byte]. - fuelDataBytesForwarded: int - Amount of transmitted fuel data [byte]. - fuelDataRequests: int - Number of requests to fuel stats service. - - Methods - ------- - combine_stats(self, temp_stats, load_stats, fuel_stats) - Combines stats from different sensors into overall stats. - send_stats(self): - Sends collected stats dato to stats cloud service. - """ - - def __init__(self, time_pattern): - """Init OverallStats object. - - Parameters - ---------- - time_pattern - """ - self.time_pattern = time_pattern - self.startTime = time.strftime(self.time_pattern, time.localtime()) - self.endTime = "" - self.tempDataBytes = 0 - self.tempDataBytesForwarded = 0 - self.tempDataRequests = 0 - self.loadDataBytes = 0 - self.loadDataBytesForwarded = 0 - self.loadDataRequests = 0 - self.fuelDataBytes = 0 - self.fuelDataBytesForwarded = 0 - self.fuelDataRequests = 0 - - def combine_stats(self, temp_stats, load_stats, fuel_stats): - """ - Combine stats from different sensors into overall stats and return - it as payload. - - Parameters - ---------- - temp_stats: Stats - Temperature stats data. - load_stats: Stats - Load stats data. - fuel_stats: Stats - Fuel stats data. - - Returns - ------- - payload: dict - Payload that represents combined stats data. - """ - self.tempDataBytes = temp_stats.dataBytes - self.tempDataBytesForwarded = temp_stats.dataBytesForwarded - self.tempDataRequests = temp_stats.dataRequests - self.loadDataBytes = load_stats.dataBytes - self.loadDataBytesForwarded = load_stats.dataBytesForwarded - self.loadDataRequests = load_stats.dataRequests - self.fuelDataBytes = fuel_stats.dataBytes - self.fuelDataBytesForwarded = fuel_stats.dataBytesForwarded - self.fuelDataRequests = fuel_stats.dataRequests - - self.endTime = time.strftime(self.time_pattern, time.localtime()) - payload = {"startTime": self.startTime, - "endTime": self.endTime, - "tempDataBytes": self.tempDataBytes, - "tempDataBytesForwarded": self.tempDataBytesForwarded, - "tempDataRequests": self.tempDataRequests, - "loadDataBytes": self.loadDataBytes, - "loadDataBytesForwarded": self.loadDataBytesForwarded, - "loadDataRequests": self.loadDataRequests, - "fuelDataBytes": self.fuelDataBytes, - "fuelDataBytesForwarded": self.fuelDataBytesForwarded, - "fuelDataRequests": self.fuelDataRequests} - return payload diff --git a/tests/test_stats_service.py b/tests/test_stats_service.py deleted file mode 100644 index fc1507d..0000000 --- a/tests/test_stats_service.py +++ /dev/null @@ -1,93 +0,0 @@ -import unittest -import pytest -from src.stats_service import Stats, OverallStats - - -class TestStatsService(object): - TC = unittest.TestCase() - - @pytest.mark.parametrize('bytes,forwarded,requests', [ - (2, 3, 4), - (0, 0, 0), - (-1, 123, 1), - ]) - def test_stats_update_data_correct(self, bytes, forwarded, requests): - stats = Stats() - - # Initial state - stats.update_data(bytes, forwarded, requests) - self.TC.assertEqual(stats.dataBytes, bytes) - self.TC.assertEqual(stats.dataBytesForwarded, forwarded) - self.TC.assertEqual(stats.dataRequests, requests) - - # Non-initial state - stats.update_data(bytes, forwarded, requests) - self.TC.assertEqual(stats.dataBytes, 2 * bytes) - self.TC.assertEqual(stats.dataBytesForwarded, 2 * forwarded) - self.TC.assertEqual(stats.dataRequests, 2 * requests) - - @pytest.mark.parametrize('bytes,forwarded,requests', [ - (1, 2, 'f'), - ('as', 2, 2), - (-1, 'asdf', 2), - ([], 1, 2) - ]) - def test_stats_update_data_wrong(self, bytes, forwarded, requests): - stats = Stats() - self.TC.assertRaises(TypeError, stats.update_data, bytes, forwarded, requests) - - @pytest.mark.parametrize('temp_stats_arr,load_stats_arr,fuel_stats_arr', [ - ([1, [], 1], [2, 2, 2], [3, 3, 3]), - ([1, 1, 1], [2, 2, 2], [3, 'asd', 3]), - ([1, 1, 1], [2, '2', 2], [3, 3, 3]) - ]) - def dont_test_overall_stats_combine_stats_wrong_stats_input(self, - temp_stats_arr, - load_stats_arr, - fuel_stats_arr): - temp_stats = Stats() - temp_stats.dataBytes = temp_stats_arr[0] - temp_stats.dataRequests = temp_stats_arr[1] - temp_stats.dataBytesForwarded = temp_stats_arr[2] - - load_stats = Stats() - load_stats.dataBytes = load_stats_arr[0] - load_stats.dataRequests = load_stats_arr[1] - load_stats.dataBytesForwarded = load_stats_arr[2] - - fuel_stats = Stats() - fuel_stats.dataBytes = fuel_stats_arr[0] - fuel_stats.dataRequests = fuel_stats_arr[1] - fuel_stats.dataBytesForwarded = fuel_stats_arr[2] - - time_format = "dd.MM.yyyy HH:mm:ss" - - overall_stats = OverallStats(time_format) - self.TC.assertRaises(TypeError, - overall_stats.combine_stats, - temp_stats, - load_stats, - fuel_stats) - - @pytest.mark.parametrize('time_format', [ - "%d.%m.%Y %f %a %qq %g %l %H:%M:%S", - "asdffb -. asdf" - ]) - def dont_test_overall_stats_wrong_time_format(self, - time_format): - dummy_stats = Stats() - dummy_stats.update_data(0, 0, 0) - - has_error = False - try: - overall_stats = OverallStats(time_format) - if overall_stats.startTime == time_format: - has_error = True - overall_stats.combine_stats(dummy_stats, - dummy_stats, - dummy_stats) - except ValueError: - has_error = True - - if not has_error: - self.TC.fail("Invalid time format not caught.") From 8785c51b917299f5002c65e44d66b4bdbba599dd Mon Sep 17 00:00:00 2001 From: Novica Tepic Date: Wed, 11 Sep 2024 14:51:55 +0200 Subject: [PATCH 6/7] CAN modifications made --- src/can_service.py | 672 +++++++++++---------------------------------- 1 file changed, 153 insertions(+), 519 deletions(-) diff --git a/src/can_service.py b/src/can_service.py index 8e7634e..756e9fd 100644 --- a/src/can_service.py +++ b/src/can_service.py @@ -11,24 +11,14 @@ --------- read_can(execution_flag, config_flag, init_flags, can_lock) Thread execution function from sensor_devices main() for CAN communication -stop_can(notifier, bus, temp_client, load_client, fuel_client) +stop_can(notifier, bus, protocol_client) Used for stopping all CAN functionalities -init_mqtt_clients(bus, is_can_temp, is_can_load, is_can_fuel, config, flag) +init_mqtt_clients(bus, config, flag) Used for initializing MQTT clients that publish read CAN messages on_publish(topic, payload, qos) Event handler for published messages to a MQTT topic -on_subscribe_temp_alarm(client, userdata, flags, rc, props) - Event handler for subscribing to the temperature alarm MQTT topic -on_subscribe_load_alarm(client, userdata, flags, rc, props) - Event handler for subscribing to the load alarm MQTT topic -on_subscribe_fuel_alarm(client, userdata, flags, rc, props) - Event handler for subscribing to the fuel alarm MQTT topic -on_connect_temp_sensor(client, userdata, flags, rc, props) - Even handler for subscribing to the temperature messages MQTT topic -on_connect_load_sensor(client, userdata, flags, rc, props) - Even handler for subscribing to the load messages MQTT topic -on_connect_fuel_sensor(client, userdata, flags, rc, props) - Even handler for subscribing to the fuel messages MQTT topic +on_subscribe_protocol(client, userdata, flags, rc, props) + Event handler for protocol client MQTT subscription Constants --------- @@ -36,26 +26,12 @@ Path to the configuration file transport_protocol: str JSON key for MQTT transport protocol -temp_topic: str - MQTT topic for temperature data -load_topic: str - MQTT topic for load data -fuel_topic: str - MQTT topic for fuel data protocol_topic: str MQTT topic for protocol data data_pattern: str Format by which data is sent to MQTT brokers protocol_data_pattern: str Format by which protocol data is sent to MQTT brokers -time_format: str - Format by which time is sent to MQTT brokers -celzius: str - Temperature measuring unit -kg: str - Load measuring unit -_l: str - Fuel measuring unit qos: int Quality of service of MQTT. """ @@ -69,6 +45,9 @@ import json from threading import Thread import mqtt_util +import signal +from multiprocessing import Event +from signal_control import BetterSignalHandler from mqtt_utils import MQTTClient from can.listener import Listener from can.interface import Bus @@ -80,48 +59,16 @@ errorLogger = logging.getLogger('customErrorLogger') customLogger = logging.getLogger("customConsoleLogger") -CONF_FILE_PATH = "configuration/sensor_conf.json" APP_CONF_FILE_PATH = "configuration/app_conf.json" - TRANSPORT_PROTOCOL = "tcp" -TEMP_TOPIC = "sensors/temperature" -LOAD_TOPIC = "sensors/arm-load" -FUEL_TOPIC = "sensors/fuel-level" PROTOCOL_TOPIC = "sensors/protocol" PROTOCOL_INPUT_TOPIC = "sensors/protocol-input" PROTOCOL_VALUE_TOPIC = "gateway/protocol-value" -DATA_PATTERN = "[ value={} , time={} , unit={} ]" PROTOCOL_DATA_PATTERN = "[ value={} , time={} , data_id={} ]" TIME_FORMAT = "%d.%m.%Y %H:%M:%S" -CELZIUS = "C" -KG = "kg" -_L = "l" - -MODE = "mode" -TEMP_SETTINGS = "temp_settings" -LOAD_SETTINGS = "load_settings" -FUEL_SETTINGS = "fuel_settings" -CAN_GENERAL_SETTINGS = "can_general_settings" - -CHANNEL = "channel" -INTERFACE = "interface" -BITRATE = "bitrate" - -TEMP_SENSOR = "temp_sensor" -ARM_SENSOR = "arm_sensor" -ARM_MIN_T = "min_t" -ARM_MAX_T = "max_t" -FUEL_SENSOR = "fuel_sensor" -FUEL_CONSUMPTION = "consumption" -FUEL_CAPACITY = "capacity" -FUEL_EFFICIENCY = "efficiency" -FUEL_REFILL = "refill" INTERVAL = "period" MQTT_USER = "username" MQTT_PASSWORD = "password" -_MAX = "max_val" -_MIN = "min_val" -_AVG = "avg_val" MQTT_BROKER = "mqtt_broker" ADDRESS = "address" PORT = "port" @@ -213,7 +160,7 @@ def extract_bits(byte_array, start_bit, length): return integer_value -def read_can(execution_flag, config_flag, init_flags, can_lock): +def read_can(execution_flag, can_lock): """ Thread execution function from sensor_devices main() for CAN communication It connects to an instance of CAN bus, which is then tied to a Notifier object, which listens to the bus for @@ -246,41 +193,29 @@ def read_can(execution_flag, config_flag, init_flags, can_lock): can_listener = None initial = True notifier = None - temp_client = None - load_client = None - fuel_client = None protocol_client = None try: while not execution_flag.is_set(): - if config_flag.is_set() or initial: + if initial: config = Config(APP_CONF_FILE_PATH, errorLogger, customLogger) config.try_open() - stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_client) + stop_can(notifier, bus, protocol_client) interface_value = config.can_interface channel_value = config.can_channel bitrate_value = config.can_bitrate - is_can_temp = True if config.temp_mode == "CAN" else False - is_can_load = True if config.load_mode == "CAN" else False - is_can_fuel = True if config.fuel_mode == "CAN" else False - - if (is_can_temp is False) and ( - is_can_load is False) and (is_can_fuel is False): - break - bus = Bus(interface=interface_value, channel=channel_value, bitrate=bitrate_value) - temp_client, load_client, fuel_client, protocol_client = init_mqtt_clients( - bus, is_can_temp, is_can_load, is_can_fuel, config, execution_flag) + protocol_client = init_mqtt_clients( + bus, config, execution_flag) notifier = can.Notifier(bus, [], timeout=period) - can_listener = CANListener(temp_client, load_client, fuel_client, protocol_client) + can_listener = CANListener(protocol_client) notifier.add_listener(can_listener) initial = False - config_flag.clear() time.sleep(period) period_counter += 1 @@ -296,15 +231,14 @@ def read_can(execution_flag, config_flag, init_flags, can_lock): customLogger.debug("CAN BUS has been shut down.") can_lock.acquire() - init_flags.can_initiated = False can_lock.release() - stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_client) + stop_can(notifier, bus, protocol_client) execution_flag.clear() customLogger.debug("CAN process shutdown!") -def stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_client): +def stop_can(notifier, bus, protocol_client): """ Used for stopping all CAN functionalities @@ -314,24 +248,12 @@ def stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_clie Object that listens to incoming CAN messages bus: can.Bus CAN bus - temp_client: mqtt_utils.MQTTClient - Temperature MQTT broker client - load_client: mqtt_utils.MQTTClient - Load MQTT broker client - fuel_client: mqtt_utils.MQTTClient - Fuel MQTT broker client protocol_client: mqtt_utils.MQTTClient Protocol data MQTT broker client """ if notifier is not None: notifier.stop(timeout=5) - if temp_client is not None: - temp_client.disconnect() - if load_client is not None: - load_client.disconnect() - if fuel_client is not None: - fuel_client.disconnect() if protocol_client is not None: protocol_client.disconnect() if bus is not None: @@ -340,9 +262,6 @@ def stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_clie def init_mqtt_clients( bus, - is_can_temp, - is_can_load, - is_can_fuel, config, flag): """ @@ -352,158 +271,90 @@ def init_mqtt_clients( ---- bus: can.Bus CAN bus - is_can_temp: boolean - Flag that indicates if the configuration demands the Notifier to read CAN temperature messages - is_can_load: boolean - Flag that indicates if the configuration demands the Notifier to read CAN load messages - is_can_fuel: boolean - Flag that indicates if the configuration demands the Notifier to read CAN fuel messages - + config: Config + Class holding configuration parameters + In this case, used for MQTT + flag: Flag + Used for stopping MQTT client """ - temp_client = None - load_client = None - fuel_client = None - - if is_can_temp: - temp_client = MQTTClient( - "temp-can-sensor-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=config.mqtt_broker_username, - mqtt_pass=config.mqtt_broker_password, - broker_address=config.mqtt_broker_address, - broker_port=config.mqtt_broker_port, - keepalive=config.temp_settings_interval, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="TEMP") - - def on_message_temp_alarm(client, userdata, msg): - can_message = can.Message(arbitration_id=0x120, - data=[bool(msg.payload)], - is_extended_id=False, - is_remote_frame=False) - bus.send(msg=can_message, timeout=5) - customLogger.info( - "Temperature alarm registered! Forwarding to CAN!") - - temp_client.set_on_connect(on_connect_temp_sensor) - temp_client.set_on_publish(on_publish) - temp_client.set_on_subscribe(on_subscribe_temp_alarm) - temp_client.set_on_message(on_message_temp_alarm) - temp_client.connect() - - if is_can_load: - load_client = MQTTClient( - "load-can-sensor-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=config.mqtt_broker_username, - mqtt_pass=config.mqtt_broker_password, - broker_address=config.mqtt_broker_address, - broker_port=config.mqtt_broker_port, - keepalive=config.load_settings_interval, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="LOAD") - - def on_message_load_alarm(client, userdata, msg): - can_message = can.Message(arbitration_id=0x121, - data=[bool(msg.payload)], - is_extended_id=False, - is_remote_frame=False) - bus.send(msg=can_message, timeout=5) - customLogger.info("Load alarm registered! Forwarding to CAN!") - - load_client.set_on_connect(on_connect_load_sensor) - load_client.set_on_publish(on_publish) - load_client.set_on_subscribe(on_subscribe_load_alarm) - load_client.set_on_message(on_message_load_alarm) - load_client.connect() - - if is_can_fuel: - fuel_client = MQTTClient( - "fuel-can-sensor-mqtt-client", - transport_protocol=TRANSPORT_PROTOCOL, - protocol_version=mqtt.MQTTv5, - mqtt_username=config.mqtt_broker_username, - mqtt_pass=config.mqtt_broker_password, - broker_address=config.mqtt_broker_address, - broker_port=config.mqtt_broker_port, - keepalive=config.fuel_settings_interval, - infoLogger=infoLogger, - errorLogger=errorLogger, - flag=flag, - sensor_type="FUEL") - - def on_message_fuel_alarm(client, userdata, msg): - can_message = can.Message(arbitration_id=0x122, - data=[bool(msg.payload)], - is_extended_id=False, - is_remote_frame=False) - bus.send(msg=can_message, timeout=5) - customLogger.info("Fuel alarm registered! Forwarding to CAN!") - - fuel_client.set_on_connect(on_connect_fuel_sensor) - fuel_client.set_on_publish(on_publish) - fuel_client.set_on_subscribe(on_subscribe_fuel_alarm) - fuel_client.set_on_message(on_message_fuel_alarm) - fuel_client.connect() - def on_message_protocol_alarm(client, userdata, msg): try: - payload = msg.payload.decode('utf-8') - data = json.loads(payload) - type = data["type"] - action = data["action"] - - if type == "can_message": - if action == "send": - data_id = data["dataId"] - protocol_data_from_db = get_data_by_id(data_id) - value = data["value"] - customLogger.info( - f"Received protocol input set message: type={type}, " - f"action={action}, dataId={data_id}, value={value}") - # Start a new thread which will send data periodically - if protocol_data_from_db.id not in processed_ids: - thread = Thread(target=parse_input_protocol_data, args=(flag, value, - protocol_data_from_db, bus,)) - processed_ids[protocol_data_from_db.id] = {"thread": thread, "stopped": False, "value": value} - thread.start() - customLogger.info("Received protocol input data: " + str(data)) - elif action == "stop": - # Stop data sending if user wants to - data_id = data["dataId"] - processed_ids[data_id]["stopped"] = True - elif action == "remove": - # If protocol is removed from the device stop all relevant threads - protocol_data_ids = data["protocol_data_ids"] - for id in protocol_data_ids: - if id in processed_ids: - processed_ids[id]["stopped"] = True - elif action == "get_current_values": - # Data returned to cloud so user has the latest info about sending - filtered_data = [ - {"id": 0, "dataId": id_, "value": data["value"]} - for id_, data in processed_ids.items() - if not data["stopped"] - ] - message = json.dumps(filtered_data) - gateway_client = mqtt_util.gcb_init_publisher( - "protocol-input-value-publisher-client-id", - config.gateway_cloud_broker_iot_username, - config.gateway_cloud_broker_iot_password) - mqtt_util.gcb_connect(gateway_client, config.gateway_cloud_broker_address, - config.gateway_cloud_broker_port) - gateway_client.publish(PROTOCOL_VALUE_TOPIC, message, 2) - gateway_client.loop_start() - # Without sleep client disconnects too fast and doesn't send MQTT message - time.sleep(1) - gateway_client.loop_stop() - gateway_client.disconnect() + topic = msg.topic + print("Topic is " + topic) + if topic == PROTOCOL_INPUT_TOPIC: + payload = msg.payload.decode('utf-8') + data = json.loads(payload) + type = data["type"] + action = data["action"] + + if type == "can_message": + if action == "send": + data_id = data["dataId"] + protocol_data_from_db = get_data_by_id(data_id) + value = data["value"] + customLogger.info( + f"Received protocol input set message: type={type}, " + f"action={action}, dataId={data_id}, value={value}") + # Start a new thread which will send data periodically + if protocol_data_from_db.id not in processed_ids: + thread = Thread(target=parse_input_protocol_data, args=(flag, value, + protocol_data_from_db, bus,)) + processed_ids[protocol_data_from_db.id] = {"thread": thread, + "stopped": False, "value": value} + thread.start() + customLogger.info("Received protocol input data: " + str(data)) + elif action == "stop": + # Stop data sending if user wants to + data_id = data["dataId"] + processed_ids[data_id]["stopped"] = True + elif action == "remove": + # If protocol is removed from the device stop all relevant threads + protocol_data_ids = data["protocol_data_ids"] + for id in protocol_data_ids: + if id in processed_ids: + processed_ids[id]["stopped"] = True + elif action == "get_current_values": + # Data returned to cloud so user has the latest info about sending + filtered_data = [ + {"id": 0, "dataId": id_, "value": data["value"]} + for id_, data in processed_ids.items() + if not data["stopped"] + ] + message = json.dumps(filtered_data) + gateway_client = mqtt_util.gcb_init_publisher( + "protocol-input-value-publisher-client-id", + config.gateway_cloud_broker_iot_username, + config.gateway_cloud_broker_iot_password) + mqtt_util.gcb_connect(gateway_client, config.gateway_cloud_broker_address, + config.gateway_cloud_broker_port) + gateway_client.publish(PROTOCOL_VALUE_TOPIC, message, 2) + gateway_client.loop_start() + # Without sleep client disconnects too fast and doesn't send MQTT message + time.sleep(1) + gateway_client.loop_stop() + gateway_client.disconnect() + elif topic == TEMP_ALARM_TOPIC: + can_message = can.Message(arbitration_id=0x120, + data=[bool(msg.payload)], + is_extended_id=False, + is_remote_frame=False) + bus.send(msg=can_message, timeout=5) + customLogger.info( + "Temperature alarm registered! Forwarding to CAN!") + elif topic == LOAD_ALARM_TOPIC: + can_message = can.Message(arbitration_id=0x121, + data=[bool(msg.payload)], + is_extended_id=False, + is_remote_frame=False) + bus.send(msg=can_message, timeout=5) + customLogger.info("Load alarm registered! Forwarding to CAN!") + elif topic == FUEL_ALARM_TOPIC: + can_message = can.Message(arbitration_id=0x122, + data=[bool(msg.payload)], + is_extended_id=False, + is_remote_frame=False) + bus.send(msg=can_message, timeout=5) + customLogger.info("Fuel alarm registered! Forwarding to CAN!") except json.JSONDecodeError: customLogger.error("Failed to decode JSON from MQTT message payload.") except Exception as e: @@ -529,7 +380,7 @@ def on_message_protocol_alarm(client, userdata, msg): protocol_client.set_on_message(on_message_protocol_alarm) protocol_client.connect() - return temp_client, load_client, fuel_client, protocol_client + return protocol_client def on_publish(topic, payload, qos): @@ -548,79 +399,6 @@ def on_publish(topic, payload, qos): pass -def on_subscribe_temp_alarm(client, userdata, flags, rc, props): - """ - Event handler for published messages to a MQTT topic - Args: - ---- - client: paho.mqtt.client.Client - userdata: - flags: - rc: - props: - - """ - if rc == 0: - infoLogger.info( - "CAN Temperature alarm client successfully established connection with MQTT broker!") - customLogger.debug( - "CAN Temperature alarm client successfully established connection with MQTT broker!") - else: - errorLogger.error( - "CAN Temperature alarm client failed to establish connection with MQTT broker!") - customLogger.critical( - "CAN Temperature alarm client failed to establish connection with MQTT broker!") - - -def on_subscribe_load_alarm(client, userdata, flags, rc, props): - """ - Event handler for published messages to a MQTT topic - Args: - ---- - client: paho.mqtt.client.Client - userdata: - flags: - rc: - props: - - """ - if rc == 0: - infoLogger.info( - "CAN Load alarm client successfully established connection with MQTT broker!") - customLogger.debug( - "CAN Load alarm client successfully established connection with MQTT broker!") - else: - errorLogger.error( - "CAN Load alarm client failed to establish connection with MQTT broker!") - customLogger.critical( - "CAN Load alarm client failed to establish connection with MQTT broker!") - - -def on_subscribe_fuel_alarm(client, userdata, flags, rc, props): - """ - Event handler for published messages to a MQTT topic - Args: - ---- - client: paho.mqtt.client.Client - userdata: - flags: - rc: - props: - - """ - if rc == 0: - infoLogger.info( - "CAN Load alarm client successfully established connection with MQTT broker!") - customLogger.debug( - "CAN Load alarm client successfully established connection with MQTT broker!") - # client.subscribe(FUEL_ALARM_TOPIC, qos=QOS) - else: - errorLogger.error( - "CAN Load alarm client failed to establish connection with MQTT broker!") - customLogger.critical( - "CAN Load alarm client failed to establish connection with MQTT broker!") - - def on_subscribe_protocol(client, userdata, flags, rc, props): """ Event handler for published messages to a MQTT topic @@ -645,81 +423,6 @@ def on_subscribe_protocol(client, userdata, flags, rc, props): "Protocol client failed to establish connection with MQTT broker!") -def on_connect_temp_sensor(client, userdata, flags, rc, props): - """ - Event handler for published messages to a MQTT topic - Args: - ---- - client: paho.mqtt.client.Client - userdata: - flags: - rc: - props: - - """ - if rc == 0: - infoLogger.info( - "CAN Temperature sensor successfully established connection with MQTT broker!") - customLogger.debug( - "CAN Temperature sensor successfully established connection with MQTT broker!") - client.subscribe(TEMP_ALARM_TOPIC, qos=QOS) - else: - errorLogger.error( - "CAN Temperature sensor failed to establish connection with MQTT broker!") - customLogger.critical( - "CAN Temperature sensor failed to establish connection with MQTT broker!") - - -def on_connect_load_sensor(client, userdata, flags, rc, props): - """ - Event handler for published messages to a MQTT topic - Args: - ---- - client: paho.mqtt.client.Client - userdata: - flags: - rc: - props: - - """ - if rc == 0: - infoLogger.info( - "CAN Load sensor successfully established connection with MQTT broker!") - customLogger.debug( - "CAN Load sensor successfully established connection with MQTT broker!") - client.subscribe(LOAD_ALARM_TOPIC, qos=QOS) - else: - errorLogger.error( - "CAN Load sensor failed to establish connection with MQTT broker!") - customLogger.critical( - "CAN Load sensor failed to establish connection with MQTT broker!") - - -def on_connect_fuel_sensor(client, userdata, flags, rc, props): - """ - Event handler for published messages to a MQTT topic - Args: - ---- - client: paho.mqtt.client.Client - userdata: - flags: - rc: - props: - - """ - if rc == 0: - infoLogger.info( - "CAN Fuel sensor successfully established connection with MQTT broker!") - customLogger.debug( - "CAN Fuel sensor successfully established connection with MQTT broker!") - client.subscribe(FUEL_ALARM_TOPIC, qos=QOS) - else: - errorLogger.error( - "CAN Fuel sensor failed to establish connection with MQTT broker!") - customLogger.critical( - "CAN Fuel sensor failed to establish connection with MQTT broker!") - - def on_connect_protocol_sensor(client, userdata, flags, rc, props): """ Event handler for published messages to a MQTT topic @@ -738,6 +441,9 @@ def on_connect_protocol_sensor(client, userdata, flags, rc, props): customLogger.debug( "CAN Protocol sensor successfully established connection with MQTT broker!") client.subscribe(PROTOCOL_INPUT_TOPIC, qos=QOS) + client.subscribe(TEMP_ALARM_TOPIC, qos=QOS) + client.subscribe(FUEL_ALARM_TOPIC, qos=QOS) + client.subscribe(LOAD_ALARM_TOPIC, qos=QOS) else: errorLogger.error( "CAN Protocol sensor failed to establish connection with MQTT broker!") @@ -758,36 +464,20 @@ class CANListener (Listener): Methods: ------- __init__(temp_client, load_client, fuel_client): Class constructor for initializing class objects - set_temp_client(client): Setter for the temperature MQTT broker client - set_load_client(client): Setter for the load MQTT broker client - set_fuel_client(client): Setter for the fuel MQTT broker client + set_protocol_client(client): Setter for the protocol MQTT broker client on_message_received(msg): Event handler for receiving messages from the CAN bus """ - def __init__(self, temp_client, load_client, fuel_client, protocol_client): + def __init__(self, protocol_client): """ Constructor for initializing CANListener object Args: ---- - temp_client: MQTT temperature broker client - load_client: MQTT load broker client - fuel_client: MQTT fuel broker client protocol_client: MQTT protocol data broker client """ super().__init__() - if temp_client is not None: - temp_client.connect() - self.temp_client = temp_client - - if load_client is not None: - load_client.connect() - self.load_client = load_client - - if fuel_client is not None: - fuel_client.connect() - self.fuel_client = fuel_client if protocol_client is not None: protocol_client.connect() @@ -796,48 +486,6 @@ def __init__(self, temp_client, load_client, fuel_client, protocol_client): # counter that counts received messages self.message_counter = 0 - def set_temp_client(self, client): - """ - Setter for the temperature MQTT broker client - - Args: - ---- - client: MQTT temperature broker client - - """ - if client is None: - if self.temp_client is not None: - self.temp_client.disconnect() - self.temp_client = client - - def set_load_client(self, client): - """ - Setter for the load MQTT broker client - - Args: - ---- - client: MQTT load broker client - - """ - if client is None: - if self.temp_client is not None: - self.temp_client.disconnect() - self.load_client = client - - def set_fuel_client(self, client): - """ - Setter for the fuel MQTT broker client - - Args: - ---- - client: MQTT fuel broker client - - """ - if client is None: - if self.temp_client is not None: - self.temp_client.disconnect() - self.fuel_client = client - def set_protocol_client(self, client): """ Setter for the protocol data MQTT broker client @@ -868,71 +516,57 @@ def on_message_received(self, msg): # msg.data is a byte array, need to turn it into a single value int_value = int.from_bytes(msg.data, byteorder="big", signed=True) value = int_value / 10.0 - if self.temp_client is not None: - self.temp_client.try_reconnect() - if self.load_client is not None: - self.load_client.try_reconnect() - if self.fuel_client is not None: - self.fuel_client.try_reconnect() if self.protocol_client is not None: self.protocol_client.try_reconnect() # Extract CAN ID value to search for it in the database - hex_string_without_prefix = hex(msg.arbitration_id)[2:] - integer_value = int(hex_string_without_prefix, 10) - # Get all protocol data based on CAN ID (ProtocolDataEntity objects) - rows = get_data_by_can_id(integer_value) - - with lock: - for protocol_data_entity in rows: - # Only OUTPUT messages are parsed, INPUT sent from cloud - if protocol_data_entity.mode == "OUTPUT": - # Extract value from relevant bits (range from start bit to start bit + num of bits) - extracted_value = extract_bits(msg.data, protocol_data_entity.start_bit, - protocol_data_entity.num_bits) - extracted_double_value = extracted_value / 10.0 - # Send message to app module via MQTT - self.protocol_client.publish( - PROTOCOL_TOPIC, PROTOCOL_DATA_PATTERN.format( - "{:.2f}".format(extracted_double_value), str( - time.strftime( - TIME_FORMAT, time.localtime())), - protocol_data_entity.id), QOS) - customLogger.info( - "Protocol data: " + PROTOCOL_DATA_PATTERN.format( - "{:.2f}".format(value), - str( - time.strftime( - TIME_FORMAT, - time.localtime())), protocol_data_entity.id)) - - if hex(msg.arbitration_id) == "0x123" and self.temp_client is not None: - self.temp_client.publish( - TEMP_TOPIC, DATA_PATTERN.format( - "{:.2f}".format(value), str( - time.strftime( - TIME_FORMAT, time.localtime())), CELZIUS), QOS) - customLogger.info("Temperature: " + DATA_PATTERN.format("{:.2f}".format(value), - str(time.strftime(TIME_FORMAT, time.localtime())), - CELZIUS)) - elif hex(msg.arbitration_id) == "0x124" and self.load_client is not None: - self.load_client.publish( - LOAD_TOPIC, DATA_PATTERN.format( - "{:.2f}".format(value), str( - time.strftime( - TIME_FORMAT, time.localtime())), CELZIUS), QOS) - customLogger.info( - "Load: " + DATA_PATTERN.format( - "{:.2f}".format(value), str( - time.strftime( - TIME_FORMAT, time.localtime())), KG)) - elif hex(msg.arbitration_id) == "0x125" and self.fuel_client is not None: - self.fuel_client.publish( - FUEL_TOPIC, DATA_PATTERN.format( - "{:.2f}".format(value), str( - time.strftime( - TIME_FORMAT, time.localtime())), CELZIUS), QOS) - customLogger.info( - "Fuel: " + DATA_PATTERN.format( - "{:.2f}".format(value), str( - time.strftime( - TIME_FORMAT, time.localtime())), _L)) + hex_string_without_prefix = hex(msg.arbitration_id)[2:] + integer_value = int(hex_string_without_prefix, 10) + # Get all protocol data based on CAN ID (ProtocolDataEntity objects) + rows = get_data_by_can_id(integer_value) + + with lock: + for protocol_data_entity in rows: + # Only OUTPUT messages are parsed, INPUT sent from cloud + if protocol_data_entity.mode == "OUTPUT": + # Extract value from relevant bits (range from start bit to start bit + num of bits) + extracted_value = extract_bits(msg.data, protocol_data_entity.start_bit, + protocol_data_entity.num_bits) + extracted_double_value = extracted_value / 10.0 + # Send message to app module via MQTT + self.protocol_client.publish( + PROTOCOL_TOPIC, PROTOCOL_DATA_PATTERN.format( + "{:.2f}".format(extracted_double_value), str( + time.strftime( + TIME_FORMAT, time.localtime())), + protocol_data_entity.id), QOS) + customLogger.info( + "Protocol data: " + PROTOCOL_DATA_PATTERN.format( + "{:.2f}".format(value), + str( + time.strftime( + TIME_FORMAT, + time.localtime())), protocol_data_entity.id)) + + +def main(): + """ + Start can app entrypoint. + Initializes can_lock to prevent race conditioning. + Initializes main_execution_flag to prevent stop the thread on app shutdown. + Starts thread which reads received can data. + """ + can_lock = threading.Lock() + main_execution_flag = Event() + BetterSignalHandler([signal.SIGINT, + signal.SIGTERM], + [main_execution_flag]) + can_thread = threading.Thread( + target=read_can, + args=( + main_execution_flag, + can_lock,)) + can_thread.start() + + +if __name__ == '__main__': + main() From 370e0aae83ab76369f5d0e6a5873e5be8fc577f0 Mon Sep 17 00:00:00 2001 From: Novica Tepic Date: Wed, 11 Sep 2024 14:54:04 +0200 Subject: [PATCH 7/7] Modified startup and shutdown script because sensor devices module is deleted, replaced it with can module --- exe_script.bat | 6 +++--- shutdown_script.bat | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/exe_script.bat b/exe_script.bat index e9175e3..1454615 100644 --- a/exe_script.bat +++ b/exe_script.bat @@ -63,9 +63,9 @@ echo Cloud App ready! cd src -echo Starting Sensors Client... -start "Sensor Dispatcher" python.exe "sensor_devices.py" -echo Sensors Started! +echo Starting CAN Client... +start "CAN Dispatcher" python.exe "can_service.py" +echo CAN Started! echo Starting IoT Gateway... start "IoT Gateway" python.exe "app.py" diff --git a/shutdown_script.bat b/shutdown_script.bat index b92c342..6f302e0 100644 --- a/shutdown_script.bat +++ b/shutdown_script.bat @@ -5,7 +5,7 @@ title Windows Shutdown Script set sensors_mosquitto_window_title=Sensors Mosquitto set gateway_mosquitto_window_title=Gateway Mosquitto -set sensor_dispatcher_window_title=Sensor Dispatcher +set can_dispatcher_window_title=CAN Dispatcher set cloud_window_title=Cloud App set gateway_window_title=IoT Gateway set rest_window_title=REST API @@ -18,8 +18,8 @@ if %errorlevel% equ 0 ( echo %sensors_mosquitto_window_title% shut down! ) taskkill /FI "WINDOWTITLE eq %gateway_mosquitto_window_title%" /F >nul 2>&1 if %errorlevel% equ 0 ( echo %gateway_mosquitto_window_title% shut down! ) -taskkill /FI "WINDOWTITLE eq %sensor_dispatcher_window_title%" /F >nul 2>&1 -if %errorlevel% equ 0 ( echo %sensor_dispatcher_window_title% shut down! ) +taskkill /FI "WINDOWTITLE eq %can_dispatcher_window_title%" /F >nul 2>&1 +if %errorlevel% equ 0 ( echo %can_dispatcher_window_title% shut down! ) taskkill /FI "WINDOWTITLE eq %cloud_window_title%" /F >nul 2>&1 if %errorlevel% equ 0 ( echo %cloud_window_title% shut down! )