Skip to content

Commit

Permalink
Added operations for input values and thread stoppage if user removed…
Browse files Browse the repository at this point in the history
… protocol from the device
  • Loading branch information
novicatepic committed Sep 10, 2024
1 parent 0dda4ff commit 20bb172
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
20 changes: 18 additions & 2 deletions src/can_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Contains protocol data id which are processed as a key, values are thread and whether the thread is active or not
"""
import time
import json
import threading
import logging.config
from src.config_util import Config, CONF_PATH
Expand Down Expand Up @@ -124,14 +125,27 @@ def remove_protocols(protocol_ids):
ProtocolDataEntity.protocol.in_(protocol_ids)
).all()

# If user removed protocol from the device
# Input threads need to be stopped
protocol_data_input_ids_to_remove = []

# If user removed protocol in cloud configuration, stop the thread
for data_entity in protocol_data_entities:
if data_entity.id in processed_ids:
processed_ids[data_entity.id]["stopped"] = True
if data_entity.mode == "INPUT":
protocol_data_input_ids_to_remove.append(data_entity.id)

# Delete protocols
session.query(ProtocolEntity).filter(ProtocolEntity.id.in_(protocol_ids)).delete(synchronize_session=False)
session.commit()

# Send message to CAN module so ids are removed
if len(protocol_data_input_ids_to_remove) > 0:
message = {"type": "can_message", "action": "remove",
"protocol_data_ids": protocol_data_input_ids_to_remove}
message_payload = json.dumps(message)
publish_protocol_message_to_can_module(message_payload)
except Exception:
# Rollback the transaction in case of error
session.rollback()
Expand Down Expand Up @@ -293,7 +307,7 @@ def get_data_by_id(id):
return result


def start_protocol_startup_client(config):
def send_update_protocol_request():
"""
Start Protocol MQTT startup publisher client which sends MQTT request to the cloud to get updated protocol data.
Expand All @@ -302,6 +316,8 @@ def start_protocol_startup_client(config):
config : Config
Enables reading parameters from config file.
"""
config = Config(CONF_PATH, errorLogger, customLogger)
config.try_open()
client = mqtt_util.gcb_init_publisher("startup-protocol-client-id",
config.gateway_cloud_broker_iot_username,
config.gateway_cloud_broker_iot_password)
Expand Down Expand Up @@ -329,7 +345,7 @@ def start_protocol_mqtt(main_execution_flag):
config.try_open()
# Start threads for MQTT clients.
thread1 = threading.Thread(target=start_protocol_client, args=(config, main_execution_flag, ))
thread2 = threading.Thread(target=start_protocol_startup_client, args=(config, ))
thread2 = threading.Thread(target=send_update_protocol_request, args=())
thread1.start()
thread2.start()
thread2.join()
Expand Down
19 changes: 14 additions & 5 deletions src/can_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,22 @@ def parse_input_protocol_data(flag, value, protocol_data_from_db, bus):

interval = protocol_data_from_db.transmit_interval
hex_string = '0x' + str(protocol_data_from_db.can_id)
# Create byte array from double value which will be sent to the device
byte_array = list(struct.pack('f', value))

msg = can.Message(
arbitration_id=int(hex_string, 16), data=byte_array, is_extended_id=False, is_remote_frame=False
value += protocol_data_from_db.offset_value
if protocol_data_from_db.divisor > 0:
value /= protocol_data_from_db.divisor
if protocol_data_from_db.multiplier > 0:
value *= protocol_data_from_db.multiplier
byte_array = struct.pack('d', value)
extracted_value = extract_bits(byte_array, protocol_data_from_db.start_bit,
protocol_data_from_db.num_bits)
extracted_double_value = extracted_value / 10.0
extracted_value_byte_array = struct.pack('d', extracted_double_value)
can_message = can.Message(
arbitration_id=int(hex_string, 16), data=extracted_value_byte_array, is_extended_id=False,
is_remote_frame=False
)
task = bus.send_periodic(msg, interval)
task = bus.send_periodic(can_message, interval)

# While application is still running or thread is not stopped
while not flag.is_set() and processed_ids[protocol_data_from_db.id]["stopped"] is False:
Expand Down
2 changes: 2 additions & 0 deletions src/mqtt_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ def gcb_on_message(client, userdata, message):
elif type == "startup_fetching":
protocols = data["protocols"]
can_protocol.update_protocols_on_startup(protocols)
elif type == "sync":
can_protocol.send_update_protocol_request()
elif type == "can_message":
str_payload = message.payload.decode('utf-8')
can_protocol.publish_protocol_message_to_can_module(str_payload)
Expand Down

0 comments on commit 20bb172

Please sign in to comment.