MCU_Monitor/main.py
2021-07-01 14:09:39 +02:00

572 lines
21 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 22 11:33:14 2021
@author: angoosh
"""
from serial.tools import list_ports
import serial
import gi
from time import sleep
import threading
import os
gi.require_version("Gtk", "3.0")
gi.require_version('GLib', '2.0')
from gi.repository import Gtk, GLib, Gdk
HANDSHAKE_MSG = b'Hello\n'
VAR_DESIGNATOR = "V"
REG_DESIGNATOR = "R"
CSV_FILE = "MONITOR_DATA.csv"
ser = None
baud = 19200
readPortEnable = False
readCsvEnable = False
rows = []
csvContent = []
csvContentOverArr = ""
testCsvFile = "test.csv"
csvGenCsvContent = ""
firsCsvRead = True
csvIsWrite = False
csvIsRead = False
def CsvGen(dataForCsv):#vyroba csv pro tabulku promennych, nutno dodat dekodovana data
global csvGenCsvContent, csvIsRead, csvIsWrite, csvContentOverArr
newCsvGenContent = []
Passed = False
dataForCsv = csvContentOverArr + dataForCsv
rawData = dataForCsv.split('\n')
csvGenLine = csvGenCsvContent.split('\n')
csvContentOverArr = rawData[-1]
rawData.remove(rawData[-1])
rawDataLen = len(rawData)
for line in rawData:
splitted = line.split(',')
try:
#csvGenLine = csvGenCsvContent.split('\n')
VarPos = [i for i, s in enumerate(csvGenLine) if ","+splitted[2]+"," in s]
if VarPos != []:
csvGenLineParts = csvGenLine[VarPos[0]].split(",")
csvGenLineParts[3] = splitted[3]
string = ""
for j in csvGenLineParts:
string += j+','
string = string[:-1]
csvGenLine[VarPos[0]] = string
else:
csvGenLine.append(line)
csvGenCsvContent = ""
except:
pass
for l in csvGenLine:
if l != "":
csvGenCsvContent += l
csvGenCsvContent += '\n'
else:
pass
while csvIsRead == True:
pass
csvIsWrite = True
with open(CSV_FILE,'w') as dataFile:
dataFile.write(csvGenCsvContent)
dataFile.close()
csvIsWrite = False
#Funkce bezici na pozadi
def readPort():#Cte data ze serioveho portu
global readPortEnable
print("MSG: READ_PORT_START")
file = open("PORT_DATA.log","w")
file.write("AQ_START\n")
file.close()
while readPortEnable == True:
with open("PORT_DATA.log","a") as file:
s = ser.read(10000)
data = s.decode()
CsvGen(data)
file.write(data)
file.close()
#sleep(1)
print("MSG: READ_PORT_EXIT")
def readCsv():#Cte csv vygenerovane CsvGen()
global csvContent, testCsvFile, csvIsRead, csvIsWrite
#while readCsvEnable == True:
while csvIsWrite == True:
pass
csvIsRead = True
with open(CSV_FILE,"r") as file:
content = file.read()
lines = content.split("\n")
csvContent = lines
file.close()
csvIsRead = False
MainGUI.ApplyCsv()
if readCsvEnable == True :
return True
else:
print("MSG: READ_CSV_EXIT")
return False
def get_resource_path(rel_path):
dir_of_py_file = os.path.dirname(__file__)
rel_path_to_resource = os.path.join(dir_of_py_file, rel_path)
abs_path_to_resource = os.path.abspath(rel_path_to_resource)
return abs_path_to_resource
class MainGUI(Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self, title="Monitor")
self.set_icon_from_file(get_resource_path("Icons/SerialPort.png"))
self.set_resizable(False)
#self.set_border_width(10)
self.tid = None
self.portList = Gtk.ListStore(str)
self.baudList = Gtk.ListStore(str)
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.add(vbox)
box = Gtk.Box(spacing=6)
vbox.pack_start(box, False, False, 0)
self.port_combo = Gtk.ComboBox.new_with_model_and_entry(self.portList)
self.port_combo.connect("changed", self.on_name_combo_changed)
self.port_combo.set_entry_text_column(0)
self.refresh("ignore")
try:
self.port_combo.set_active(0)
except:
pass
box.pack_start(self.port_combo, False, False, 0)
self.baud_combo = Gtk.ComboBox.new_with_model_and_entry(self.baudList)
self.baud_combo.connect("changed", self.baud_change)
self.baud_combo.set_entry_text_column(0)
bauds = ["9600",
"19200",
"38400",
"57600",
"115200",
"250000",
"500000",
"1000000"
]
for baud in bauds:
self.baudList.append([baud])
self.baud_combo.set_active(1)
box.pack_start(self.baud_combo, False, False, 0)
button1 = Gtk.Button()
button1.connect("clicked", self.refresh)
button1.set_image(Gtk.Image.new_from_icon_name("view-refresh",1))
button1.set_size_request(20,20)
button1.set_halign(Gtk.Align.END)
box.pack_start(button1, True, False, 0)
button2 = Gtk.Button(label="Connect")
button2.set_image(Gtk.Image.new_from_icon_name("Connect",1))
button2.connect("clicked", self.connect_disconnect)
box.pack_start(button2, True, True, 0)
tabs = Gtk.Notebook()
vbox.pack_start(tabs, False, False, 0)
varScroll = Gtk.ScrolledWindow()
varScroll.set_min_content_height(600)
self.variable_tab = Gtk.Grid()
varScroll.add_with_viewport(self.variable_tab)
tabs.append_page(varScroll, Gtk.Label("Variables"))#self.variable_tab
regScroll = Gtk.ScrolledWindow()
regScroll.set_min_content_height(600)
self.register_tab = Gtk.Grid()
regScroll.add_with_viewport(self.register_tab)
tabs.append_page(regScroll, Gtk.Label("Registers"))
#self.addRow("ignore")
def refresh(self, widget):
CsvGen("none")
ports = list_ports.comports()
self.portList.clear()
for port in ports:
self.portList.append([port[0]])
try:
self.port_combo.set_active(0)
except:
pass
def baud_change(self, combo):
global baud
tree_iter = combo.get_active_iter()
if tree_iter is not None:
model = combo.get_model()
baud = model[tree_iter][0]
print("Selected Baud: %s" % (baud))
else:
entry = combo.get_child()
print("Entered Baud: %s" % entry.get_text())
def connect_disconnect(self, butt):
global ser, baud, readPortEnable, HANDSHAKE_MSG, readCsvEnable, firsCsvRead
if butt.get_label() == "Connect":
try:
for i in range(0,len(rows)):
self.variable_tab.remove_row(0)
self.register_tab.remove_row(0)
except:
pass
firsCsvRead = True
self.port_combo.set_sensitive(False)
self.baud_combo.set_sensitive(False)
tree_iter = self.port_combo.get_active_iter()
model = self.port_combo.get_model()
port = model[tree_iter][0]
ser = serial.Serial(port, baud, timeout=1)
print("Connecting to:"+ port)
butt.set_label("Disconnect")
ser.write(b'Hello\n')
try:
os.remove(CSV_FILE)
except:
print("MSG: NO MONITOR FILE TO REMOVE")
open(CSV_FILE, 'w').close()
# while(ser.readline() != HANDSHAKE_MSG):
# sleep(0.5)
readPortEnable = True
Port_thread = threading.Thread(target=readPort)
Port_thread.start()
readCsvEnable = True
GLib.timeout_add_seconds(1, readCsv)
print("MSG: READ_CSV_START")
#Csv_thread = threading.Thread(target=readCsv)
#Csv_thread.start()
self.tid = GLib.timeout_add_seconds(1, self.addRow, None)
else:
ser.close()
self.port_combo.set_sensitive(True)
self.baud_combo.set_sensitive(True)
readPortEnable = False
readCsvEnable = False
for row in rows:
row[3].set_editable(False)
print("Disconnecting")
butt.set_label("Connect")
def on_name_combo_changed(self, combo):
tree_iter = combo.get_active_iter()
if tree_iter is not None:
model = combo.get_model()
port = model[tree_iter][0]
print("Selected Port: %s" % (port))
else:
entry = combo.get_child()
print("Entered: %s" % entry.get_text())
def addRow(self, widget):
global rows, csvContent, testCsvFile, firsCsvRead
iteration = 0
csvContentIsNone = True
with open(CSV_FILE,"r") as file:
content = file.read()
lines = content.split("\n")
file.close()
if firsCsvRead == False:
for line in lines:
x = line.split(',')
y = csvContent[iteration].split(',')
if csvContent[iteration] == "":
y = [0,0]
if line == "":
#print("BLANK_LINE!")
iteration += 1
pass
else:
if x[1] == y[1]:
#print("DUPE")
iteration += 1
else:
print("EDIT")
try:
csvContent.append(line)
inLine = line.split(",")
if inLine[0] == "V":
varType = inLine[1]
varName = inLine[2]
value = inLine[3]
address = inLine[4]
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
f = Gtk.Frame()
f.add(typeLabel)
self.variable_tab.attach(f,0,iteration,1,1)
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
f = Gtk.Frame()
f.add(nameLabel)
self.variable_tab.attach(f,1,iteration,1,1)
nentry = Gtk.Entry()
nentry.set_hexpand(False)
nentry.set_text(value)
nentry.set_editable(False)
self.variable_tab.attach(nentry,2,iteration,1,1)
edit = Gtk.Entry()
edit.set_hexpand(False)
edit.set_placeholder_text("Edit")
edit.set_max_length(16)
edit.connect("activate", self.SendData)
self.variable_tab.attach(edit,3,iteration,1,1)
self.variable_tab.show_all()
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
rows.append(rowContent)
elif inLine[0] == "R":
varType = inLine[1]
varName = inLine[2]
value = inLine[3]
value = int(value)
value = str(hex(value))
address = inLine[4]
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
f = Gtk.Frame()
f.add(typeLabel)
self.register_tab.attach(f,0,iteration,1,1)
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
f = Gtk.Frame()
f.add(nameLabel)
self.register_tab.attach(f,1,iteration,1,1)
nentry = Gtk.Entry()
nentry.set_hexpand(False)
nentry.set_text(value)
nentry.set_editable(False)
self.register_tab.attach(nentry,2,iteration,1,1)
edit = Gtk.Entry()
edit.set_hexpand(False)
edit.set_placeholder_text("Edit")
edit.set_max_length(10)
edit.connect("activate", self.SendData)
self.register_tab.attach(edit,3,iteration,1,1)
self.register_tab.show_all()
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
rows.append(rowContent)
else:
print("VAR_CLASS_ERROR!")
iteration += 1
except:
pass
else:
csvContent = []
rows = []
print(lines)
for line in lines:
if line == "":
print("BLANK_LINE!")
iteration += 1
pass
else:
try:
csvContent.append(line)
inLine = line.split(",")
if inLine[0] == "V":
varType = inLine[1]
varName = inLine[2]
value = inLine[3]
address = inLine[4]
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
f = Gtk.Frame()
f.add(typeLabel)
self.variable_tab.attach(f,0,iteration,1,1)
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
f = Gtk.Frame()
f.add(nameLabel)
self.variable_tab.attach(f,1,iteration,1,1)
nentry = Gtk.Entry()
nentry.set_hexpand(False)
nentry.set_text(value)
nentry.set_editable(False)
self.variable_tab.attach(nentry,2,iteration,1,1)
edit = Gtk.Entry()
edit.set_hexpand(False)
edit.set_placeholder_text("Edit")
edit.set_max_length(16)
edit.connect("activate", self.SendData)
self.variable_tab.attach(edit,3,iteration,1,1)
self.variable_tab.show_all()
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
rows.append(rowContent)
elif inLine[0] == "R":
varType = inLine[1]
varName = inLine[2]
value = inLine[3]
value = int(value)
value = str(hex(value))
address = inLine[4]
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
f = Gtk.Frame()
f.add(typeLabel)
self.register_tab.attach(f,0,iteration,1,1)
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
f = Gtk.Frame()
f.add(nameLabel)
self.register_tab.attach(f,1,iteration,1,1)
nentry = Gtk.Entry()
nentry.set_hexpand(False)
nentry.set_text(value)
nentry.set_editable(False)
self.register_tab.attach(nentry,2,iteration,1,1)
edit = Gtk.Entry()
edit.set_hexpand(False)
edit.set_placeholder_text("Edit")
edit.set_max_length(10)
edit.connect("activate", self.SendData)
self.register_tab.attach(edit,3,iteration,1,1)
self.register_tab.show_all()
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
rows.append(rowContent)
else:
print("VAR_CLASS_ERROR!")
iteration += 1
except:
pass
#GLib.idle_add(self.Void)
firsCsvRead = False
return readCsvEnable
def ApplyCsv():
global csvContent, rows
values = []
isReg = False
iteration = 0
for content in csvContent:
try:
bits = content.split(",")
if bits[0] == 'V':
values.append(bits[3])
elif bits[0] == 'R':
isReg = True
x = int(bits[3])
x = str(hex(x))
values.append(x)
except:
pass
for row in rows:
row[3].set_text(values[iteration])
iteration += 1
def SendData(self, entry):
global ser
serMsg = "A"
value = entry.get_text()
numValue = 0
print("MSG: ENTERED="+value)
for row in rows:
if row[4] == entry:
if row[1].get_text() == "uint8_t":
serMsg += '0'
numValue = int(value)
numValue = numValue % 256
value = str(numValue)
elif row[1].get_text() == "int8_t":
serMsg += '1'
numValue = int(value)
numValue = numValue % 256
if numValue > 127:
numValue -= 256
value = str(numValue)
elif row[1].get_text() == "uint16_t":
serMsg += '2'
numValue = int(value)
numValue = numValue % 65536
value = str(numValue)
elif row[1].get_text() == "int16_t":
serMsg += '3'
numValue = int(value)
numValue = numValue % 65536
if numValue > (65536/2)-1:
numValue -= 65536
value = str(numValue)
elif row[1].get_text() == "uint32_t":
serMsg += '4'
numValue = int(value)
numValue = numValue % 4294967296
value = str(numValue)
elif row[1].get_text() == "int32_t":
serMsg += '5'
numValue = int(value)
numValue = numValue % 4294967296
if numValue > (4294967296/2)-1:
numValue -= 4294967296
value = str(numValue)
elif row[1].get_text() == "float":
serMsg += '6'
elif row[1].get_text() == "reg":
serMsg += '7'
numValue = int(value)
numValue = numValue % 4294967296
value = str(numValue)
serMsg += row[5]
serMsg += value
serMsg += "\n"
print("MSG: SERIAL WRITE: "+serMsg)
ser.write(serMsg.encode())
def Void(self):
print("Void")
return True
win = MainGUI()
win.connect("destroy", Gtk.main_quit)
win.show_all()
Gtk.main()