"""Registration desk client for a race timing system.

Loads the racer list from the server, lets the operator bind the most
recently read card to a selected racer, and reports card reads from a USB
reader to the server as start (station 1) or finish (station 6) events.
"""
import datetime
import json
import logging
import sys
import threading
import time

import requests
import usb.core
import usb.util
from PyQt6 import QtCore, QtGui, QtWidgets, uic
from requests.adapters import HTTPAdapter, Retry

logger = logging.getLogger(__name__)

app = QtWidgets.QApplication(sys.argv)

TIME = 600
USB_IF = 0  # Interface
USB_TIMEOUT = 5  # Timeout in ms
USB_VENDOR = 0xffff  # Vendor-ID
USB_PRODUCT = 0x0035  # Product-ID

# Byte 2 of a reader report is a key code: 30..39 decode to the digits
# 1..9, 0 and 40 terminates a card number (presumably USB HID keyboard
# usage IDs, with 40 being Enter -- confirm against the reader's manual).
KEY_NONE = 0
KEY_ENTER = 40
KEY_DIGIT_OFFSET = 29

START_STATION_ID = 1
FINISH_STATION_ID = 6
TIMESTAMP_FORMAT = '%d.%m.%Y %H:%M:%S.%f'
# The same card is ignored if it is read again within this interval.
CARD_REPEAT_DELAY = datetime.timedelta(seconds=10)

with open('config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
# Mount the retrying adapter before the first request so that the login
# call is covered by the same policy as every later call.
session.mount('https://', HTTPAdapter(max_retries=retries))
session.post(
    config['host'] + '/api/login',
    data={'email': config['login'], 'password': config['password']},
)

# Number and time of the last card accepted by the reader thread.
last_card = 0
last_time = datetime.datetime.now()

racers = session.get(config['host'] + '/api/racers').json()


def _show_message(text):
    """Show *text* in a modal message box and wait until it is dismissed."""
    box = QtWidgets.QMessageBox(text=text)
    box.exec()


def register_racer():
    """Bind the last read card to the racer selected in the list.

    The racer gets the next free starting number (highest assigned number
    plus one, or 1 when none is assigned yet). On success the local racer
    record is updated and the racer disappears from the selection list.
    """
    card_id = last_card
    selected = window.racers.currentIndex().data()
    if selected is None:
        # Nothing is selected; without this guard the slot would raise.
        _show_message("Není vybrán žádný závodník!")
        return
    # The racer id is the last space-separated token of the item text.
    racer_id = int(selected.split(' ')[-1])
    starting_number = max(
        (r['starting_number'] for r in racers if r['starting_number']),
        default=0,
    ) + 1

    try:
        response = session.post(
            config['host'] + '/api/card/register',
            json={
                'racer_id': racer_id,
                'starting_number': starting_number,
                'card_id': card_id,
            },
        )
    except requests.RequestException as error:
        # An exception escaping a Qt slot would terminate the application.
        _show_message(f"Neznámá chyba! \n{error}")
        return

    if response.status_code == 400:
        _show_message("Zaregistrování selhalo! Je špatně vyplněné startovní číslo nebo karta už je přiřazená k jinému závodníku.")
        return
    if response.status_code != 200:
        _show_message(f"Neznámá chyba! \n{response.status_code} {response.content}")
        return

    racer = next(r for r in racers if r['racer_id'] == racer_id)
    racer['card_id'] = card_id
    racer['starting_number'] = starting_number
    racer['started'] = False
    updateRacers()
    _show_message(f"Úspěšné zaregistrování závodníka se startovním číslem {starting_number}!")


window = uic.loadUi("registrace.ui")
window.register_racer.clicked.connect(register_racer)
window.show()

model = QtGui.QStandardItemModel()
window.racers.setModel(model)


def updateRacers():
    """Rebuild the selection list from racers without a number or a card."""
    model.clear()
    for racer in racers:
        if racer['starting_number'] or racer['card_id']:
            continue
        label = f"{racer['last_name']} {racer['first_name']} {racer['date_of_birth']} {racer['racer_id']}"
        model.appendRow(QtGui.QStandardItem(label))


updateRacers()


def findByCard(card_id):
    """Return the first racer holding *card_id*, or None if there is none."""
    for racer in racers:
        if racer['card_id'] == card_id:
            return racer
    return None


def updateLastCard(card_id, read_time=None):
    """Display a card read and report it to the server.

    A racer that has not started yet is registered at the start station and
    marked as started. A racer that has already started is registered at
    the finish station and the card is then unregistered. Cards that do not
    belong to any racer are only displayed.

    Must be called from the GUI thread because it touches widgets.

    Args:
        card_id: Number read from the card.
        read_time: Moment the card was read; defaults to the global
            ``last_time`` for callers that do not supply it.
    """
    window.lastCard.setText(str(card_id))
    if read_time is None:
        read_time = last_time
    racer = findByCard(card_id)
    if not racer:
        return
    timestamp = read_time.strftime(TIMESTAMP_FORMAT)

    if racer['started']:
        response = session.post(
            config['host'] + '/api/station/register',
            json={'card_id': card_id, 'time': timestamp, 'station_id': FINISH_STATION_ID},
        )
        # Compare the status code, not the Response object itself.
        if response.status_code != 200:
            _show_message("Selhalo zapsání na Stanici!")
            return
        starting_number = racer['starting_number']
        racer_id = racer['racer_id']
        racer['card_id'] = None
        response = session.post(
            config['host'] + '/api/card/unregister',
            json={
                'racer_id': racer_id,
                'starting_number': starting_number,
                'card_id': card_id,
            },
        )
        if response.status_code != 200:
            _show_message("Registrace na stanici proběhla úspěšně, ale karta nebyla odhlášena!")
            return
        result = response.json()
        _show_message(f"Úspěšné odhlášení závodníka! \nFinální čas je: {result['time']}")
    else:
        response = session.post(
            config['host'] + '/api/station/register',
            json={'card_id': card_id, 'time': timestamp, 'station_id': START_STATION_ID},
        )
        if response.status_code != 200:
            _show_message("Nastala nějaká chyba na serveru!")
            return
        racer['started'] = True
        _show_message(f"Úspěšné odstartování závodníka {racer['starting_number']}!")


class CardDispatcher(QtCore.QObject):
    """Hands card reads from the reader thread over to the GUI thread.

    Widgets may only be used from the GUI thread, so the reader emits
    ``card_read`` instead of calling ``updateLastCard`` directly. The
    connection is blocking: ``emit`` returns only after the card has been
    handled, so the reader does not accept another card in the meantime.
    Instances must be created in the GUI thread and ``card_read`` must only
    be emitted from a different thread.
    """

    card_read = QtCore.pyqtSignal(object, object)

    def __init__(self):
        super().__init__()
        self.card_read.connect(
            self._on_card_read,
            QtCore.Qt.ConnectionType.BlockingQueuedConnection,
        )

    @QtCore.pyqtSlot(object, object)
    def _on_card_read(self, card_id, read_time):
        """Process one card read; failures are logged, never propagated."""
        try:
            updateLastCard(card_id, read_time)
        except Exception:
            # An exception escaping a Qt slot would terminate the application.
            logger.exception("Processing card %s failed", card_id)


class UsbCardReader():
    """Reads card numbers from the USB reader on a background thread."""

    def __init__(self, **kargs):
        """Claim the reader and start the polling thread.

        Raises:
            RuntimeError: If no device with the configured IDs is attached.
        """
        # Find the HID device by vendor/product ID
        self.dev = usb.core.find(idVendor=USB_VENDOR, idProduct=USB_PRODUCT)
        if self.dev is None:
            raise RuntimeError(
                f"USB card reader {USB_VENDOR:04x}:{USB_PRODUCT:04x} not found"
            )
        # Get and store the endpoint
        self.endpoint = self.dev[0][(0, 0)][0]
        if self.dev.is_kernel_driver_active(USB_IF) is True:
            self.dev.detach_kernel_driver(USB_IF)
        # Claim the device
        usb.util.claim_interface(self.dev, USB_IF)
        # Digits of the card number received so far.
        self.receivedNumber = 0
        # Daemon thread: the endless polling loop must not keep the process
        # alive once the GUI event loop has finished.
        cardread = threading.Thread(target=self.read, daemon=True)
        cardread.start()

    def read(self):
        """Poll the reader forever and dispatch every completed card number."""
        print("Waiting for card")
        while True:
            try:
                report = self.dev.read(
                    self.endpoint.bEndpointAddress,
                    self.endpoint.wMaxPacketSize,
                    USB_TIMEOUT,
                )
            except usb.core.USBError:
                # Best effort: with a 5 ms timeout most reads end here
                # simply because no data arrived.
                report = None
            try:
                if report is not None:
                    self._handle_report(report)
            except Exception:
                logger.exception("Unexpected card reader error")
            time.sleep(0.001)

    def _handle_report(self, report):
        """Feed one report into the number accumulator; dispatch on Enter."""
        global last_card, last_time
        if len(report) < 3:
            return
        key_code = report[2]
        if key_code == KEY_NONE:
            return
        if key_code != KEY_ENTER:
            digit = key_code - KEY_DIGIT_OFFSET
            if digit == 10:
                digit = 0
            self.receivedNumber = 10 * self.receivedNumber + digit
            return
        if report[0] != 0 or self.receivedNumber == 0:
            return

        card_id = self.receivedNumber
        # Reset first so a failure below cannot leak digits into the next card.
        self.receivedNumber = 0
        read_time = datetime.datetime.now()
        if last_card != card_id or read_time - last_time > CARD_REPEAT_DELAY:
            card_dispatcher.card_read.emit(card_id, read_time)
            # Stamped after handling, so the repeat delay starts once the
            # operator has dismissed the resulting dialog.
            last_time = datetime.datetime.now()
            last_card = card_id


card_dispatcher = CardDispatcher()
reader = UsbCardReader()
app.exec()