refactor usbcardreader into own file

This commit is contained in:
Martin Quarda 2025-04-01 09:07:51 +02:00
parent ca8d008e77
commit 2e9c11d73b
3 changed files with 75 additions and 108 deletions

View File

@ -11,11 +11,12 @@ import json
gi.require_version('Gtk', '4.0')
from gi.repository import Gtk, Gio, Gdk
from requests.adapters import HTTPAdapter, Retry
from usbcardreader import UsbCardReader
with open('config.json', 'r') as f:
config = json.load(f)
activeCards = []
@ -84,57 +85,23 @@ def sendQueueToSend():
threading.Thread(target=sendQueueToSend).start()
class UsbCardReader():
def __init__(self, **kargs):
# Find the HID device by vendor/product ID
self.dev = usb.core.find(idVendor=USB_VENDOR, idProduct=USB_PRODUCT)
# Get and store the endpoint
self.endpoint = self.dev[0][(0,0)][0]
if self.dev.is_kernel_driver_active(USB_IF) is True:
self.dev.detach_kernel_driver(USB_IF)
# Claim the device
usb.util.claim_interface(self.dev, USB_IF)
self.receivedNumber = 0
def callbackOnCard(card_id):
global win, activeCards, db
if card_id not in db:
refreshDb()
if card_id in db:
if card_id not in activeCards:
activeCards.append(card_id)
win.arb(db[card_id])
print('Card is active now')
else:
print('Card is already active')
else:
print(f'Card {card_id} not found in db')
cardread=threading.Thread(target=self.read)
cardread.start()
def read(self):
global win, activeCards, db
UsbCardReader(callbackOnCard)
print("Waiting for card")
receivedNumber = 0
while True:
control = None
try:
control = self.dev.read(self.endpoint.bEndpointAddress, self.endpoint.wMaxPacketSize, USB_TIMEOUT)
if (control[2] != 40) & (control[2] != 0):
receivedDigit = control[2] - 29
if receivedDigit == 10:
receivedDigit = 0
receivedNumber = 10 * receivedNumber + receivedDigit
if (( control[0] == 0 )) & (( control[2] == 40 )) & (( not receivedNumber == 0 )):
if receivedNumber not in db:
refreshDb()
if receivedNumber in db:
if receivedNumber not in activeCards:
activeCards.append(receivedNumber)
win.arb(db[receivedNumber])
print('Card is active now')
else:
print('Card is already active')
else:
print(f'Card {receivedNumber} not found in db')
receivedNumber = 0
except KeyboardInterrupt:
exit()
except:
pass
time.sleep(0.001)
class GridRow():
def __init__(self, parent, racer, index):

View File

@ -10,16 +10,10 @@ from PyQt6 import QtWidgets, uic, QtGui
from queue import Queue
from requests.adapters import HTTPAdapter, Retry
from usbcardreader import UsbCardReader
app = QtWidgets.QApplication(sys.argv)
TIME = 600
USB_IF = 0 # Interface
USB_TIMEOUT = 5 # Timeout in ms
USB_VENDOR = 0xffff # Vendor-ID:
USB_PRODUCT = 0x0035 # Product-ID
with open('config.json', 'r') as f:
config = json.load(f)
@ -37,7 +31,6 @@ session.mount('https://', HTTPAdapter(max_retries=retries))
last_card = 0
last_time = datetime.datetime.now()
response = session.get(config['host'] + '/api/racers')
racers = response.json()
@ -120,8 +113,10 @@ def findByCard(card_id):
def updateLastCard(card_id):
global last_card, last_time
time = last_time = datetime.datetime.now()
last_card = card_id
window.lastCard.setText(str(card_id))
time = last_time
racer = findByCard(card_id)
if racer:
@ -142,60 +137,14 @@ def updateLastCard(card_id):
#mb = QtWidgets.QMessageBox(text=f"Úspěšné odhlášení závodníka {starting_number}!")
#mb.exec()
else:
queue.put({
queue.put({
'url': config['host'] + '/api/station/register',
'json': {'card_id': card_id, 'time': last_time.strftime('%d.%m.%Y %H:%M:%S.%f'), 'station_id': 1},
'json': {'card_id': card_id, 'time': last_time.strftime('%d.%m.%Y %H:%M:%S.%f'), 'station_id': 1},
})
#mb = QtWidgets.QMessageBox(text=f"Úspěšné odstartování závodníka {racer['starting_number']}!")
#mb.exec()
racer['started'] = True
UsbCardReader(updateLastCard)
class UsbCardReader():
def __init__(self, **kargs):
# Find the HID device by vendor/product ID
self.dev = usb.core.find(idVendor=USB_VENDOR, idProduct=USB_PRODUCT)
# Get and store the endpoint
self.endpoint = self.dev[0][(0,0)][0]
if self.dev.is_kernel_driver_active(USB_IF) is True:
self.dev.detach_kernel_driver(USB_IF)
# Claim the device
usb.util.claim_interface(self.dev, USB_IF)
self.receivedNumber = 0
cardread=threading.Thread(target=self.read)
cardread.start()
def read(self):
global last_card, last_time
print("Waiting for card")
receivedNumber = 0
while True:
control = None
try:
control = self.dev.read(self.endpoint.bEndpointAddress, self.endpoint.wMaxPacketSize, USB_TIMEOUT)
if (control[2] != 40) & (control[2] != 0):
receivedDigit = control[2] - 29
if receivedDigit == 10:
receivedDigit = 0
receivedNumber = 10 * receivedNumber + receivedDigit
if (( control[0] == 0 )) & (( control[2] == 40 )) & (( not receivedNumber == 0 )):
if last_card != receivedNumber or datetime.datetime.now() - last_time > datetime.timedelta(seconds=60):
last_time = datetime.datetime.now()
last_card = receivedNumber
updateLastCard(receivedNumber)
receivedNumber = 0
except KeyboardInterrupt:
exit()
except:
pass
time.sleep(0.001)
UsbCardReader()
app.exec()
app.exec()

View File

@ -0,0 +1,51 @@
import usb.core
import usb.util
USB_IF = 0 # Interface
USB_TIMEOUT = 5 # Timeout in ms
USB_VENDOR = 0xffff # Vendor-ID:
USB_PRODUCT = 0x0035 # Product-ID
class UsbCardReader:
def __init__(self, callback):
#fn called on found card
self.callback = callback
# Find the HID device by vendor/product ID
self.dev = usb.core.find(idVendor=USB_VENDOR, idProduct=USB_PRODUCT)
# Get and store the endpoint
self.endpoint = self.dev[0][(0,0)][0]
if self.dev.is_kernel_driver_active(USB_IF) is True:
self.dev.detach_kernel_driver(USB_IF)
# Claim the device
usb.util.claim_interface(self.dev, USB_IF)
self.receivedNumber = 0
cardread=threading.Thread(target=self.read)
cardread.start()
def read(self):
receivedNumber = 0
while True:
control = None
try:
control = self.dev.read(self.endpoint.bEndpointAddress, self.endpoint.wMaxPacketSize, USB_TIMEOUT)
if (control[2] != 40) & (control[2] != 0):
receivedDigit = control[2] - 29
if receivedDigit == 10:
receivedDigit = 0
receivedNumber = 10 * receivedNumber + receivedDigit
if (( control[0] == 0 )) & (( control[2] == 40 )) & (( not receivedNumber == 0 )):
self.callback(receivedNumber)
receivedNumber = 0
except KeyboardInterrupt:
exit()
except:
pass
time.sleep(0.001)