#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Mar 22 11:33:14 2021 @author: angoosh """ from serial.tools import list_ports import serial import gi from time import sleep import threading import os gi.require_version("Gtk", "3.0") gi.require_version('GLib', '2.0') from gi.repository import Gtk, GLib, Gdk HANDSHAKE_MSG = b'Hello\n' VAR_DESIGNATOR = "V" REG_DESIGNATOR = "R" CSV_FILE = "MONITOR_DATA.csv" ser = None baud = 19200 readPortEnable = False readCsvEnable = False rows = [] csvContent = [] csvContentOverArr = "" testCsvFile = "test.csv" csvGenCsvContent = "" firsCsvRead = True csvIsWrite = False csvIsRead = False def CsvGen(dataForCsv):#vyroba csv pro tabulku promennych, nutno dodat dekodovana data global csvGenCsvContent, csvIsRead, csvIsWrite, csvContentOverArr newCsvGenContent = [] Passed = False dataForCsv = csvContentOverArr + dataForCsv rawData = dataForCsv.split('\n') csvGenLine = csvGenCsvContent.split('\n') csvContentOverArr = rawData[-1] rawData.remove(rawData[-1]) rawDataLen = len(rawData) for line in rawData: splitted = line.split(',') try: #csvGenLine = csvGenCsvContent.split('\n') VarPos = [i for i, s in enumerate(csvGenLine) if ","+splitted[2]+"," in s] if VarPos != []: csvGenLineParts = csvGenLine[VarPos[0]].split(",") csvGenLineParts[3] = splitted[3] string = "" for j in csvGenLineParts: string += j+',' string = string[:-1] csvGenLine[VarPos[0]] = string else: csvGenLine.append(line) csvGenCsvContent = "" except: pass for l in csvGenLine: if l != "": csvGenCsvContent += l csvGenCsvContent += '\n' else: pass while csvIsRead == True: pass csvIsWrite = True with open(CSV_FILE,'w') as dataFile: dataFile.write(csvGenCsvContent) dataFile.close() csvIsWrite = False #Funkce bezici na pozadi def readPort():#Cte data ze serioveho portu global readPortEnable print("MSG: READ_PORT_START") file = open("PORT_DATA.log","w") file.write("AQ_START\n") file.close() while readPortEnable == True: with open("PORT_DATA.log","a") as file: s = ser.read(10000) data = s.decode() CsvGen(data) file.write(data) file.close() #sleep(1) print("MSG: READ_PORT_EXIT") def readCsv():#Cte csv vygenerovane CsvGen() global csvContent, testCsvFile, csvIsRead, csvIsWrite #while readCsvEnable == True: while csvIsWrite == True: pass csvIsRead = True with open(CSV_FILE,"r") as file: content = file.read() lines = content.split("\n") csvContent = lines file.close() csvIsRead = False MainGUI.ApplyCsv() if readCsvEnable == True : return True else: print("MSG: READ_CSV_EXIT") return False def get_resource_path(rel_path): dir_of_py_file = os.path.dirname(__file__) rel_path_to_resource = os.path.join(dir_of_py_file, rel_path) abs_path_to_resource = os.path.abspath(rel_path_to_resource) return abs_path_to_resource class MainGUI(Gtk.Window): def __init__(self): Gtk.Window.__init__(self, title="Monitor") self.set_icon_from_file(get_resource_path("Icons/SerialPort.png")) self.set_resizable(False) #self.set_border_width(10) self.tid = None self.portList = Gtk.ListStore(str) self.baudList = Gtk.ListStore(str) vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) self.add(vbox) box = Gtk.Box(spacing=6) vbox.pack_start(box, False, False, 0) self.port_combo = Gtk.ComboBox.new_with_model_and_entry(self.portList) self.port_combo.connect("changed", self.on_name_combo_changed) self.port_combo.set_entry_text_column(0) self.refresh("ignore") try: self.port_combo.set_active(0) except: pass box.pack_start(self.port_combo, False, False, 0) self.baud_combo = Gtk.ComboBox.new_with_model_and_entry(self.baudList) self.baud_combo.connect("changed", self.baud_change) self.baud_combo.set_entry_text_column(0) bauds = ["9600", "19200", "38400", "57600", "115200", "250000", "500000", "1000000" ] for baud in bauds: self.baudList.append([baud]) self.baud_combo.set_active(1) box.pack_start(self.baud_combo, False, False, 0) button1 = Gtk.Button() button1.connect("clicked", self.refresh) button1.set_image(Gtk.Image.new_from_icon_name("view-refresh",1)) button1.set_size_request(20,20) button1.set_halign(Gtk.Align.END) box.pack_start(button1, True, False, 0) button2 = Gtk.Button(label="Connect") button2.set_image(Gtk.Image.new_from_icon_name("Connect",1)) button2.connect("clicked", self.connect_disconnect) box.pack_start(button2, True, True, 0) tabs = Gtk.Notebook() vbox.pack_start(tabs, False, False, 0) varScroll = Gtk.ScrolledWindow() varScroll.set_min_content_height(600) self.variable_tab = Gtk.Grid() varScroll.add_with_viewport(self.variable_tab) tabs.append_page(varScroll, Gtk.Label("Variables"))#self.variable_tab regScroll = Gtk.ScrolledWindow() regScroll.set_min_content_height(600) self.register_tab = Gtk.Grid() regScroll.add_with_viewport(self.register_tab) tabs.append_page(regScroll, Gtk.Label("Registers")) #self.addRow("ignore") def refresh(self, widget): CsvGen("none") ports = list_ports.comports() self.portList.clear() for port in ports: self.portList.append([port[0]]) try: self.port_combo.set_active(0) except: pass def baud_change(self, combo): global baud tree_iter = combo.get_active_iter() if tree_iter is not None: model = combo.get_model() baud = model[tree_iter][0] print("Selected Baud: %s" % (baud)) else: entry = combo.get_child() print("Entered Baud: %s" % entry.get_text()) def connect_disconnect(self, butt): global ser, baud, readPortEnable, HANDSHAKE_MSG, readCsvEnable, firsCsvRead if butt.get_label() == "Connect": try: for i in range(0,len(rows)): self.variable_tab.remove_row(0) self.register_tab.remove_row(0) except: pass firsCsvRead = True self.port_combo.set_sensitive(False) self.baud_combo.set_sensitive(False) tree_iter = self.port_combo.get_active_iter() model = self.port_combo.get_model() port = model[tree_iter][0] ser = serial.Serial(port, baud, timeout=1) print("Connecting to:"+ port) butt.set_label("Disconnect") ser.write(b'Hello\n') try: os.remove(CSV_FILE) except: print("MSG: NO MONITOR FILE TO REMOVE") open(CSV_FILE, 'w').close() # while(ser.readline() != HANDSHAKE_MSG): # sleep(0.5) readPortEnable = True Port_thread = threading.Thread(target=readPort) Port_thread.start() readCsvEnable = True GLib.timeout_add_seconds(1, readCsv) print("MSG: READ_CSV_START") #Csv_thread = threading.Thread(target=readCsv) #Csv_thread.start() self.tid = GLib.timeout_add_seconds(1, self.addRow, None) else: ser.close() self.port_combo.set_sensitive(True) self.baud_combo.set_sensitive(True) readPortEnable = False readCsvEnable = False for row in rows: row[3].set_editable(False) print("Disconnecting") butt.set_label("Connect") def on_name_combo_changed(self, combo): tree_iter = combo.get_active_iter() if tree_iter is not None: model = combo.get_model() port = model[tree_iter][0] print("Selected Port: %s" % (port)) else: entry = combo.get_child() print("Entered: %s" % entry.get_text()) def addRow(self, widget): global rows, csvContent, testCsvFile, firsCsvRead iteration = 0 csvContentIsNone = True with open(CSV_FILE,"r") as file: content = file.read() lines = content.split("\n") file.close() if firsCsvRead == False: for line in lines: x = line.split(',') y = csvContent[iteration].split(',') if csvContent[iteration] == "": y = [0,0] if line == "": #print("BLANK_LINE!") iteration += 1 pass else: if x[1] == y[1]: #print("DUPE") iteration += 1 else: print("EDIT") try: csvContent.append(line) inLine = line.split(",") if inLine[0] == "V": varType = inLine[1] varName = inLine[2] value = inLine[3] address = inLine[4] typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START) f = Gtk.Frame() f.add(typeLabel) self.variable_tab.attach(f,0,iteration,1,1) nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END) f = Gtk.Frame() f.add(nameLabel) self.variable_tab.attach(f,1,iteration,1,1) nentry = Gtk.Entry() nentry.set_hexpand(False) nentry.set_text(value) nentry.set_editable(False) self.variable_tab.attach(nentry,2,iteration,1,1) edit = Gtk.Entry() edit.set_hexpand(False) edit.set_placeholder_text("Edit") edit.set_max_length(16) edit.connect("activate", self.SendData) self.variable_tab.attach(edit,3,iteration,1,1) self.variable_tab.show_all() rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address] rows.append(rowContent) elif inLine[0] == "R": varType = inLine[1] varName = inLine[2] value = inLine[3] value = int(value) value = str(hex(value)) address = inLine[4] typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START) f = Gtk.Frame() f.add(typeLabel) self.register_tab.attach(f,0,iteration,1,1) nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END) f = Gtk.Frame() f.add(nameLabel) self.register_tab.attach(f,1,iteration,1,1) nentry = Gtk.Entry() nentry.set_hexpand(False) nentry.set_text(value) nentry.set_editable(False) self.register_tab.attach(nentry,2,iteration,1,1) edit = Gtk.Entry() edit.set_hexpand(False) edit.set_placeholder_text("Edit") edit.set_max_length(10) edit.connect("activate", self.SendData) self.register_tab.attach(edit,3,iteration,1,1) self.register_tab.show_all() rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address] rows.append(rowContent) else: print("VAR_CLASS_ERROR!") iteration += 1 except: pass else: csvContent = [] rows = [] print(lines) for line in lines: if line == "": print("BLANK_LINE!") iteration += 1 pass else: try: csvContent.append(line) inLine = line.split(",") if inLine[0] == "V": varType = inLine[1] varName = inLine[2] value = inLine[3] address = inLine[4] typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START) f = Gtk.Frame() f.add(typeLabel) self.variable_tab.attach(f,0,iteration,1,1) nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END) f = Gtk.Frame() f.add(nameLabel) self.variable_tab.attach(f,1,iteration,1,1) nentry = Gtk.Entry() nentry.set_hexpand(False) nentry.set_text(value) nentry.set_editable(False) self.variable_tab.attach(nentry,2,iteration,1,1) edit = Gtk.Entry() edit.set_hexpand(False) edit.set_placeholder_text("Edit") edit.set_max_length(16) edit.connect("activate", self.SendData) self.variable_tab.attach(edit,3,iteration,1,1) self.variable_tab.show_all() rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address] rows.append(rowContent) elif inLine[0] == "R": varType = inLine[1] varName = inLine[2] value = inLine[3] value = int(value) value = str(hex(value)) address = inLine[4] typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START) f = Gtk.Frame() f.add(typeLabel) self.register_tab.attach(f,0,iteration,1,1) nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END) f = Gtk.Frame() f.add(nameLabel) self.register_tab.attach(f,1,iteration,1,1) nentry = Gtk.Entry() nentry.set_hexpand(False) nentry.set_text(value) nentry.set_editable(False) self.register_tab.attach(nentry,2,iteration,1,1) edit = Gtk.Entry() edit.set_hexpand(False) edit.set_placeholder_text("Edit") edit.set_max_length(10) edit.connect("activate", self.SendData) self.register_tab.attach(edit,3,iteration,1,1) self.register_tab.show_all() rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address] rows.append(rowContent) else: print("VAR_CLASS_ERROR!") iteration += 1 except: pass #GLib.idle_add(self.Void) firsCsvRead = False return readCsvEnable def ApplyCsv(): global csvContent, rows values = [] isReg = False iteration = 0 for content in csvContent: try: bits = content.split(",") if bits[0] == 'V': values.append(bits[3]) elif bits[0] == 'R': isReg = True x = int(bits[3]) x = str(hex(x)) values.append(x) except: pass for row in rows: row[3].set_text(values[iteration]) iteration += 1 def SendData(self, entry): global ser serMsg = "A" value = entry.get_text() numValue = 0 print("MSG: ENTERED="+value) for row in rows: if row[4] == entry: if row[1].get_text() == "uint8_t": serMsg += '0' numValue = int(value) numValue = numValue % 256 value = str(numValue) elif row[1].get_text() == "int8_t": serMsg += '1' numValue = int(value) numValue = numValue % 256 if numValue > 127: numValue -= 256 value = str(numValue) elif row[1].get_text() == "uint16_t": serMsg += '2' numValue = int(value) numValue = numValue % 65536 value = str(numValue) elif row[1].get_text() == "int16_t": serMsg += '3' numValue = int(value) numValue = numValue % 65536 if numValue > (65536/2)-1: numValue -= 65536 value = str(numValue) elif row[1].get_text() == "uint32_t": serMsg += '4' numValue = int(value) numValue = numValue % 4294967296 value = str(numValue) elif row[1].get_text() == "int32_t": serMsg += '5' numValue = int(value) numValue = numValue % 4294967296 if numValue > (4294967296/2)-1: numValue -= 4294967296 value = str(numValue) elif row[1].get_text() == "float": serMsg += '6' elif row[1].get_text() == "reg": serMsg += '7' numValue = int(value) numValue = numValue % 4294967296 value = str(numValue) serMsg += row[5] serMsg += value serMsg += "\n" print("MSG: SERIAL WRITE: "+serMsg) ser.write(serMsg.encode()) def Void(self): print("Void") return True win = MainGUI() win.connect("destroy", Gtk.main_quit) win.show_all() Gtk.main()