Skip to content

Commit

Permalink
Added can service part
Browse files Browse the repository at this point in the history
  • Loading branch information
novicatepic committed Sep 5, 2024
1 parent 66497be commit db36cfa
Showing 1 changed file with 169 additions and 12 deletions.
181 changes: 169 additions & 12 deletions src/can_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@
MQTT topic for load data
fuel_topic: str
MQTT topic for fuel data
protocol_topic: str
MQTT topic for protocol data
data_pattern: str
Format by which data is sent to MQTT brokers
protocol_data_pattern: str
Format by which protocol data is sent to MQTT brokers
time_format: str
Format by which time is sent to MQTT brokers
celzius: str
Expand All @@ -60,10 +64,12 @@
import paho.mqtt.client as mqtt
import logging
import time
import threading
from mqtt_utils import MQTTClient
from can.listener import Listener
from can.interface import Bus
from config_util import Config
from can_protocol import get_data_by_can_id

logging.config.fileConfig('logging.conf')
infoLogger = logging.getLogger('customInfoLogger')
Expand All @@ -77,9 +83,10 @@
TEMP_TOPIC = "sensors/temperature"
LOAD_TOPIC = "sensors/arm-load"
FUEL_TOPIC = "sensors/fuel-level"

PROTOCOL_TOPIC = "sensors/protocol"

DATA_PATTERN = "[ value={} , time={} , unit={} ]"
PROTOCOL_DATA_PATTERN = "[ value={} , time={} , data_id={} ]"
TIME_FORMAT = "%d.%m.%Y %H:%M:%S"
CELZIUS = "C"
KG = "kg"
Expand Down Expand Up @@ -118,6 +125,36 @@
TEMP_ALARM_TOPIC = "alarms/temperature"
LOAD_ALARM_TOPIC = "alarms/load"
FUEL_ALARM_TOPIC = "alarms/fuel"
lock = threading.Lock()


def extract_bits(byte_array, start_bit, length):
"""
Converts byte array to bits and extract bits from start bit to start
bit plus number of bits.
Args:
----
byte_array: bytearray
The byte array containing the CAN message data.
start_bit: int
The starting bit position of the field to extract (0-indexed, LSB first).
length: int
The length of the field in bits.
Returns:
-------
int: The extracted bit field value.
"""
# Convert byte array to bits
bits = ''.join(f'{byte:08b}' for byte in byte_array)
reversed_bits = bits[::-1]
# Extract bits
extracted_bits_reversed = reversed_bits[start_bit:start_bit + length]
extracted_bits = extracted_bits_reversed[::-1]
# Convert bits to integer
integer_value = int(extracted_bits, 2)
return integer_value


def read_can(execution_flag, config_flag, init_flags, can_lock):
Expand Down Expand Up @@ -149,23 +186,22 @@ def read_can(execution_flag, config_flag, init_flags, can_lock):
# number of received messages. If the number of received messages is equal to the number of the previous check
# (the previous check 10 seconds ago), then the bus is idle, and is not transmitting any messages.
previous_message_counter = 0

bus = None
can_listener = None
initial = True
notifier = None
temp_client = None
load_client = None
fuel_client = None

bus = None
protocol_client = None

try:
while not execution_flag.is_set():
if config_flag.is_set() or initial:

config = Config(APP_CONF_FILE_PATH, errorLogger, customLogger)
config.try_open()
stop_can(notifier, bus, temp_client, load_client, fuel_client)
stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_client)

interface_value = config.can_interface
channel_value = config.can_channel
Expand All @@ -182,10 +218,10 @@ def read_can(execution_flag, config_flag, init_flags, can_lock):
bus = Bus(interface=interface_value,
channel=channel_value,
bitrate=bitrate_value)
temp_client, load_client, fuel_client = init_mqtt_clients(
temp_client, load_client, fuel_client, protocol_client = init_mqtt_clients(
bus, is_can_temp, is_can_load, is_can_fuel, config, execution_flag)
notifier = can.Notifier(bus, [], timeout=period)
can_listener = CANListener(temp_client, load_client, fuel_client)
can_listener = CANListener(temp_client, load_client, fuel_client, protocol_client)
notifier.add_listener(can_listener)
initial = False
config_flag.clear()
Expand All @@ -207,12 +243,12 @@ def read_can(execution_flag, config_flag, init_flags, can_lock):
init_flags.can_initiated = False
can_lock.release()

stop_can(notifier, bus, temp_client, load_client, fuel_client)
stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_client)
execution_flag.clear()
customLogger.debug("CAN process shutdown!")


def stop_can(notifier, bus, temp_client, load_client, fuel_client):
def stop_can(notifier, bus, temp_client, load_client, fuel_client, protocol_client):
"""
Used for stopping all CAN functionalities
Expand All @@ -228,6 +264,8 @@ def stop_can(notifier, bus, temp_client, load_client, fuel_client):
Load MQTT broker client
fuel_client: mqtt_utils.MQTTClient
Fuel MQTT broker client
protocol_client: mqtt_utils.MQTTClient
Protocol data MQTT broker client
"""
if notifier is not None:
Expand All @@ -238,6 +276,8 @@ def stop_can(notifier, bus, temp_client, load_client, fuel_client):
load_client.disconnect()
if fuel_client is not None:
fuel_client.disconnect()
if protocol_client is not None:
protocol_client.disconnect()
if bus is not None:
bus.shutdown()

Expand Down Expand Up @@ -355,7 +395,27 @@ def on_message_fuel_alarm(client, userdata, msg):
fuel_client.set_on_subscribe(on_subscribe_fuel_alarm)
fuel_client.set_on_message(on_message_fuel_alarm)
fuel_client.connect()
return temp_client, load_client, fuel_client

protocol_client = MQTTClient(
"protocol-data-can-sensor-mqtt-client",
transport_protocol=TRANSPORT_PROTOCOL,
protocol_version=mqtt.MQTTv5,
mqtt_username=config.mqtt_broker_username,
mqtt_pass=config.mqtt_broker_password,
broker_address=config.mqtt_broker_address,
broker_port=config.mqtt_broker_port,
keepalive=config.fuel_settings_interval,
infoLogger=infoLogger,
errorLogger=errorLogger,
flag=flag,
sensor_type="PROTOCOL")

protocol_client.set_on_connect(on_connect_protocol_sensor)
protocol_client.set_on_publish(on_publish)
protocol_client.set_on_subscribe(on_subscribe_protocol)
protocol_client.connect()

return temp_client, load_client, fuel_client, protocol_client


def on_publish(topic, payload, qos):
Expand Down Expand Up @@ -447,6 +507,30 @@ def on_subscribe_fuel_alarm(client, userdata, flags, rc, props):
"CAN Load alarm client failed to establish connection with MQTT broker!")


def on_subscribe_protocol(client, userdata, flags, rc, props):
"""
Event handler for published messages to a MQTT topic
Args:
----
client: paho.mqtt.client.Client
userdata:
flags:
rc:
props:
"""
if rc == 0:
infoLogger.info(
"Protocol client successfully established connection with MQTT broker!")
customLogger.debug(
"Protocol client successfully established connection with MQTT broker!")
else:
errorLogger.error(
"Protocol client failed to establish connection with MQTT broker!")
customLogger.critical(
"Protocol client failed to establish connection with MQTT broker!")


def on_connect_temp_sensor(client, userdata, flags, rc, props):
"""
Event handler for published messages to a MQTT topic
Expand Down Expand Up @@ -522,6 +606,31 @@ def on_connect_fuel_sensor(client, userdata, flags, rc, props):
"CAN Fuel sensor failed to establish connection with MQTT broker!")


def on_connect_protocol_sensor(client, userdata, flags, rc, props):
"""
Event handler for published messages to a MQTT topic
Args:
----
client: paho.mqtt.client.Client
userdata:
flags:
rc:
props:
"""
if rc == 0:
infoLogger.info(
"CAN Protocol sensor successfully established connection with MQTT broker!")
customLogger.debug(
"CAN Protocol sensor successfully established connection with MQTT broker!")
client.subscribe(PROTOCOL_TOPIC, qos=QOS)
else:
errorLogger.error(
"CAN Protocol sensor failed to establish connection with MQTT broker!")
customLogger.critical(
"CAN Protocol sensor failed to establish connection with MQTT broker!")


class CANListener (Listener):
"""
A class that accepts messages from the CAN bus.
Expand All @@ -541,7 +650,7 @@ class CANListener (Listener):
on_message_received(msg): Event handler for receiving messages from the CAN bus
"""

def __init__(self, temp_client, load_client, fuel_client):
def __init__(self, temp_client, load_client, fuel_client, protocol_client):
"""
Constructor for initializing CANListener object
Expand All @@ -550,6 +659,7 @@ def __init__(self, temp_client, load_client, fuel_client):
temp_client: MQTT temperature broker client
load_client: MQTT load broker client
fuel_client: MQTT fuel broker client
protocol_client: MQTT protocol data broker client
"""
super().__init__()
Expand All @@ -565,6 +675,10 @@ def __init__(self, temp_client, load_client, fuel_client):
fuel_client.connect()
self.fuel_client = fuel_client

if protocol_client is not None:
protocol_client.connect()
self.protocol_client = protocol_client

# counter that counts received messages
self.message_counter = 0

Expand Down Expand Up @@ -610,6 +724,20 @@ def set_fuel_client(self, client):
self.temp_client.disconnect()
self.fuel_client = client

def set_protocol_client(self, client):
"""
Setter for the protocol data MQTT broker client
Args:
----
client: MQTT protocol data broker client
"""
if client is None:
if self.protocol_client is not None:
self.protocol_client.disconnect()
self.protocol_client = client

def on_message_received(self, msg):
"""
Event handler for receiving messages from the CAN bus
Expand All @@ -626,13 +754,42 @@ def on_message_received(self, msg):
# msg.data is a byte array, need to turn it into a single value
int_value = int.from_bytes(msg.data, byteorder="big", signed=True)
value = int_value / 10.0

if self.temp_client is not None:
self.temp_client.try_reconnect()
if self.load_client is not None:
self.load_client.try_reconnect()
if self.fuel_client is not None:
self.fuel_client.try_reconnect()
if self.protocol_client is not None:
self.protocol_client.try_reconnect()
# Extract CAN ID value to search for it in the database
hex_string_without_prefix = hex(msg.arbitration_id)[2:]
integer_value = int(hex_string_without_prefix, 10)
# Get all protocol data based on CAN ID (ProtocolDataEntity objects)
rows = get_data_by_can_id(integer_value)

with lock:
for protocol_data_entity in rows:
# Only OUTPUT messages are parsed, INPUT sent from cloud
if protocol_data_entity.mode == "OUTPUT":
# Extract value from relevant bits (range from start bit to start bit + num of bits)
extracted_value = extract_bits(msg.data, protocol_data_entity.start_bit,
protocol_data_entity.num_bits)
extracted_double_value = extracted_value / 10.0
# Send message to app module via MQTT
self.protocol_client.publish(
PROTOCOL_TOPIC, PROTOCOL_DATA_PATTERN.format(
"{:.2f}".format(extracted_double_value), str(
time.strftime(
TIME_FORMAT, time.localtime())),
protocol_data_entity.id), QOS)
customLogger.info(
"Protocol data: " + PROTOCOL_DATA_PATTERN.format(
"{:.2f}".format(value),
str(
time.strftime(
TIME_FORMAT,
time.localtime())), protocol_data_entity.id))

if hex(msg.arbitration_id) == "0x123" and self.temp_client is not None:
self.temp_client.publish(
Expand Down

0 comments on commit db36cfa

Please sign in to comment.