Alkator/SW/PC/Stopwatch/registrace.py
2025-04-01 10:47:02 +02:00

201 lines
6.1 KiB
Python

import sys
import datetime
import requests
import threading
import time
import json
import usb.core
import usb.util
from PyQt6 import QtWidgets, uic, QtGui
from queue import Queue
from requests.adapters import HTTPAdapter, Retry
app = QtWidgets.QApplication(sys.argv)
TIME = 600
USB_IF = 0 # Interface
USB_TIMEOUT = 5 # Timeout in ms
USB_VENDOR = 0xffff # Vendor-ID:
USB_PRODUCT = 0x0035 # Product-ID
with open('config.json', 'r') as f:
config = json.load(f)
session = requests.Session()
session.post(config['host'] + '/api/login', {'email':config['login'], 'password': config['password']})
retries = Retry(total=3,
backoff_factor=1,
status_forcelist=[ 500, 502, 503, 504 ],)
session.mount('https://', HTTPAdapter(max_retries=retries))
last_card = 0
last_time = datetime.datetime.now()
response = session.get(config['host'] + '/api/racers')
racers = response.json()
db = []
with open('db.json', 'r') as f:
db = json.loads(f.read())
queue = Queue()
def worker():
while True:
item = queue.get()
db.append(item)
with open('db.json', 'w') as f:
f.write(json.dumps(db))
response = session.post(item['url'], json=item['json'])
print(item, response)
threading.Thread(target=worker).start()
def register_racer():
card_id = last_card
index = window.racers.currentIndex()
if not index:
mb = QtWidgets.QMessageBox(text='Prosím vyberte závodníka!')
mb.exec()
return
racer_id = int(index.data().split(' ')[-1])
try:
starting_number = max(filter(lambda r: r['starting_number'], racers), key=lambda racer: racer['starting_number'])['starting_number'] + 1
except ValueError:
starting_number = 1
queue.put({
'url': config['host'] + '/api/card/register',
'json': {'racer_id': racer_id, 'starting_number': starting_number, 'card_id': card_id}
})
racer = list(filter(lambda x: x['racer_id'] == racer_id, racers))[0]
racer['card_id'] = card_id
racer['starting_number'] = starting_number
racer['started'] = False
updateRacers()
mb = QtWidgets.QMessageBox(text=f"Úspěšné zaregistrování závodníka {racer['first_name']} {racer['last_name']} se startovním číslem {starting_number}!")
mb.exec()
window = uic.loadUi("registrace.ui")
window.register_racer.clicked.connect(register_racer)
window.show()
model = QtGui.QStandardItemModel()
window.racers.setModel(model)
def updateRacers():
model.clear()
for racer in racers:
if racer['starting_number'] or racer['card_id']:
continue
item = QtGui.QStandardItem(f"{racer['last_name']} {racer['first_name']} {racer['date_of_birth']} {racer['racer_id']}")
model.appendRow(item)
updateRacers()
def findByCard(card_id):
for racer in racers:
if racer['card_id'] == card_id:
return racer
def updateLastCard(card_id):
window.lastCard.setText(str(card_id))
time = last_time
racer = findByCard(card_id)
if racer:
if racer['started']:
queue.put({
'url': config['host'] + '/api/station/register',
'json': {'card_id': card_id, 'time': time.strftime('%d.%m.%Y %H:%M:%S.%f'), 'station_id': 6}
})
racer = list(filter(lambda x: x['card_id'] == card_id, racers))[0]
starting_number = racer['starting_number']
racer_id = racer['racer_id']
racer['card_id'] = None
queue.put({
'url': config['host'] + '/api/card/unregister',
'json': {'racer_id': racer_id, 'starting_number': starting_number, 'card_id': card_id},
})
#mb = QtWidgets.QMessageBox(text=f"Úspěšné odhlášení závodníka {starting_number}!")
#mb.exec()
else:
queue.put({
'url': config['host'] + '/api/station/register',
'json': {'card_id': card_id, 'time': last_time.strftime('%d.%m.%Y %H:%M:%S.%f'), 'station_id': 1},
})
#mb = QtWidgets.QMessageBox(text=f"Úspěšné odstartování závodníka {racer['starting_number']}!")
#mb.exec()
racer['started'] = True
class UsbCardReader():
def __init__(self, **kargs):
# Find the HID device by vendor/product ID
self.dev = usb.core.find(idVendor=USB_VENDOR, idProduct=USB_PRODUCT)
# Get and store the endpoint
self.endpoint = self.dev[0][(0,0)][0]
if self.dev.is_kernel_driver_active(USB_IF) is True:
self.dev.detach_kernel_driver(USB_IF)
# Claim the device
usb.util.claim_interface(self.dev, USB_IF)
self.receivedNumber = 0
cardread=threading.Thread(target=self.read)
cardread.start()
def read(self):
global last_card, last_time
print("Waiting for card")
receivedNumber = 0
while True:
control = None
try:
control = self.dev.read(self.endpoint.bEndpointAddress, self.endpoint.wMaxPacketSize, USB_TIMEOUT)
if (control[2] != 40) & (control[2] != 0):
receivedDigit = control[2] - 29
if receivedDigit == 10:
receivedDigit = 0
receivedNumber = 10 * receivedNumber + receivedDigit
if (( control[0] == 0 )) & (( control[2] == 40 )) & (( not receivedNumber == 0 )):
if last_card != receivedNumber or datetime.datetime.now() - last_time > datetime.timedelta(seconds=60):
last_time = datetime.datetime.now()
last_card = receivedNumber
updateLastCard(receivedNumber)
receivedNumber = 0
except KeyboardInterrupt:
exit()
except:
pass
time.sleep(0.001)
UsbCardReader()
app.exec()