"""
Send the UBX configuration messages listed in a text file to a receiver.

Each line of the input file is split on "-" and the last field is decoded as
hexadecimal. The decoded bytes are read as: message class (byte 0), message
id (byte 1), two bytes that are skipped (presumably the payload length), then
the payload. CFG-VALGET records are converted into CFG-VALSET commands; other
CFG records are re-sent as SET messages; non-CFG records are skipped.

A background thread reads the receiver's replies and counts ACK/NAK answers.
"""

from serial import Serial
from threading import Thread, Lock
from io import BufferedReader
from time import sleep

from pyubx2 import (
    POLL_LAYER_BBR,
    POLL_LAYER_FLASH,
    SET,
    SET_LAYER_BBR,
    SET_LAYER_FLASH,
    SET_LAYER_RAM,
    TXN_NONE,
    U1,
    UBXMessage,
    UBXReader,
    bytes2val,
    val2bytes,
)

CFG = b"\x06"
VALGET = b"\x8b"
VALSET = b"\x8a"

# Upper bound, in bytes, of the key/value data carried by one CFG-VALSET.
VALSET_SEGMENT_BYTES = 10 * 4

# Pause used by the reader thread when no byte is waiting, so that it does
# not spin at full CPU and starve the sending thread.
IDLE_POLL_SECONDS = 0.01

# Value length in bytes, indexed by the size code held in bits 30..28 of a
# configuration key ID (see the u-blox interface description).
_CFG_VALUE_SIZES = {1: 1, 2: 1, 3: 2, 4: 4, 5: 8}

# initialise global variables
reading = False
line_count = 0
send_count = 0
ACK_count = 0
NAK_count = 0
unknow_count = 0


def read_messages(stream, lock, ubxreader):
    """
    Read, parse and print incoming UBX messages until `reading` is cleared.

    Updates the global ACK/NAK/unknown counters according to the identity of
    each parsed message. A read that yields no parsed message is counted as
    unknown.

    :param stream: serial stream; only its `in_waiting` attribute is used here
    :param lock: lock shared with the sender, held while reading
    :param ubxreader: object whose `read()` returns `(raw, parsed)`
    """
    # pylint: disable=broad-except
    global ACK_count, NAK_count, unknow_count

    while reading:
        if not stream.in_waiting:
            sleep(IDLE_POLL_SECONDS)
            continue
        try:
            # `with` guarantees the lock is released even if read() raises;
            # otherwise the sender would block forever.
            with lock:
                print("Sémaphore pris (read)")
                (_raw_data, parsed_data) = ubxreader.read()
                print(parsed_data)
                print("Relachement du sémaphore (read)")

            identity = parsed_data.identity if parsed_data else None
            if identity == "ACK-ACK":
                ACK_count += 1
            elif identity == "ACK-NAK":
                NAK_count += 1
            else:
                unknow_count += 1
        except Exception as err:
            print(f"\n\nSomething went wrong {err}\n\n")


def start_thread(stream, lock, ubxreader):
    """
    Start the daemon thread running `read_messages` and return it.
    """
    thr = Thread(
        target=read_messages,
        args=(stream, lock, ubxreader),
        daemon=True,
    )
    thr.start()
    return thr


def send_message(stream, lock, message):
    """
    Serialize `message`, write it to `stream` under `lock`, and count it.

    This is the only place where the global `send_count` is incremented.
    """
    global send_count

    # `with` releases the lock even if the write fails.
    with lock:
        print("Sémaphore pris (send)")
        stream.write(message.serialize())
        print("Message envoyé : ", message)
        send_count += 1
        print("Relachement du sémaphore (send)")


def _split_cfgdata(cfgdata, max_bytes=VALSET_SEGMENT_BYTES):
    """
    Split raw key/value configuration data on key/value boundaries.

    Each item is a 4-byte little-endian key ID followed by a value whose
    length is given by bits 30..28 of the key ID. Segments hold as many whole
    items as fit in `max_bytes`, so no item is ever cut in half.

    :param cfgdata: concatenated key/value items
    :param max_bytes: maximum size of one segment
    :return: list of byte segments (empty when `cfgdata` is empty)
    :raises ValueError: if an item is truncated or its size code is unknown
    """
    segments = []
    total = len(cfgdata)
    seg_start = 0
    pos = 0

    while pos < total:
        if total - pos < 4:
            raise ValueError(f"truncated configuration key at offset {pos}")
        key_id = int.from_bytes(cfgdata[pos:pos + 4], "little")
        value_len = _CFG_VALUE_SIZES.get((key_id >> 28) & 0x07)
        if value_len is None:
            raise ValueError(
                f"unknown value size for configuration key 0x{key_id:08x}"
            )
        item_end = pos + 4 + value_len
        if item_end > total:
            raise ValueError(
                f"truncated value for configuration key 0x{key_id:08x}"
            )
        # Close the current segment before it would exceed the limit.
        if item_end - seg_start > max_bytes and pos > seg_start:
            segments.append(cfgdata[seg_start:pos])
            seg_start = pos
        pos = item_end

    if seg_start < total:
        segments.append(cfgdata[seg_start:])
    return segments


def _valset_messages(data):
    """
    Build the CFG-VALSET messages that apply one CFG-VALGET record.

    :param data: record bytes (class, id, 2 skipped bytes, 4-byte VALGET
        payload header, then key/value data)
    :return: list of UBXMessage, one per segment of key/value data
    """
    version = b"\x00"  # 0 = non-transactional message
    # NOTE(review): an earlier comment said "RAM", but the layer actually
    # used is flash; confirm which layer is intended.
    layers = val2bytes(SET_LAYER_FLASH, U1)
    action = b"\x00"  # transaction action byte, zero here
    reserved0 = b"\x00"
    header = version + layers + action + reserved0

    cfgdata = data[8:]
    return [
        UBXMessage(CFG, VALSET, SET, payload=header + segment)
        for segment in _split_cfgdata(cfgdata)
    ]


def load_txt(fname: str, serial: Serial) -> int:
    """
    Send every CFG record of `fname` to the device behind `serial`.

    A reader thread is started for the duration of the call and is always
    stopped before returning, including when an error occurs.

    :param fname: path of the text configuration file
    :param serial: open serial port connected to the receiver
    :return: value of the global line counter on success, 0 if an exception
        was raised while processing the file
    """
    global line_count, reading

    # create UBXReader instance, reading only UBX messages
    ubr = UBXReader(BufferedReader(serial), protfilter=2)

    print("\nStarting read thread...\n")
    reading = True
    serial_lock = Lock()
    read_thread = start_thread(serial, serial_lock, ubr)

    try:
        with open(fname, "r", encoding="utf-8") as file:
            for line in file:
                line_count += 1
                print(f"Lecture de la ligne {line_count} du fichier")

                # Drop all whitespace, then keep the last "-"-separated
                # field, which holds the hexadecimal message bytes.
                parts = "".join(line.split()).split("-")
                data = bytes.fromhex(parts[-1])

                msg_class = data[0:1]
                print("Classe de message trouvé : ", msg_class)
                msg_id = data[1:2]
                print("ID de message trouvé : ", msg_id)

                if msg_class != CFG:
                    print(data)
                    continue

                if msg_id == VALGET:
                    messages = _valset_messages(data)
                else:
                    # Any other CFG message is re-sent as a SET message.
                    messages = [
                        UBXMessage(CFG, msg_id, SET, payload=data[4:])
                    ]

                # send_message() does the counting; do not count again here.
                for msg in messages:
                    send_message(serial, serial_lock, msg)

        # Leave the receiver time to answer the last commands.
        sleep(1)
        print("\nStopping reader thread...\n")
    except Exception as e:
        print(f"Erreur : {e}")
        return 0
    finally:
        # Always stop the reader, otherwise a failed run would leave a
        # thread reading the port in the background.
        reading = False
        read_thread.join()

    print("Toutes les lignes du fichier ont été traité")
    print(f"{line_count} lignes traitées dans le fichier de configuration")
    print(f"{send_count} commandes envoyées à la carte")
    print(f"{ACK_count} commandes acceptées par la carte")
    print(f"{NAK_count} commandes refusées par la carte")
    print(f"{unknow_count} réponses inconnues reçues de la carte")
    return line_count


def main(**kwargs):
    """
    Open the serial port and load the configuration file.

    Optional keyword arguments (defaults in parentheses): `infile`
    ("test_conf.txt"), `port` ("/dev/ttyACM0"), `baud` (9600) and
    `timeout` (5 seconds).
    """
    infile = kwargs.get("infile", "test_conf.txt")
    port = kwargs.get("port", "/dev/ttyACM0")
    baud = kwargs.get("baud", 9600)
    timeout = kwargs.get("timeout", 5)

    with Serial(port, baud, timeout=timeout) as serial:
        load_txt(infile, serial)


if __name__ == "__main__":
    main()