417 lines
13 KiB
Python
Executable File
417 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Created on Mon Mar 28 19:30:56 2022
|
|
|
|
@author: angoosh
|
|
"""
|
|
|
|
import paho.mqtt.client as mqtt #knihovna pro prijem mqtt packetu
|
|
import paho.mqtt.publish as publish
|
|
import json #knihovna pro praci s json formatem
|
|
import datetime #knihovna pro nacteni casu
|
|
import os, os.path #os knihovna
|
|
import errno #knihovna pro chyby
|
|
import sys #knihovna pro praci s interpretrem
|
|
import psycopg2 #knihovna pro praci s postgresql databazi
|
|
import re #regex
|
|
|
|
# Current working directory; the data and log folders live beneath it.
WORKDIR = os.getcwd()

# Devices that receive the forwarded downlinks published by on_message.
DEVICES = ["2cf7f1203230a02e"]

# MQTT port
PORT = 1884

# MQTT connection settings. They are overwritten by -s / -a / -p or by the
# -l login file; the empty defaults keep the names defined so that a missing
# option no longer surfaces as a NameError later in the script.
APPEUI = ""
APPID = ""
PSW = ""

# PostgreSQL settings, overwritten by -dbhost / -dbname / -dbuser / -dbpass
# or by the -l login file (DB_TABLE can only come from the login file).
DB_HOST = ""
DB_NAME = ""
DB_USER = ""
DB_PASS = ""
DB_TABLE = ""

# Switches for the individual output options
write_json = 0
write_db = 0
keep_last = 0

# Command-line arguments
arguments = sys.argv
argument_index = 0

# Where the JSON data and the logs are stored
JSON_PATH = WORKDIR+'/data/json/'
LOG_PATH = WORKDIR+'/logs/'
|
|
|
|
# Enable writing of received data to JSON files.
def enableJson():
    """Switch JSON output on.

    Returns:
        int: always 0.
    """
    global write_json
    extra_args_used = 0
    write_json = 1
    return extra_args_used
|
|
|
|
# Enable writing of received data to the database.
def enableDB():
    """Switch database output on.

    Returns:
        int: always 0.
    """
    global write_db
    extra_args_used = 0
    write_db = 1
    return extra_args_used
|
|
|
|
# Turn off the history of JSON files.
def discardOldJsonData():
    """Set ``keep_last`` to 1.

    Returns:
        int: always 0.
    """
    global keep_last
    extra_args_used = 0
    keep_last = 1
    return extra_args_used
|
|
|
|
# Load the login credentials and other metadata from a file.
def jsonLogin(index):
    """Load MQTT and (optionally) database settings from a JSON file.

    The file path is read from ``arguments[index + 1]``. The keys
    ``appeui``, ``appid`` and ``psw`` are required. ``DB_HOST``, ``DB_NAME``,
    ``DB_USER``, ``DB_PASS`` and ``DB_TABLE`` are optional and each one is
    applied independently, so one missing key no longer discards the others.

    Args:
        index: position of the option in ``arguments``; the path follows it.

    Returns:
        int: always 1.

    Raises:
        IndexError: no path follows the option.
        OSError: the file cannot be opened.
        ValueError: the file does not contain valid JSON.
        KeyError: a required key is missing.
    """
    global APPEUI, APPID, PSW

    login_json = arguments[index + 1]
    with open(login_json, 'r') as file:
        login_creds = json.load(file)

    APPEUI = login_creds['appeui']
    APPID = login_creds['appid']
    PSW = login_creds['psw']

    # Optional settings: only names present in the file are (re)bound, the
    # others keep whatever value they had before.
    module_globals = globals()
    for key in ('DB_HOST', 'DB_NAME', 'DB_USER', 'DB_PASS', 'DB_TABLE'):
        if key in login_creds:
            module_globals[key] = login_creds[key]
    return 1
|
|
|
|
# Set APPID.
def setAppId(index):
    """Store the argument that follows position *index* in ``APPID``.

    Returns:
        int: always 1.
    """
    global APPID
    value_position = index + 1
    APPID = arguments[value_position]
    return 1
|
|
|
|
# Set APPEUI.
def setAppEui(index):
    """Store the argument that follows position *index* in ``APPEUI``.

    Returns:
        int: always 1.
    """
    global APPEUI
    value_position = index + 1
    APPEUI = arguments[value_position]
    return 1
|
|
|
|
# Set the password (PSW).
def setPsw(index):
    """Store the argument that follows position *index* in ``PSW``.

    Returns:
        int: always 1.
    """
    global PSW
    value_position = index + 1
    PSW = arguments[value_position]
    return 1
|
|
|
|
# Set the address of the database server.
def set_dbHost(index):
    """Store the argument that follows position *index* in ``DB_HOST``.

    Returns:
        int: always 1.
    """
    global DB_HOST
    value_position = index + 1
    DB_HOST = arguments[value_position]
    return 1
|
|
|
|
# Set the database name.
def set_dbName(index):
    """Store the argument that follows position *index* in ``DB_NAME``.

    Returns:
        int: always 1.
    """
    global DB_NAME
    value_position = index + 1
    DB_NAME = arguments[value_position]
    return 1
|
|
|
|
# Set the database user.
def set_dbUser(index):
    """Store the argument that follows position *index* in ``DB_USER``.

    Returns:
        int: always 1.
    """
    global DB_USER
    value_position = index + 1
    DB_USER = arguments[value_position]
    return 1
|
|
|
|
# Set the password of the database user.
def set_dbPass(index):
    """Store the argument that follows position *index* in ``DB_PASS``.

    Returns:
        int: always 1.
    """
    global DB_PASS
    value_position = index + 1
    DB_PASS = arguments[value_position]
    return 1
|
|
|
|
# Show the help for all arguments.
def appHelp():
    """Print the command-line usage summary and terminate the program.

    Raises:
        SystemExit: always, with exit status 0.
    """
    print('mqtt_sub.py [OPTIONS]')
    print(' -j Save messages to json, json per message')
    print(' -do Discard old jsons, also json is not saved with time in name')
    print(' -db Save data to postgres database')
    print(' -dbhost Database hostname')
    print(' -dbname Database name')
    print(' -dbuser Database user')
    print(' -dbpass Database password')
    print(' -l [FILE] Load login data from specified')
    print(' -s [SERVER] MQTT server address')
    print(' -a [ID] Id of application on server')
    print(' -p [PASS] API key for mqtt')
    print(' -h print this help message')
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is missing when the interpreter runs without it.
    sys.exit(0)
|
|
|
|
# Make sure the directory path exists.
def checkPath(path):
    """Create directory *path* (including parents) unless it already exists.

    Args:
        path: directory to create.

    Raises:
        OSError: *path* exists but is not a directory, or it cannot be
            created.
    """
    # exist_ok=True tolerates an existing directory only; an existing
    # non-directory still raises, like the manual EEXIST check it replaces.
    os.makedirs(path, exist_ok=True)
|
|
|
|
# Convert bytes to a number.
def bytes_to_decimal(i,d):
    """Combine an offset integer part and a hundredths part into one number.

    Args:
        i: integer part stored with an offset of 127.
        d: hundredths; they take the sign of the offset-corrected integer
            part (positive when that part is zero).

    Returns:
        float: ``(i - 127)`` plus or minus ``d / 100``.
    """
    whole = i - 127
    if whole < 0:
        fraction = -d / 100
    else:
        fraction = d / 100
    return whole + fraction
|
|
|
|
# What to do after connecting to the MQTT server.
def on_connect(client, userdata, flags, rc):
    """Subscribe *client* to the uplink event topic of every application and device.

    ``userdata``, ``flags`` and ``rc`` are accepted but not used.
    """
    uplink_topic = 'application/+/device/+/event/up'
    client.subscribe(uplink_topic)
|
|
|
|
# Summarise the reception metadata of one gateway.
def _rx_summary(entry):
    """Return gateway id, time, RSSI and SNR of one ``rxInfo`` entry.

    Keys that are absent are reported as ``'Unknown'``.
    """
    if not isinstance(entry, dict):
        entry = {}
    return {
        'gateway': entry.get('gatewayId', 'Unknown'),
        'time': entry.get('gwTime', 'Unknown'),
        'rssi': entry.get('rssi', 'Unknown'),
        'snr': entry.get('snr', 'Unknown'),
    }


# Re-send every stored message part as a downlink to the configured devices.
def _forward_stored_parts():
    """Publish the stored ``ZKJ_tmp_message_<n>.json`` files (n = 1..32).

    Parts whose file is missing or unreadable are skipped; publishing errors
    are printed and do not stop the remaining parts.
    """
    # NOTE(review): the part files are never removed, so parts left over from
    # an earlier, longer message are forwarded again - confirm this is wanted.
    for part in range(1, 33):
        try:
            with open('ZKJ_tmp_message_'+str(part)+'.json','r') as file:
                stored = json.load(file)
            downlink = {
                'devEui': stored['devEui'],
                'confirmed': False,
                'fPort': 1,
                'data': stored['data']
            }
        except (OSError, ValueError, KeyError, TypeError):
            continue

        try:
            # NOTE(review): the 8th character of the encoded payload is
            # overwritten with 'f'; the purpose is not evident here - confirm.
            chars = list(downlink['data'])
            chars[7] = 'f'
            downlink['data'] = ''.join(chars)
            for device in DEVICES:
                ttn_client.publish("application/"+stored['appId']+"/device/"+device+"/command/down", json.dumps(downlink, indent = 4))
        except Exception as e:
            print(e)


# Save the message to a JSON file.
def _save_json(message_dict, device_label):
    """Write *message_dict* below ``JSON_PATH``.

    With ``keep_last == 1`` one file per device is overwritten; otherwise
    the current time is appended to the file name.
    """
    json_out = json.dumps(message_dict, indent = 4)
    checkPath(JSON_PATH)
    if keep_last == 1:
        file_name = 'ZKJ_Comms_'+device_label+'.json'
    else:
        file_name = 'ZKJ_Comms_'+device_label+'_'+str(datetime.datetime.now())+'.json'
    with open(JSON_PATH+file_name,'w') as file:
        file.write(json_out)


# Save the message to the database.
def _save_to_db(dev_name, message):
    """Insert one ``(DevName, Message)`` row into ``DB_TABLE``.

    The table is created first when it does not exist. On a database error
    the transaction is rolled back and the program exits with status 1.
    """
    with open(LOG_PATH+'app.log','a') as log:
        log.write("dev_name: "+dev_name+"\n")

    # Column definitions, used only when the table does not exist yet.
    db_columns = "Id SERIAL PRIMARY KEY, "+\
                 "DevName VARCHAR(100), "+\
                 "Message VARCHAR(100), "+\
                 "created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP"

    # Keyword arguments instead of a hand-built DSN string, so values that
    # contain spaces or quotes cannot break the connection parameters.
    con = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
    try:
        cur = con.cursor()
        # DB_TABLE comes from the operator's configuration; the values come
        # from the network and are therefore passed as query parameters.
        cur.execute("CREATE TABLE IF NOT EXISTS "+DB_TABLE+"("+db_columns+")")
        cur.execute("INSERT INTO "+DB_TABLE+"(DevName,Message) VALUES(%s, %s)", (dev_name, message))
        con.commit()
    except psycopg2.DatabaseError:
        con.rollback()
        sys.exit(1)
    finally:
        con.close()


# What happens when an MQTT packet is received.
def on_message(client, userdata, msg):
    """Handle one uplink event.

    The JSON payload is read for ``deviceInfo`` (``applicationName``,
    ``deviceName``, ``applicationId``, ``devEui``), ``data``, ``rxInfo``,
    ``fPort``/``fCnt`` and ``object`` (``msgno``, ``msgprts``, and
    ``message`` when database output is on). Events of applications other
    than ``"comms"`` are ignored.

    Every accepted event is stored as ``ZKJ_tmp_message_<msgno>.json`` in the
    current directory. When ``msgno`` equals ``msgprts`` all stored parts are
    forwarded as downlinks. Depending on ``write_json`` / ``write_db`` the
    event is also saved to a JSON file and to PostgreSQL.

    Args:
        client: MQTT client that received the message (unused).
        userdata: user data of the client (unused).
        msg: received message; ``msg.payload`` holds UTF-8 encoded JSON.

    Raises:
        KeyError: a required field is missing from the event.
        SystemExit: the database write failed.
    """
    # Load the message
    j_msg = json.loads(msg.payload.decode('utf-8'))

    # Identify the sender; foreign applications are dropped before any other
    # field is touched so their events cannot fail on missing keys.
    device_info = j_msg['deviceInfo']
    app_name = device_info['applicationName']
    if app_name != "comms":
        return
    dev_name = device_info['deviceName']
    app_id = device_info['applicationId']
    dev_eui = device_info['devEui']

    msg_b64_data = j_msg['data']

    # Message metadata, one entry per receiving gateway
    rx_metadata = [_rx_summary(entry) for entry in j_msg['rxInfo']]

    # Port and counter are used only when both are present.
    try:
        frame_port = j_msg['fPort']
        frame_count = j_msg['fCnt']
    except KeyError:
        frame_port = 0
        frame_count = 0

    # Decoded content of the message
    decoded_payload_dict = j_msg['object']

    # Record that is stored
    message_dict = {
        'device': dev_name,
        'devEui': dev_eui,
        'app': app_name,
        'appId': app_id,
        'rx_metadata': rx_metadata,
        'frame_port': frame_port,
        'frame_count': frame_count,
        'data': msg_b64_data,
        'payload': decoded_payload_dict
    }

    # Store the message part for forwarding
    json_out = json.dumps(message_dict, indent = 4)
    with open('ZKJ_tmp_message_'+str(int(decoded_payload_dict["msgno"]))+'.json','w') as file:
        file.write(json_out)

    if decoded_payload_dict["msgno"] == decoded_payload_dict["msgprts"]:
        _forward_stored_parts()

    # Save the data to JSON. The original referenced an undefined name here
    # (dev_id); the devEui is used to label the file instead.
    if write_json == 1:
        _save_json(message_dict, dev_eui)

    # Save the data to the database
    if write_db == 1:
        _save_to_db(dev_name, decoded_payload_dict['message'])
|
|
|
|
print("Starting app")
# Make sure the log directory exists and start a new log file with a header.
# Opening in append mode creates a missing file, so the previous
# "open for append, fall back to write" check never wrote the header.
checkPath(LOG_PATH)
if not os.path.exists(LOG_PATH+"app.log"):
    with open(LOG_PATH+"app.log","w") as log:
        log.write('LOG_START\n')
|
|
|
|
# Scan the individual command-line arguments.
# Options without a value are simply called; options with a value receive
# their own position and return how many extra arguments they consumed.
_flag_options = {
    '-j': enableJson,
    '-do': discardOldJsonData,
    '-h': appHelp,
    '-db': enableDB,
}
_value_options = {
    '-l': jsonLogin,
    '-a': setAppId,
    '-s': setAppEui,
    '-p': setPsw,
    '-dbhost': set_dbHost,
    '-dbname': set_dbName,
    '-dbuser': set_dbUser,
    '-dbpass': set_dbPass,
}

# A while loop is required: reassigning the variable of a
# "for ... in range(...)" loop does not skip anything, so option values used
# to be scanned again as if they were options themselves.
arg = 1
while arg < len(arguments):
    option = arguments[arg]
    if option in _flag_options:
        _flag_options[option]()
    elif option in _value_options:
        arg += _value_options[option](arg)
    # Unknown arguments are ignored.
    arg += 1
|
|
|
|
# Init message
def _mask_secret(secret):
    """Return a placeholder for *secret* so credentials reach neither stdout nor the log."""
    return '********' if secret else ''


def _stamp():
    """Return the current time formatted as a log line prefix."""
    return "["+str(datetime.datetime.now())+"] "


print('MQTT Subscribtion started')
print('Time: ', str(datetime.datetime.now()))
print('Server: ', APPEUI)
print('appId: ', APPID)
print('psw: ', _mask_secret(PSW))
print('')

with open(LOG_PATH+'app.log','a') as log:
    log.write("\n\n"+_stamp()+"MQTT Subscribtion started\n")
    log.write(_stamp()+"Time: "+str(datetime.datetime.now())+"\n")
    log.write(_stamp()+"Server: "+APPEUI+"\n")
    log.write(_stamp()+"appId: "+APPID+"\n")
    log.write(_stamp()+"psw: "+_mask_secret(PSW)+"\n")

if write_json == 1:
    # keep_last == 1 (option -do) is the mode in which on_message overwrites
    # one file per device; the two messages used to be swapped.
    if keep_last == 1:
        history_note = 'Keeping only the newest json'
    else:
        history_note = 'Keeping all jsons'

    print('Saving data to json')
    print(history_note)
    print('Json path: '+JSON_PATH)
    print('')

    with open(LOG_PATH+'app.log','a') as log:
        log.write(_stamp()+"Saving data to json\n")
        log.write(_stamp()+history_note+"\n")
        log.write(_stamp()+"Json path: "+JSON_PATH+"\n")

if write_db == 1:
    print('Saving data to postgreSQL database')
    print('Server: ', DB_HOST)
    print('Database: ', DB_NAME)
    print('User: ', DB_USER)
    print('Password: ', _mask_secret(DB_PASS))
    print('')

    with open(LOG_PATH+'app.log','a') as log:
        log.write(_stamp()+"Saving data to postgreSQL database\n")
        log.write(_stamp()+"Server: "+DB_HOST+"\n")
        log.write(_stamp()+"Database: "+DB_NAME+"\n")
        log.write(_stamp()+"User: "+DB_USER+"\n")
        log.write(_stamp()+"Password: "+_mask_secret(DB_PASS)+"\n")
|
|
|
|
# Create the paho-mqtt client, register the callbacks and connect.
ttn_client = mqtt.Client()
ttn_client.on_message = on_message
ttn_client.on_connect = on_connect
ttn_client.username_pw_set(APPID, PSW)
# NOTE(review): the original comment said "MQTT port over TLS", but no TLS
# setup is visible in this script - confirm how the broker is reached.
ttn_client.connect(APPEUI, PORT, 60)

# Process network traffic until the user interrupts the program.
try:
    ttn_client.loop_forever()
except KeyboardInterrupt:
    print('disconnect')
    with open(LOG_PATH+'app.log','a') as log:
        log.write("[{}] disconnect\n".format(datetime.datetime.now()))
    ttn_client.disconnect()
|