572 lines
21 KiB
Python
572 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Created on Mon Mar 22 11:33:14 2021
|
|
|
|
@author: angoosh
|
|
"""
|
|
|
|
from serial.tools import list_ports
|
|
import serial
|
|
import gi
|
|
from time import sleep
|
|
import threading
|
|
import os
|
|
|
|
gi.require_version("Gtk", "3.0")
|
|
gi.require_version('GLib', '2.0')
|
|
|
|
from gi.repository import Gtk, GLib, Gdk
|
|
|
|
|
|
HANDSHAKE_MSG = b'Hello\n'
|
|
VAR_DESIGNATOR = "V"
|
|
REG_DESIGNATOR = "R"
|
|
CSV_FILE = "MONITOR_DATA.csv"
|
|
|
|
|
|
ser = None
|
|
baud = 19200
|
|
readPortEnable = False
|
|
readCsvEnable = False
|
|
rows = []
|
|
csvContent = []
|
|
csvContentOverArr = ""
|
|
testCsvFile = "test.csv"
|
|
csvGenCsvContent = ""
|
|
firsCsvRead = True
|
|
csvIsWrite = False
|
|
csvIsRead = False
|
|
|
|
|
|
def CsvGen(dataForCsv):#vyroba csv pro tabulku promennych, nutno dodat dekodovana data
|
|
global csvGenCsvContent, csvIsRead, csvIsWrite, csvContentOverArr
|
|
|
|
newCsvGenContent = []
|
|
Passed = False
|
|
|
|
dataForCsv = csvContentOverArr + dataForCsv
|
|
rawData = dataForCsv.split('\n')
|
|
csvGenLine = csvGenCsvContent.split('\n')
|
|
csvContentOverArr = rawData[-1]
|
|
rawData.remove(rawData[-1])
|
|
rawDataLen = len(rawData)
|
|
for line in rawData:
|
|
splitted = line.split(',')
|
|
try:
|
|
#csvGenLine = csvGenCsvContent.split('\n')
|
|
VarPos = [i for i, s in enumerate(csvGenLine) if ","+splitted[2]+"," in s]
|
|
if VarPos != []:
|
|
csvGenLineParts = csvGenLine[VarPos[0]].split(",")
|
|
csvGenLineParts[3] = splitted[3]
|
|
string = ""
|
|
for j in csvGenLineParts:
|
|
string += j+','
|
|
string = string[:-1]
|
|
csvGenLine[VarPos[0]] = string
|
|
else:
|
|
csvGenLine.append(line)
|
|
csvGenCsvContent = ""
|
|
except:
|
|
pass
|
|
|
|
for l in csvGenLine:
|
|
if l != "":
|
|
csvGenCsvContent += l
|
|
csvGenCsvContent += '\n'
|
|
else:
|
|
pass
|
|
|
|
while csvIsRead == True:
|
|
pass
|
|
csvIsWrite = True
|
|
with open(CSV_FILE,'w') as dataFile:
|
|
dataFile.write(csvGenCsvContent)
|
|
dataFile.close()
|
|
csvIsWrite = False
|
|
|
|
|
|
#Funkce bezici na pozadi
|
|
def readPort():#Cte data ze serioveho portu
|
|
global readPortEnable
|
|
print("MSG: READ_PORT_START")
|
|
file = open("PORT_DATA.log","w")
|
|
file.write("AQ_START\n")
|
|
file.close()
|
|
while readPortEnable == True:
|
|
with open("PORT_DATA.log","a") as file:
|
|
s = ser.read(10000)
|
|
data = s.decode()
|
|
CsvGen(data)
|
|
file.write(data)
|
|
file.close()
|
|
|
|
#sleep(1)
|
|
print("MSG: READ_PORT_EXIT")
|
|
|
|
def readCsv():#Cte csv vygenerovane CsvGen()
|
|
global csvContent, testCsvFile, csvIsRead, csvIsWrite
|
|
|
|
#while readCsvEnable == True:
|
|
while csvIsWrite == True:
|
|
pass
|
|
csvIsRead = True
|
|
with open(CSV_FILE,"r") as file:
|
|
content = file.read()
|
|
lines = content.split("\n")
|
|
csvContent = lines
|
|
file.close()
|
|
csvIsRead = False
|
|
|
|
MainGUI.ApplyCsv()
|
|
|
|
if readCsvEnable == True :
|
|
return True
|
|
else:
|
|
print("MSG: READ_CSV_EXIT")
|
|
return False
|
|
|
|
def get_resource_path(rel_path):
|
|
dir_of_py_file = os.path.dirname(__file__)
|
|
rel_path_to_resource = os.path.join(dir_of_py_file, rel_path)
|
|
abs_path_to_resource = os.path.abspath(rel_path_to_resource)
|
|
return abs_path_to_resource
|
|
|
|
class MainGUI(Gtk.Window):
|
|
def __init__(self):
|
|
Gtk.Window.__init__(self, title="Monitor")
|
|
self.set_icon_from_file(get_resource_path("Icons/SerialPort.png"))
|
|
self.set_resizable(False)
|
|
|
|
#self.set_border_width(10)
|
|
|
|
self.tid = None
|
|
|
|
self.portList = Gtk.ListStore(str)
|
|
self.baudList = Gtk.ListStore(str)
|
|
|
|
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
|
|
self.add(vbox)
|
|
|
|
box = Gtk.Box(spacing=6)
|
|
vbox.pack_start(box, False, False, 0)
|
|
|
|
self.port_combo = Gtk.ComboBox.new_with_model_and_entry(self.portList)
|
|
self.port_combo.connect("changed", self.on_name_combo_changed)
|
|
self.port_combo.set_entry_text_column(0)
|
|
self.refresh("ignore")
|
|
try:
|
|
self.port_combo.set_active(0)
|
|
except:
|
|
pass
|
|
box.pack_start(self.port_combo, False, False, 0)
|
|
|
|
self.baud_combo = Gtk.ComboBox.new_with_model_and_entry(self.baudList)
|
|
self.baud_combo.connect("changed", self.baud_change)
|
|
self.baud_combo.set_entry_text_column(0)
|
|
|
|
bauds = ["9600",
|
|
"19200",
|
|
"38400",
|
|
"57600",
|
|
"115200",
|
|
"250000",
|
|
"500000",
|
|
"1000000"
|
|
]
|
|
|
|
for baud in bauds:
|
|
self.baudList.append([baud])
|
|
|
|
self.baud_combo.set_active(1)
|
|
box.pack_start(self.baud_combo, False, False, 0)
|
|
|
|
button1 = Gtk.Button()
|
|
button1.connect("clicked", self.refresh)
|
|
button1.set_image(Gtk.Image.new_from_icon_name("view-refresh",1))
|
|
button1.set_size_request(20,20)
|
|
button1.set_halign(Gtk.Align.END)
|
|
box.pack_start(button1, True, False, 0)
|
|
|
|
button2 = Gtk.Button(label="Connect")
|
|
button2.set_image(Gtk.Image.new_from_icon_name("Connect",1))
|
|
button2.connect("clicked", self.connect_disconnect)
|
|
box.pack_start(button2, True, True, 0)
|
|
|
|
tabs = Gtk.Notebook()
|
|
vbox.pack_start(tabs, False, False, 0)
|
|
|
|
varScroll = Gtk.ScrolledWindow()
|
|
varScroll.set_min_content_height(600)
|
|
self.variable_tab = Gtk.Grid()
|
|
varScroll.add_with_viewport(self.variable_tab)
|
|
tabs.append_page(varScroll, Gtk.Label("Variables"))#self.variable_tab
|
|
|
|
regScroll = Gtk.ScrolledWindow()
|
|
regScroll.set_min_content_height(600)
|
|
self.register_tab = Gtk.Grid()
|
|
regScroll.add_with_viewport(self.register_tab)
|
|
tabs.append_page(regScroll, Gtk.Label("Registers"))
|
|
|
|
#self.addRow("ignore")
|
|
|
|
|
|
def refresh(self, widget):
|
|
CsvGen("none")
|
|
ports = list_ports.comports()
|
|
self.portList.clear()
|
|
for port in ports:
|
|
self.portList.append([port[0]])
|
|
try:
|
|
self.port_combo.set_active(0)
|
|
except:
|
|
pass
|
|
|
|
def baud_change(self, combo):
|
|
global baud
|
|
tree_iter = combo.get_active_iter()
|
|
if tree_iter is not None:
|
|
model = combo.get_model()
|
|
baud = model[tree_iter][0]
|
|
print("Selected Baud: %s" % (baud))
|
|
else:
|
|
entry = combo.get_child()
|
|
print("Entered Baud: %s" % entry.get_text())
|
|
|
|
def connect_disconnect(self, butt):
|
|
global ser, baud, readPortEnable, HANDSHAKE_MSG, readCsvEnable, firsCsvRead
|
|
if butt.get_label() == "Connect":
|
|
try:
|
|
for i in range(0,len(rows)):
|
|
self.variable_tab.remove_row(0)
|
|
self.register_tab.remove_row(0)
|
|
except:
|
|
pass
|
|
firsCsvRead = True
|
|
|
|
self.port_combo.set_sensitive(False)
|
|
self.baud_combo.set_sensitive(False)
|
|
|
|
tree_iter = self.port_combo.get_active_iter()
|
|
model = self.port_combo.get_model()
|
|
port = model[tree_iter][0]
|
|
ser = serial.Serial(port, baud, timeout=1)
|
|
print("Connecting to:"+ port)
|
|
butt.set_label("Disconnect")
|
|
ser.write(b'Hello\n')
|
|
try:
|
|
os.remove(CSV_FILE)
|
|
except:
|
|
print("MSG: NO MONITOR FILE TO REMOVE")
|
|
open(CSV_FILE, 'w').close()
|
|
# while(ser.readline() != HANDSHAKE_MSG):
|
|
# sleep(0.5)
|
|
|
|
readPortEnable = True
|
|
Port_thread = threading.Thread(target=readPort)
|
|
Port_thread.start()
|
|
|
|
readCsvEnable = True
|
|
GLib.timeout_add_seconds(1, readCsv)
|
|
print("MSG: READ_CSV_START")
|
|
#Csv_thread = threading.Thread(target=readCsv)
|
|
#Csv_thread.start()
|
|
|
|
self.tid = GLib.timeout_add_seconds(1, self.addRow, None)
|
|
else:
|
|
ser.close()
|
|
self.port_combo.set_sensitive(True)
|
|
self.baud_combo.set_sensitive(True)
|
|
readPortEnable = False
|
|
readCsvEnable = False
|
|
for row in rows:
|
|
row[3].set_editable(False)
|
|
print("Disconnecting")
|
|
butt.set_label("Connect")
|
|
|
|
def on_name_combo_changed(self, combo):
|
|
tree_iter = combo.get_active_iter()
|
|
if tree_iter is not None:
|
|
model = combo.get_model()
|
|
port = model[tree_iter][0]
|
|
print("Selected Port: %s" % (port))
|
|
else:
|
|
entry = combo.get_child()
|
|
print("Entered: %s" % entry.get_text())
|
|
|
|
def addRow(self, widget):
|
|
global rows, csvContent, testCsvFile, firsCsvRead
|
|
iteration = 0
|
|
csvContentIsNone = True
|
|
|
|
with open(CSV_FILE,"r") as file:
|
|
content = file.read()
|
|
lines = content.split("\n")
|
|
file.close()
|
|
|
|
if firsCsvRead == False:
|
|
for line in lines:
|
|
x = line.split(',')
|
|
y = csvContent[iteration].split(',')
|
|
if csvContent[iteration] == "":
|
|
y = [0,0]
|
|
if line == "":
|
|
#print("BLANK_LINE!")
|
|
iteration += 1
|
|
pass
|
|
else:
|
|
if x[1] == y[1]:
|
|
#print("DUPE")
|
|
iteration += 1
|
|
else:
|
|
print("EDIT")
|
|
try:
|
|
csvContent.append(line)
|
|
inLine = line.split(",")
|
|
if inLine[0] == "V":
|
|
varType = inLine[1]
|
|
varName = inLine[2]
|
|
value = inLine[3]
|
|
address = inLine[4]
|
|
|
|
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
|
|
f = Gtk.Frame()
|
|
f.add(typeLabel)
|
|
self.variable_tab.attach(f,0,iteration,1,1)
|
|
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
|
|
f = Gtk.Frame()
|
|
f.add(nameLabel)
|
|
self.variable_tab.attach(f,1,iteration,1,1)
|
|
|
|
nentry = Gtk.Entry()
|
|
nentry.set_hexpand(False)
|
|
nentry.set_text(value)
|
|
nentry.set_editable(False)
|
|
self.variable_tab.attach(nentry,2,iteration,1,1)
|
|
|
|
edit = Gtk.Entry()
|
|
edit.set_hexpand(False)
|
|
edit.set_placeholder_text("Edit")
|
|
edit.set_max_length(16)
|
|
edit.connect("activate", self.SendData)
|
|
self.variable_tab.attach(edit,3,iteration,1,1)
|
|
|
|
self.variable_tab.show_all()
|
|
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
|
|
rows.append(rowContent)
|
|
elif inLine[0] == "R":
|
|
varType = inLine[1]
|
|
varName = inLine[2]
|
|
value = inLine[3]
|
|
value = int(value)
|
|
value = str(hex(value))
|
|
address = inLine[4]
|
|
|
|
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
|
|
f = Gtk.Frame()
|
|
f.add(typeLabel)
|
|
self.register_tab.attach(f,0,iteration,1,1)
|
|
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
|
|
f = Gtk.Frame()
|
|
f.add(nameLabel)
|
|
self.register_tab.attach(f,1,iteration,1,1)
|
|
|
|
nentry = Gtk.Entry()
|
|
nentry.set_hexpand(False)
|
|
nentry.set_text(value)
|
|
nentry.set_editable(False)
|
|
self.register_tab.attach(nentry,2,iteration,1,1)
|
|
|
|
edit = Gtk.Entry()
|
|
edit.set_hexpand(False)
|
|
edit.set_placeholder_text("Edit")
|
|
edit.set_max_length(10)
|
|
edit.connect("activate", self.SendData)
|
|
self.register_tab.attach(edit,3,iteration,1,1)
|
|
|
|
self.register_tab.show_all()
|
|
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
|
|
rows.append(rowContent)
|
|
else:
|
|
print("VAR_CLASS_ERROR!")
|
|
|
|
iteration += 1
|
|
except:
|
|
pass
|
|
else:
|
|
csvContent = []
|
|
rows = []
|
|
print(lines)
|
|
for line in lines:
|
|
if line == "":
|
|
print("BLANK_LINE!")
|
|
iteration += 1
|
|
pass
|
|
else:
|
|
try:
|
|
csvContent.append(line)
|
|
inLine = line.split(",")
|
|
if inLine[0] == "V":
|
|
varType = inLine[1]
|
|
varName = inLine[2]
|
|
value = inLine[3]
|
|
address = inLine[4]
|
|
|
|
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
|
|
f = Gtk.Frame()
|
|
f.add(typeLabel)
|
|
self.variable_tab.attach(f,0,iteration,1,1)
|
|
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
|
|
f = Gtk.Frame()
|
|
f.add(nameLabel)
|
|
self.variable_tab.attach(f,1,iteration,1,1)
|
|
|
|
nentry = Gtk.Entry()
|
|
nentry.set_hexpand(False)
|
|
nentry.set_text(value)
|
|
nentry.set_editable(False)
|
|
self.variable_tab.attach(nentry,2,iteration,1,1)
|
|
|
|
edit = Gtk.Entry()
|
|
edit.set_hexpand(False)
|
|
edit.set_placeholder_text("Edit")
|
|
edit.set_max_length(16)
|
|
edit.connect("activate", self.SendData)
|
|
self.variable_tab.attach(edit,3,iteration,1,1)
|
|
|
|
self.variable_tab.show_all()
|
|
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
|
|
rows.append(rowContent)
|
|
elif inLine[0] == "R":
|
|
varType = inLine[1]
|
|
varName = inLine[2]
|
|
value = inLine[3]
|
|
value = int(value)
|
|
value = str(hex(value))
|
|
address = inLine[4]
|
|
|
|
typeLabel = Gtk.Label(label=varType, halign=Gtk.Align.START)
|
|
f = Gtk.Frame()
|
|
f.add(typeLabel)
|
|
self.register_tab.attach(f,0,iteration,1,1)
|
|
nameLabel = Gtk.Label(label=varName+":", halign=Gtk.Align.END)
|
|
f = Gtk.Frame()
|
|
f.add(nameLabel)
|
|
self.register_tab.attach(f,1,iteration,1,1)
|
|
|
|
nentry = Gtk.Entry()
|
|
nentry.set_hexpand(False)
|
|
nentry.set_text(value)
|
|
nentry.set_editable(False)
|
|
self.register_tab.attach(nentry,2,iteration,1,1)
|
|
|
|
edit = Gtk.Entry()
|
|
edit.set_hexpand(False)
|
|
edit.set_placeholder_text("Edit")
|
|
edit.set_max_length(10)
|
|
edit.connect("activate", self.SendData)
|
|
self.register_tab.attach(edit,3,iteration,1,1)
|
|
|
|
self.register_tab.show_all()
|
|
rowContent = [len(rows),typeLabel, nameLabel, nentry, edit, address]
|
|
rows.append(rowContent)
|
|
else:
|
|
print("VAR_CLASS_ERROR!")
|
|
|
|
iteration += 1
|
|
except:
|
|
pass
|
|
#GLib.idle_add(self.Void)
|
|
firsCsvRead = False
|
|
return readCsvEnable
|
|
|
|
def ApplyCsv():
|
|
global csvContent, rows
|
|
values = []
|
|
isReg = False
|
|
iteration = 0
|
|
for content in csvContent:
|
|
try:
|
|
bits = content.split(",")
|
|
if bits[0] == 'V':
|
|
values.append(bits[3])
|
|
elif bits[0] == 'R':
|
|
isReg = True
|
|
x = int(bits[3])
|
|
x = str(hex(x))
|
|
values.append(x)
|
|
except:
|
|
pass
|
|
for row in rows:
|
|
row[3].set_text(values[iteration])
|
|
iteration += 1
|
|
|
|
def SendData(self, entry):
|
|
global ser
|
|
serMsg = "A"
|
|
value = entry.get_text()
|
|
numValue = 0
|
|
|
|
print("MSG: ENTERED="+value)
|
|
for row in rows:
|
|
if row[4] == entry:
|
|
if row[1].get_text() == "uint8_t":
|
|
serMsg += '0'
|
|
numValue = int(value)
|
|
numValue = numValue % 256
|
|
value = str(numValue)
|
|
elif row[1].get_text() == "int8_t":
|
|
serMsg += '1'
|
|
numValue = int(value)
|
|
numValue = numValue % 256
|
|
if numValue > 127:
|
|
numValue -= 256
|
|
value = str(numValue)
|
|
elif row[1].get_text() == "uint16_t":
|
|
serMsg += '2'
|
|
numValue = int(value)
|
|
numValue = numValue % 65536
|
|
value = str(numValue)
|
|
elif row[1].get_text() == "int16_t":
|
|
serMsg += '3'
|
|
numValue = int(value)
|
|
numValue = numValue % 65536
|
|
if numValue > (65536/2)-1:
|
|
numValue -= 65536
|
|
value = str(numValue)
|
|
elif row[1].get_text() == "uint32_t":
|
|
serMsg += '4'
|
|
numValue = int(value)
|
|
numValue = numValue % 4294967296
|
|
value = str(numValue)
|
|
elif row[1].get_text() == "int32_t":
|
|
serMsg += '5'
|
|
numValue = int(value)
|
|
numValue = numValue % 4294967296
|
|
if numValue > (4294967296/2)-1:
|
|
numValue -= 4294967296
|
|
value = str(numValue)
|
|
elif row[1].get_text() == "float":
|
|
serMsg += '6'
|
|
elif row[1].get_text() == "reg":
|
|
serMsg += '7'
|
|
numValue = int(value)
|
|
numValue = numValue % 4294967296
|
|
value = str(numValue)
|
|
|
|
serMsg += row[5]
|
|
serMsg += value
|
|
serMsg += "\n"
|
|
print("MSG: SERIAL WRITE: "+serMsg)
|
|
|
|
ser.write(serMsg.encode())
|
|
|
|
def Void(self):
|
|
print("Void")
|
|
return True
|
|
|
|
|
|
win = MainGUI()
|
|
win.connect("destroy", Gtk.main_quit)
|
|
win.show_all()
|
|
Gtk.main() |