245 lines
6.8 KiB
Python
245 lines
6.8 KiB
Python
import gi
|
|
import threading
|
|
import time
|
|
import datetime
|
|
import collections
|
|
import usb.core
|
|
import usb.util
|
|
import requests
|
|
import json
|
|
|
|
gi.require_version('Gtk', '4.0')
|
|
from gi.repository import Gtk, Gio, Gdk
|
|
from requests.adapters import HTTPAdapter, Retry
|
|
|
|
# Card numbers that currently have a countdown row on screen, oldest first
# (entries are appended by the card reader and removed from the front).
activeCards = []

# Length of each countdown in seconds.
TIME = 600

USB_IF = 0            # Interface
USB_TIMEOUT = 5       # Timeout in ms
USB_VENDOR = 0xFFFF   # Vendor-ID
USB_PRODUCT = 0x0035  # Product-ID

# Racer records keyed by their card_id; replaced by refreshDb().
db = {}

# Registrations waiting to be posted to the server by sendQueueToSend().
queue_to_send = []
|
|
|
|
# Station settings are read from config.json in the working directory.
# Keys used in this file: 'host', 'login', 'password', 'station_id'.
# The encoding is pinned so non-ASCII credentials load the same on every OS.
with open('config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

# One shared HTTP session so the login state (cookies) is reused by every
# later request.
session = requests.Session()

# Retry transient gateway/server errors with exponential backoff.
# NOTE(review): the adapter is mounted for https:// only, and urllib3 does
# not status-retry POST requests by default -- confirm this is intended.
retries = Retry(
    total=10,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504],
)
session.mount('https://', HTTPAdapter(max_retries=retries))

login_response = session.post(
    config['host'] + '/api/login',
    {'email': config['login'], 'password': config['password']},
)
# A rejected login used to be ignored silently, leaving every later request
# failing with no hint why. Keep running (best effort) but say so.
if not login_response.ok:
    print(f'Login failed with HTTP status {login_response.status_code}')
|
|
|
|
def refreshDb():
    """Reload the racer table from the server into the global ``db``.

    Fetches ``<host>/api/racers`` and, on HTTP 200, replaces ``db`` with a
    dict mapping each racer's ``card_id`` to the racer record. Racers with a
    falsy ``card_id`` are skipped. On any other status ``db`` is left as is.

    Exceptions raised by the HTTP call or by JSON decoding propagate to the
    caller.
    """
    global db
    resp = session.get(config['host'] + '/api/racers')
    if resp.status_code != 200:
        return
    # Build the complete table first and publish it with a single assignment:
    # this function runs on a background thread while the card reader looks
    # cards up, so the table must never be visible half-filled.
    db = {
        racer["card_id"]: racer
        for racer in resp.json()
        if racer["card_id"]
    }
|
|
|
|
|
|
def refreshDbEvery3mins():
    """Refresh the racer table forever, once every three minutes.

    Intended as a thread target; never returns.
    """
    while True:
        try:
            refreshDb()
        except Exception as exc:
            # Thread boundary: a network or decoding error must not end the
            # loop, otherwise the table would never be refreshed again.
            print(f'Refreshing racer db failed: {exc}')
        time.sleep(3 * 60)
|
|
|
|
|
|
# Only the `threading` module is imported, so the bare name `Thread` raised
# NameError here; qualify it.
threading.Thread(target=refreshDbEvery3mins).start()
|
|
|
|
|
|
def sendQueueToSend():
    """Post queued registrations to the server forever.

    Intended as a thread target; never returns. Items are taken from the
    front of ``queue_to_send`` (oldest first) and posted as JSON to
    ``<host>/api/station/register``:

    * HTTP 200 -- delivered, the item is dropped.
    * HTTP 400 -- the item is dropped without a retry.
    * any other status -- the item is re-queued and retried after a pause.
    * request raised -- the item is put back at the front and retried after
      a pause.

    When the queue is empty the loop sleeps for one second.
    """
    while True:
        if not queue_to_send:
            time.sleep(1)
            continue

        to_send = queue_to_send.pop(0)
        try:
            response = session.post(
                config['host'] + '/api/station/register',
                json=to_send,
            )
        except requests.RequestException as exc:
            # Previously an exception here ended the thread and lost the
            # item. Put it back where it was so ordering is preserved.
            print(f'Sending registration failed: {exc}')
            queue_to_send.insert(0, to_send)
            time.sleep(1)
            continue

        if response.status_code in (200, 400):
            continue

        queue_to_send.append(to_send)  # try again later
        time.sleep(1)
|
|
|
|
|
|
# Only the `threading` module is imported, so the bare name `Thread` raised
# NameError here; qualify it.
threading.Thread(target=sendQueueToSend).start()
|
|
|
|
|
|
class UsbCardReader():
    """Reads card numbers from a USB reader and activates matching racers.

    Byte 2 of every report is treated as a key code: 30-38 are the digits
    1-9, 39 is 0 and 40 terminates the number (these are the USB HID keyboard
    usage codes for the digit keys and Enter). A finished number is looked up
    in the global ``db``; a known card that is not yet in ``activeCards`` is
    appended there and a countdown row is requested from the window.
    """

    def __init__(self, **kargs):
        """Open the reader, claim its interface and start the read thread.

        Raises:
            ValueError: if no device with USB_VENDOR/USB_PRODUCT is attached.
        """
        # Find the HID device by vendor/product ID
        self.dev = usb.core.find(idVendor=USB_VENDOR, idProduct=USB_PRODUCT)
        if self.dev is None:
            # find() returns None when nothing matches; fail with a clear
            # message instead of "'NoneType' object is not subscriptable".
            raise ValueError(
                f'Card reader {USB_VENDOR:04x}:{USB_PRODUCT:04x} not found'
            )
        # Get and store the endpoint
        self.endpoint = self.dev[0][(0, 0)][0]
        if self.dev.is_kernel_driver_active(USB_IF):
            self.dev.detach_kernel_driver(USB_IF)
        # Claim the device
        usb.util.claim_interface(self.dev, USB_IF)
        self.receivedNumber = 0

        cardread = threading.Thread(target=self.read)
        cardread.start()

    @staticmethod
    def _keycode_to_digit(keycode):
        """Translate a key code into the digit it stands for (39 -> 0)."""
        digit = keycode - 29
        return 0 if digit == 10 else digit

    def _handle_card(self, card_number):
        """Activate the racer owning ``card_number`` if known and not active."""
        # Local import: the file-level import only pulls in Gtk, Gio and Gdk.
        from gi.repository import GLib

        racer = db.get(card_number)
        if racer is None:
            # The card may have been registered since the last refresh.
            refreshDb()
            racer = db.get(card_number)
        if racer is None:
            print(f'Card {card_number} not found in db')
            return
        if card_number in activeCards:
            print('Card is already active')
            return

        activeCards.append(card_number)
        # This runs on the reader thread; GTK widgets may only be touched
        # from the main loop, so hand the row creation over to it.
        GLib.idle_add(win.arb, racer)
        print('Card is active now')

    def read(self):
        """Poll the reader forever and process every completed card number.

        Intended as a thread target; never returns.
        """
        print("Waiting for card")
        receivedNumber = 0

        while True:
            try:
                control = self.dev.read(
                    self.endpoint.bEndpointAddress,
                    self.endpoint.wMaxPacketSize,
                    USB_TIMEOUT,
                )
            except usb.core.USBError:
                # Raised, among other things, when the short read timeout
                # elapses with no data; just poll again.
                control = None

            # Reports shorter than three bytes cannot carry a key code.
            if control is not None and len(control) >= 3:
                keycode = control[2]
                if keycode != 40 and keycode != 0:
                    receivedNumber = (
                        10 * receivedNumber + self._keycode_to_digit(keycode)
                    )

                if control[0] == 0 and keycode == 40 and receivedNumber != 0:
                    # Reset before handling so a failure below cannot leave
                    # stale digits in front of the next card's number.
                    card_number, receivedNumber = receivedNumber, 0
                    try:
                        self._handle_card(card_number)
                    except Exception as exc:
                        # Keep the reader alive, but no longer silently.
                        print(f'Handling card {card_number} failed: {exc}')

            time.sleep(0.001)
|
|
|
|
class GridRow():
    """One row of the countdown grid: starting number plus a running timer.

    Creating a row attaches its widgets to ``parent.grid`` at row ``index``,
    starts a countdown thread and queues a registration (card id, station id
    and the current local time) for sending to the server.
    """

    def __init__(self, parent, racer, index):
        """Build the row for ``racer`` and start its countdown.

        Args:
            parent: window owning the grid; must provide ``grid``.
            racer: racer record; ``starting_number`` and ``card_id`` are read.
            index: grid row to attach to; even rows are styled "lightgrid",
                odd rows "darkgrid".
        """
        self.id = racer['starting_number']
        self.parent = parent
        # printIndex() reads this attribute; it was never assigned before.
        self.index = index

        self.grid = Gtk.Grid()
        self.grid.set_name("lightgrid" if index % 2 == 0 else "darkgrid")

        self.runid = Gtk.Label(label=str(racer["starting_number"]), name="gridlabel")
        self.runid.set_justify(Gtk.Justification.CENTER)
        self.runid.set_width_chars(10)

        self.runb = Gtk.Label(label='00:00', name="gridlabel")
        self.runb.set_hexpand(True)

        self.grid.attach(self.runid, 0, 0, 1, 1)
        self.grid.attach(self.runb, 1, 0, 1, 1)

        parent.grid.attach(self.grid, 0, index, 2, 1)

        countdown = threading.Thread(target=self.updateTime)
        countdown.start()

        queue_to_send.append({
            'card_id': racer['card_id'],
            'station_id': config['station_id'],
            'time': datetime.datetime.now().strftime('%d.%m.%Y %H:%M:%S.%f'),
        })

    def printIndex(self, button):
        """Print the grid row index this row was created with."""
        print(self.index)

    @staticmethod
    def _format_time(t):
        """Format ``t`` seconds as ``MM:SS``, prefixed with ``-`` if negative."""
        sign = '-' if t < 0 else ''
        mins, secs = divmod(abs(t), 60)
        return f'{sign}{mins:02d}:{secs:02d}'

    def _turn_red(self):
        """Switch the row to its red style, keeping the light/dark variant."""
        if self.grid.get_name() == "lightgrid":
            self.grid.set_name("lredgrid")
        else:
            self.grid.set_name("dredgrid")

    def updateTime(self):
        """Count down from TIME to -10 seconds, then remove the oldest row.

        Runs on the countdown thread. Every widget change is scheduled on the
        GTK main loop with ``GLib.idle_add`` because GTK widgets must not be
        modified from other threads. The callbacks return ``None``, so each
        one runs exactly once.
        """
        # Local import: the file-level import only pulls in Gtk, Gio and Gdk.
        from gi.repository import GLib

        t = TIME
        while t >= -10:
            GLib.idle_add(self.runb.set_label, self._format_time(t))
            time.sleep(1)
            t -= 1
            if t == 10:
                GLib.idle_add(self._turn_red)

        GLib.idle_add(GridWindow.remRow, self.parent)
|
|
|
|
class GridWindow(Gtk.ApplicationWindow):
    """Main window: a header line followed by one GridRow per active card."""

    def __init__(self, **kargs):
        """Create the window, load ``style.css`` and build the header row."""
        super().__init__(**kargs, title='Alkator Clock')

        css_provider = Gtk.CssProvider()
        css_provider.load_from_file(Gio.File.new_for_path("style.css"))
        Gtk.StyleContext.add_provider_for_display(
            Gdk.Display.get_default(),
            css_provider,
            Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
        )

        runlabel = Gtk.Label(label="ID", name="gridlabel")
        runlabel.set_justify(Gtk.Justification.CENTER)
        cntlabel = Gtk.Label(label="Countdown", name="gridlabel")
        cntlabel.set_justify(Gtk.Justification.CENTER)

        # GridRow objects for the rows currently shown, oldest first.
        self.runners = []

        self.grid = Gtk.Grid()
        self.grid.attach(runlabel, 0, 0, 1, 1)
        self.grid.attach(cntlabel, 1, 0, 2, 1)

        self.set_child(self.grid)

    def arb(self, racer):
        """Add a countdown row for ``racer``.

        The row index is ``len(activeCards)``, so the caller must already have
        appended the card to ``activeCards`` (row 0 is the header).
        """
        self.runners.append(GridRow(self, racer, len(activeCards)))

    def rrb(self, button):
        """Button-style callback that removes the oldest row."""
        self.remRow()

    def addRow(self, runner, index):
        """Attach a plain label/button row for ``runner`` at grid row ``index``."""
        runid = Gtk.Label(label=str(runner))
        runb = Gtk.Button(label='time_placeholder')

        self.grid.attach(runid, 0, index, 1, 1)
        self.grid.attach(runb, 1, index, 1, 1)

    def remRow(self):
        """Remove the oldest row (grid row 1) and forget its card.

        Does nothing when no card is active.
        """
        if not activeCards:
            return
        self.grid.remove_row(1)
        del activeCards[0]
        # Release the matching GridRow as well; previously self.runners only
        # ever grew and kept every expired row alive.
        if self.runners:
            del self.runners[0]
|
|
|
|
|
|
def on_activate(app):
    """Handle the application's 'activate' signal.

    Creates the main GridWindow, publishes it as the module-level ``win``
    (the card reader uses that name to add rows) and shows it.
    """
    global win
    win = window = GridWindow(application=app)
    window.present()
|
|
|
|
# Open the card reader; its constructor starts the background read thread.
# NOTE(review): the indentation of this call was lost in the source this was
# recovered from; it is assumed to run at module level, before the window
# exists -- confirm.
UsbCardReader()

# Create the GTK application and build the window once it is activated.
app = Gtk.Application(application_id='com.example.App')
app.connect('activate', on_activate)

# Run the application
app.run(None)
|