2025-04-09 23:18:11 +02:00

417 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 28 19:30:56 2022
@author: angoosh
"""
import paho.mqtt.client as mqtt #knihovna pro prijem mqtt packetu
import paho.mqtt.publish as publish
import json #knihovna pro praci s json formatem
import datetime #knihovna pro nacteni casu
import os, os.path #os knihovna
import errno #knihovna pro chyby
import sys #knihovna pro praci s interpretrem
import psycopg2 #knihovna pro praci s postgresql databazi
import re #regex
#nacti aktualni pracovni slozku
WORKDIR = os.getcwd()
DEVICES=["2cf7f1203230a02e"]
#MQTT port
PORT = 1884
APPID = ""
PSW = ""
#promenne pro jednotlive moznosti zapisu
write_json = 0
write_db = 0
keep_last = 0;
#nacteni argumentu
arguments = sys.argv
argument_index = 0;
#kam se maji ukladat jednotliva csv a json data a logy
JSON_PATH = WORKDIR+'/data/json/'
LOG_PATH = WORKDIR+'/logs/'
#funkce pro povoleni zapisu dat do json
def enableJson():
global write_json
write_json = 1
return 0
#funkce pro povoleni zapisu dat do databaze
def enableDB():
global write_db
write_db = 1
return 0
#funkce pro vypnuti historie json souboru
def discardOldJsonData():
global keep_last
keep_last = 1
return 0
#funkce pro nacteni prihlasovacich udaju a dalsich metadat ze souboru
def jsonLogin(index):
global APPEUI, APPID, PSW, DB_HOST, DB_NAME, DB_USER, DB_PASS, DB_TABLE
loaded_json = ''
login_json = arguments[index + 1]
with open(login_json, 'r') as file:
for line in file:
loaded_json += line
login_creds = json.loads(loaded_json)
APPEUI = login_creds['appeui']
APPID = login_creds['appid']
PSW = login_creds['psw']
try:
DB_HOST = login_creds['DB_HOST']
DB_NAME = login_creds['DB_NAME']
DB_USER = login_creds['DB_USER']
DB_PASS = login_creds['DB_PASS']
DB_TABLE = login_creds['DB_TABLE']
except:
pass
return 1
#nastav APPID
def setAppId(index):
global APPID
APPID = arguments[index + 1]
return 1
#nastav APPEUI
def setAppEui(index):
global APPEUI
APPEUI = arguments[index + 1]
return 1
#nastav APPKEY
def setPsw(index):
global PSW
PSW = arguments[index + 1]
return 1
#nastav adresu databazoveho serveru
def set_dbHost(index):
global DB_HOST
DB_HOST = arguments[index + 1]
return 1
#nastav jmeno databaze
def set_dbName(index):
global DB_NAME
DB_NAME = arguments[index + 1]
return 1
#nastav uzivatele databaze
def set_dbUser(index):
global DB_USER
DB_USER = arguments[index + 1]
return 1
#nastav heslo pro databazoveho uzivatele
def set_dbPass(index):
global DB_PASS
DB_PASS = arguments[index + 1]
return 1
#zobraz help pro vsecky argumenty
def appHelp():
print('mqtt_sub.py [OPTIONS]')
print(' -j Save messages to json, json per message')
print(' -do Discard old jsons, also json is not saved with time in name')
print(' -db Save data to postgres database')
print(' -dbhost Database hostmane')
print(' -dbname Database name')
print(' -dbuser Database user')
print(' -dbpass Database password')
print(' -l [FILE] Load login data from specified')
print(' -s [SERVER] MQTT server address')
print(' -a [ID] Id of application on server')
print(' -p [PASS] API key for mqtt')
print(' -h print this help message')
exit()
#zkontroluj, zda souborova cesta existuje
def checkPath(path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise
#konverze bajtu na cislo
def bytes_to_decimal(i,d):
xx = i - 127
dec = (-d if xx < 0 else d)/100
return xx + dec
#co se ma provest pri pripojeni na mqtt server
def on_connect(client, userdata, flags, rc):
client.subscribe('application/+/device/+/event/up')
#co se ma stat, kdyz prijme mqtt packet
def on_message(client, userdata, msg):
#nacteni zpravy
j_msg = json.loads(msg.payload.decode('utf-8'))
#nacteni identifikace zarizeni, ze ktereho zprava prisla
app_name = j_msg['deviceInfo']['applicationName']
dev_name = j_msg['deviceInfo']['deviceName']
app_id = j_msg['deviceInfo']['applicationId']
dev_eui = j_msg['deviceInfo']['devEui']
msg_b64_data = j_msg['data']
if app_name != "comms":
return
#nacteni metadat zpravy
rx_metadata_in = j_msg['rxInfo']
rx_metadata = []
for gateway in range(len(rx_metadata_in)):
rx_metadata.append({})
try:
rx_metadata[gateway]['gateway'] = rx_metadata_in[gateway]['gatewayId']
except:
rx_metadata[gateway]['gateway'] = 'Unknown'
try:
rx_metadata[gateway]['time'] = rx_metadata_in[gateway]['gwTime']
except:
rx_metadata[gateway]['time'] = 'Unknown'
try:
rx_metadata[gateway]['rssi'] = rx_metadata_in[gateway]['rssi']
except:
rx_metadata[gateway]['rssi'] = 'Unknown'
try:
rx_metadata[gateway]['snr'] = rx_metadata_in[gateway]['snr']
except:
rx_metadata[gateway]['snr'] = 'Unknown'
try:
frame_port = j_msg['fPort']
frame_count = j_msg['fCnt']
except:
frame_port = 0;
frame_count = 0;
#nacteni obsahu zpravy
decoded_payload_dict = j_msg['object']
#priprava pro json, co se ulozi
message_dict = {
'device': dev_name,
'devEui': dev_eui,
'app': app_name,
'appId': app_id,
'rx_metadata': rx_metadata,
'frame_port': frame_port,
'frame_count': frame_count,
'data': msg_b64_data,
'payload': decoded_payload_dict
}
#print(message_dict)
#ulozeni zpravy pro preposlani do json
json_out = json.dumps(message_dict, indent = 4)
with open('ZKJ_tmp_message_'+str(int(decoded_payload_dict["msgno"]))+'.json','w') as file:
file.write(json_out)
if decoded_payload_dict["msgno"] == decoded_payload_dict["msgprts"]:
for i in range(1,33):
try:
with open('ZKJ_tmp_message_'+str(i)+'.json','r') as file:
loaded_json = ''
for line in file:
loaded_json += line
msg = json.loads(loaded_json)
#print(msg)
downlink = {
'devEui': msg['devEui'],
'confirmed': False,
'fPort': 1,
'data': msg['data']
}
try:
for device in DEVICES:
lst = list(downlink['data'])
lst[7] = 'f'
downlink['data'] = ''.join(lst)
ttn_client.publish("application/"+msg['appId']+"/device/"+device+"/command/down", json.dumps(downlink, indent = 4))
except Exception as e: print(e)
except:
pass
#ulozeni dat do json
if write_json == 1:
json_out = json.dumps(message_dict, indent = 4)
checkPath(JSON_PATH)
if keep_last == 1:
with open(JSON_PATH+'ZKJ_Comms_'+dev_id+'.json','w') as file:
file.write(json_out)
else:
with open(JSON_PATH+'ZKJ_Comms_'+dev_id+'_'+str(datetime.datetime.now())+'.json','w') as file:
file.write(json_out)
#ulozeni dat do databaze
if write_db == 1:
#orezani jmena zarizeni na pouze modelovy nazev
with open(LOG_PATH+'app.log','a') as log:
log.write("dev_name: "+dev_name+"\n")
#datovy string co se ma ulozit do databaze
db_string = dev_name+","+decoded_payload_dict['message']
#header sloupccu databazove tabulky, pouzije se pouze, kdyz tabulka nebude existovat
db_columns = "Id SERIAL PRIMARY KEY, "+\
"DevName VARCHAR(100), "+\
"Message VARCHAR(100), "+\
"created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP"
#header sloupcu do kterych se ma zapsat db_string. db_string a db_col_wrt musi mit stejne poradi headeru a dat
db_col_wrt = "DevName,"+\
"Message"
#pripojeni na databazi, kontrola zda tabulka existuje a nasledne zapsani dat
con = psycopg2.connect("host="+DB_HOST+" dbname="+DB_NAME+" user="+DB_USER+" password="+DB_PASS)
cur = con.cursor()
cur.execute("SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = \'"+DB_TABLE+"\');")
if not cur.fetchone()[0]:
try:
cur.execute("CREATE TABLE "+DB_TABLE+"("+db_columns+")")
cur.execute("INSERT INTO "+DB_TABLE+"("+db_col_wrt+") VALUES("+db_string+")")
con.commit()
except (psycopg2.DatabaseError):
if con:
con.rollback()
sys.exit(1)
finally:
if con:
con.close()
else:
try:
cur.execute("INSERT INTO "+DB_TABLE+"("+db_col_wrt+") VALUES("+db_string+")")
con.commit()
except (psycopg2.DatabaseError):
if con:
con.rollback()
sys.exit(1)
finally:
if con:
con.close()
print("Starting app")
#kontrola, zda application log soubor existuje
checkPath(LOG_PATH)
try:
with open(LOG_PATH+"app.log","a") as log:
pass
except:
with open(LOG_PATH+"app.log","w") as log:
log.write('LOG_START\n')
#proskenovani jednotlivych argumentu
for arg in range(1, len(sys.argv)):
if arguments[arg] == '-j':
arg += enableJson()
elif arguments[arg] == '-do':
arg += discardOldJsonData()
elif arguments[arg] == '-l':
arg += jsonLogin(arg)
elif arguments[arg] == '-a':
arg += setAppId(arg)
elif arguments[arg] == '-s':
arg += setAppEui(arg)
elif arguments[arg] == '-p':
arg += setPsw(arg)
elif arguments[arg] == '-h':
appHelp()
elif arguments[arg] == '-db':
arg += enableDB()
elif arguments[arg] == '-dbhost':
arg += set_dbHost(arg)
elif arguments[arg] == '-dbname':
arg += set_dbName(arg)
elif arguments[arg] == '-dbuser':
arg += set_dbUser(arg)
elif arguments[arg] == '-dbpass':
arg += set_dbPass(arg)
else:
pass
#Init message
print('MQTT Subscribtion started')
print('Time: ', str(datetime.datetime.now()))
print('Server: ', APPEUI)
print('appId: ', APPID)
print('psw: ', PSW)
print('')
with open(LOG_PATH+'app.log','a') as log:
log.write("\n\n["+str(datetime.datetime.now())+"] MQTT Subscribtion started\n")
log.write("["+str(datetime.datetime.now())+"] Time: "+str(datetime.datetime.now())+"\n")
log.write("["+str(datetime.datetime.now())+"] Server: "+APPEUI+"\n")
log.write("["+str(datetime.datetime.now())+"] appId: "+APPID+"\n")
log.write("["+str(datetime.datetime.now())+"] psw: "+PSW+"\n")
if write_json == 1:
print('Saving data to json')
if keep_last == 0:
print('Keeping only the newest json')
else:
print('Keeping all jsons')
print('Json path: '+JSON_PATH)
print('')
with open(LOG_PATH+'app.log','a') as log:
log.write("["+str(datetime.datetime.now())+"] Saving data to json\n")
if keep_last == 0:
log.write("["+str(datetime.datetime.now())+"] Keeping only the newest json\n")
else:
log.write("["+str(datetime.datetime.now())+"] Keeping all jsons\n")
log.write("["+str(datetime.datetime.now())+"] Json path: "+JSON_PATH+"\n")
if write_db == 1:
print('Saving data to postgreSQL database')
print('Server: ', DB_HOST)
print('Database: ', DB_NAME)
print('User: ', DB_USER)
print('Password: ', DB_PASS)
print('')
with open(LOG_PATH+'app.log','a') as log:
log.write("["+str(datetime.datetime.now())+"] Saving data to postgreSQL database\n")
log.write("["+str(datetime.datetime.now())+"] Server: "+DB_HOST+"\n")
log.write("["+str(datetime.datetime.now())+"] Database: "+DB_NAME+"\n")
log.write("["+str(datetime.datetime.now())+"] User: "+DB_USER+"\n")
log.write("["+str(datetime.datetime.now())+"] Password: "+DB_PASS+"\n")
# set paho.mqtt callback
ttn_client = mqtt.Client()
ttn_client.on_connect = on_connect
ttn_client.on_message = on_message
ttn_client.username_pw_set(APPID, PSW)
ttn_client.connect(APPEUI, PORT, 60) #MQTT port over TLS
try:
ttn_client.loop_forever()
except KeyboardInterrupt:
print('disconnect')
with open(LOG_PATH+'app.log','a') as log:
log.write("["+str(datetime.datetime.now())+"] disconnect\n")
ttn_client.disconnect()