#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 28 19:30:56 2022

@author: angoosh

MQTT subscriber: listens on 'application/+/device/+/event/up', buffers the
parts of messages sent by the "comms" application, re-publishes them as
downlinks to the devices listed in DEVICES and optionally stores every
message as JSON and/or in a PostgreSQL table.
"""

import paho.mqtt.client as mqtt  # MQTT client used to receive and publish packets
import paho.mqtt.publish as publish
import json  # JSON encoding/decoding
import datetime  # timestamps for logs and file names
import os  # filesystem helpers
import os.path
import errno  # OS error codes
import sys  # interpreter access (argv, exit)
import psycopg2  # PostgreSQL driver
import re  # regex

# Current working directory; the data and log paths below are derived from it.
WORKDIR = os.getcwd()

# DevEUIs of the devices that receive the forwarded downlinks.
DEVICES = ["2cf7f1203230a02e"]

# MQTT port
PORT = 1884

# MQTT connection settings. APPEUI is used as the server address in connect().
APPEUI = ""
APPID = ""
PSW = ""

# Database settings; filled in by the -l login file or the -db* options.
DB_HOST = ""
DB_NAME = ""
DB_USER = ""
DB_PASS = ""
DB_TABLE = ""

# Switches for the individual storage options (0 = off, 1 = on).
write_json = 0
write_db = 0
keep_last = 0

# Command-line arguments
arguments = sys.argv
argument_index = 0

# Where the JSON data and the logs are stored.
JSON_PATH = WORKDIR + '/data/json/'
LOG_PATH = WORKDIR + '/logs/'

# The connected MQTT client; created in main().
ttn_client = None


def enableJson():
    """Enable writing every received message to a JSON file.

    Returns:
        0, the number of extra command-line arguments consumed.
    """
    global write_json
    write_json = 1
    return 0


def enableDB():
    """Enable writing every received message to the database.

    Returns:
        0, the number of extra command-line arguments consumed.
    """
    global write_db
    write_db = 1
    return 0


def discardOldJsonData():
    """Disable the JSON history: one file per device, overwritten each time.

    Returns:
        0, the number of extra command-line arguments consumed.
    """
    global keep_last
    keep_last = 1
    return 0


def jsonLogin(index):
    """Load login data and other settings from a JSON file.

    The file name is the argument following position ``index``. The keys
    'appeui', 'appid' and 'psw' are required; the 'DB_*' keys are optional
    and each one is applied independently when present.

    Returns:
        1, the number of extra command-line arguments consumed.

    Raises:
        OSError: the file cannot be opened.
        ValueError: the file is not valid JSON.
        KeyError: a required key is missing.
    """
    global APPEUI, APPID, PSW, DB_HOST, DB_NAME, DB_USER, DB_PASS, DB_TABLE
    login_json = arguments[index + 1]
    with open(login_json, 'r') as file:
        login_creds = json.load(file)
    APPEUI = login_creds['appeui']
    APPID = login_creds['appid']
    PSW = login_creds['psw']
    # Database settings are optional; keep the current value when a key is absent.
    DB_HOST = login_creds.get('DB_HOST', DB_HOST)
    DB_NAME = login_creds.get('DB_NAME', DB_NAME)
    DB_USER = login_creds.get('DB_USER', DB_USER)
    DB_PASS = login_creds.get('DB_PASS', DB_PASS)
    DB_TABLE = login_creds.get('DB_TABLE', DB_TABLE)
    return 1


def setAppId(index):
    """Set APPID (MQTT user name) from the argument after ``index``; returns 1."""
    global APPID
    APPID = arguments[index + 1]
    return 1


def setAppEui(index):
    """Set APPEUI (MQTT server address) from the argument after ``index``; returns 1."""
    global APPEUI
    APPEUI = arguments[index + 1]
    return 1


def setPsw(index):
    """Set PSW (MQTT password) from the argument after ``index``; returns 1."""
    global PSW
    PSW = arguments[index + 1]
    return 1


def set_dbHost(index):
    """Set the database server address from the argument after ``index``; returns 1."""
    global DB_HOST
    DB_HOST = arguments[index + 1]
    return 1


def set_dbName(index):
    """Set the database name from the argument after ``index``; returns 1."""
    global DB_NAME
    DB_NAME = arguments[index + 1]
    return 1


def set_dbUser(index):
    """Set the database user from the argument after ``index``; returns 1."""
    global DB_USER
    DB_USER = arguments[index + 1]
    return 1


def set_dbPass(index):
    """Set the database password from the argument after ``index``; returns 1."""
    global DB_PASS
    DB_PASS = arguments[index + 1]
    return 1


def set_dbTable(index):
    """Set the database table name from the argument after ``index``; returns 1."""
    global DB_TABLE
    DB_TABLE = arguments[index + 1]
    return 1


def appHelp():
    """Print the help for all command-line options and exit with status 0."""
    print('mqtt_sub.py [OPTIONS]')
    print(' -j Save messages to json, json per message')
    print(' -do Discard old jsons, also json is not saved with time in name')
    print(' -db Save data to postgres database')
    print(' -dbhost Database hostmane')
    print(' -dbname Database name')
    print(' -dbuser Database user')
    print(' -dbpass Database password')
    print(' -dbtable Database table')
    print(' -l [FILE] Load login data from specified')
    print(' -s [SERVER] MQTT server address')
    print(' -a [ID] Id of application on server')
    print(' -p [PASS] API key for mqtt')
    print(' -h print this help message')
    # sys.exit instead of the site-provided exit(), which is not always available.
    sys.exit()


def checkPath(path):
    """Make sure the directory ``path`` exists, creating parents as needed.

    Raises:
        OSError: the path exists but is not a directory, or cannot be created.
    """
    os.makedirs(path, exist_ok=True)


def bytes_to_decimal(i, d):
    """Convert an offset integer part and a hundredths part to one number.

    ``i`` is shifted down by 127 to give the signed integer part; ``d`` is
    taken as hundredths and gets the same sign as the integer part.
    """
    xx = i - 127
    dec = (-d if xx < 0 else d) / 100
    return xx + dec


def _mask(secret):
    """Return a placeholder for ``secret`` so it is never printed or logged."""
    return '***' if secret else ''


def _log_lines(lines, lead=''):
    """Append ``lines`` to app.log, each prefixed with the current timestamp.

    ``lead`` is written verbatim before the first line.
    """
    with open(LOG_PATH + 'app.log', 'a') as log:
        log.write(lead)
        for text in lines:
            log.write("[" + str(datetime.datetime.now()) + "] " + text + "\n")


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to the uplink events of all devices."""
    client.subscribe('application/+/device/+/event/up')


def _summarize_rx_info(rx_info):
    """Reduce each gateway entry to gateway id, time, rssi and snr.

    Fields that are missing (or entries that are not dicts) become 'Unknown'.
    """
    fields = (
        ('gateway', 'gatewayId'),
        ('time', 'gwTime'),
        ('rssi', 'rssi'),
        ('snr', 'snr'),
    )
    summary = []
    for entry in rx_info:
        source = entry if isinstance(entry, dict) else {}
        summary.append({name: source.get(key, 'Unknown') for name, key in fields})
    return summary


def _forward_buffered_messages(client):
    """Re-publish the buffered message parts 1..32 to every device in DEVICES.

    Parts whose temporary file does not exist are skipped silently; parts
    that cannot be read or parsed are reported and skipped.
    """
    for part in range(1, 33):
        try:
            with open('ZKJ_tmp_message_' + str(part) + '.json', 'r') as file:
                stored = json.load(file)
            # NOTE(review): the payload carries the sender's devEui while the
            # topic addresses a device from DEVICES — confirm the broker
            # accepts a payload devEui that differs from the topic.
            downlink = {
                'devEui': stored['devEui'],
                'confirmed': False,
                'fPort': 1,
                'data': stored['data']
            }
            app_id = stored['appId']
        except FileNotFoundError:
            continue
        except (OSError, ValueError, KeyError, TypeError) as e:
            print(e)
            continue

        try:
            # Overwrite the 8th character of the stored data string with 'f'.
            chars = list(downlink['data'])
            chars[7] = 'f'
            downlink['data'] = ''.join(chars)
            body = json.dumps(downlink, indent=4)
            for device in DEVICES:
                client.publish("application/" + app_id + "/device/" + device + "/command/down", body)
        except Exception as e:
            # Best effort: a failed forward must not stop the subscriber.
            print(e)


def _save_message_json(json_out, dev_eui):
    """Write ``json_out`` to JSON_PATH, named after the device's DevEUI.

    With keep_last set the file is overwritten on every message; otherwise
    the current time is appended to the name so every message is kept.
    """
    checkPath(JSON_PATH)
    if keep_last == 1:
        target = JSON_PATH + 'ZKJ_Comms_' + dev_eui + '.json'
    else:
        target = JSON_PATH + 'ZKJ_Comms_' + dev_eui + '_' + str(datetime.datetime.now()) + '.json'
    with open(target, 'w') as file:
        file.write(json_out)


def _store_in_db(dev_name, message):
    """Insert one (device name, message) row, creating the table if missing.

    On a database error the transaction is rolled back and the program
    exits with status 1.
    """
    with open(LOG_PATH + 'app.log', 'a') as log:
        log.write("dev_name: " + dev_name + "\n")

    # Column definitions, used only when the table does not exist yet.
    db_columns = "Id SERIAL PRIMARY KEY, " + \
                 "DevName VARCHAR(100), " + \
                 "Message VARCHAR(100), " + \
                 "created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP"
    # Columns written by the INSERT; order must match the values below.
    db_col_wrt = "DevName," + \
                 "Message"

    con = None
    try:
        con = psycopg2.connect(host=DB_HOST, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
        cur = con.cursor()
        # The unquoted table name in CREATE TABLE is folded to lower case by
        # PostgreSQL, so the catalogue lookup has to use the folded name too.
        cur.execute(
            "SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = %s);",
            (DB_TABLE.lower(),))
        if not cur.fetchone()[0]:
            # The table name comes from the operator's configuration; an
            # identifier cannot be passed as a query parameter.
            cur.execute("CREATE TABLE " + DB_TABLE + "(" + db_columns + ")")
        # Values are passed as parameters so message text can neither break
        # nor inject into the statement.
        cur.execute("INSERT INTO " + DB_TABLE + "(" + db_col_wrt + ") VALUES (%s, %s)",
                    (dev_name, message))
        con.commit()
    except psycopg2.DatabaseError as e:
        print(e)
        if con:
            con.rollback()
        sys.exit(1)
    finally:
        if con:
            con.close()


def on_message(client, userdata, msg):
    """MQTT message callback: buffer, forward and store one uplink.

    Messages from applications other than "comms" are ignored. Each accepted
    message is written to 'ZKJ_tmp_message_<msgno>.json'; when msgno equals
    msgprts all buffered parts are forwarded as downlinks. Depending on the
    write_json / write_db switches the message is also stored.
    """
    # Decode the packet.
    try:
        j_msg = json.loads(msg.payload.decode('utf-8'))
    except (UnicodeDecodeError, ValueError) as e:
        print('Ignoring undecodable message: ' + str(e))
        return

    try:
        # Identification of the device the message came from.
        device_info = j_msg['deviceInfo']
        app_name = device_info['applicationName']
        # Filter first, so uplinks of other applications are never inspected further.
        if app_name != "comms":
            return
        dev_name = device_info['deviceName']
        app_id = device_info['applicationId']
        dev_eui = device_info['devEui']
        msg_b64_data = j_msg['data']
        # Decoded content of the message.
        decoded_payload_dict = j_msg['object']
        part_no = int(decoded_payload_dict["msgno"])
        part_count = decoded_payload_dict["msgprts"]
    except (KeyError, TypeError, ValueError) as e:
        print('Ignoring malformed message: ' + repr(e))
        return

    # Reception metadata, one entry per gateway.
    rx_metadata = _summarize_rx_info(j_msg.get('rxInfo') or [])
    frame_port = j_msg.get('fPort', 0)
    frame_count = j_msg.get('fCnt', 0)

    # What gets stored as JSON.
    message_dict = {
        'device': dev_name,
        'devEui': dev_eui,
        'app': app_name,
        'appId': app_id,
        'rx_metadata': rx_metadata,
        'frame_port': frame_port,
        'frame_count': frame_count,
        'data': msg_b64_data,
        'payload': decoded_payload_dict
    }
    json_out = json.dumps(message_dict, indent=4)

    # Buffer this part for forwarding.
    with open('ZKJ_tmp_message_' + str(part_no) + '.json', 'w') as file:
        file.write(json_out)

    # The last part has arrived: forward everything that is buffered.
    if decoded_payload_dict["msgno"] == part_count:
        _forward_buffered_messages(client)

    # Store the data as JSON.
    if write_json == 1:
        _save_message_json(json_out, dev_eui)

    # Store the data in the database.
    if write_db == 1:
        _store_in_db(dev_name, decoded_payload_dict['message'])


def _parse_arguments():
    """Apply all command-line options in order; unknown arguments are ignored.

    Exits with status 2 when an option that needs a value is the last argument.
    """
    flag_options = {
        '-j': enableJson,
        '-do': discardOldJsonData,
        '-db': enableDB,
        '-h': appHelp,
    }
    value_options = {
        '-l': jsonLogin,
        '-a': setAppId,
        '-s': setAppEui,
        '-p': setPsw,
        '-dbhost': set_dbHost,
        '-dbname': set_dbName,
        '-dbuser': set_dbUser,
        '-dbpass': set_dbPass,
        '-dbtable': set_dbTable,
    }
    arg = 1
    while arg < len(arguments):
        option = arguments[arg]
        if option in flag_options:
            flag_options[option]()
        elif option in value_options:
            if arg + 1 >= len(arguments):
                print('Missing value for option ' + option)
                sys.exit(2)
            # The handler returns how many extra arguments it consumed, so
            # the option's value is not parsed as an option itself.
            arg += value_options[option](arg)
        arg += 1


def _check_settings():
    """Exit with status 1 when a setting required by the chosen options is missing."""
    if not APPEUI:
        print('MQTT server address is not set, use -s or -l')
        sys.exit(1)
    if write_db == 1:
        required = (
            ('DB_HOST', DB_HOST),
            ('DB_NAME', DB_NAME),
            ('DB_USER', DB_USER),
            ('DB_TABLE', DB_TABLE),
        )
        missing = [name for name, value in required if not value]
        if missing:
            print('Missing database settings: ' + ', '.join(missing))
            sys.exit(1)


def main():
    """Parse the options, report the configuration and run the MQTT loop."""
    global ttn_client

    print("Starting app")

    # Make sure the application log exists; a new log starts with a marker line.
    checkPath(LOG_PATH)
    if not os.path.exists(LOG_PATH + "app.log"):
        with open(LOG_PATH + "app.log", "w") as log:
            log.write('LOG_START\n')

    _parse_arguments()
    _check_settings()

    # Init message; passwords are masked so they do not end up on screen or in the log.
    print('MQTT Subscribtion started')
    print('Time: ', str(datetime.datetime.now()))
    print('Server: ', APPEUI)
    print('appId: ', APPID)
    print('psw: ', _mask(PSW))
    print('')
    _log_lines([
        "MQTT Subscribtion started",
        "Time: " + str(datetime.datetime.now()),
        "Server: " + APPEUI,
        "appId: " + APPID,
        "psw: " + _mask(PSW),
    ], lead="\n\n")

    if write_json == 1:
        # keep_last == 1 means each device's file is overwritten.
        if keep_last == 1:
            retention = 'Keeping only the newest json'
        else:
            retention = 'Keeping all jsons'
        print('Saving data to json')
        print(retention)
        print('Json path: ' + JSON_PATH)
        print('')
        _log_lines([
            "Saving data to json",
            retention,
            "Json path: " + JSON_PATH,
        ])

    if write_db == 1:
        print('Saving data to postgreSQL database')
        print('Server: ', DB_HOST)
        print('Database: ', DB_NAME)
        print('User: ', DB_USER)
        print('Password: ', _mask(DB_PASS))
        print('')
        _log_lines([
            "Saving data to postgreSQL database",
            "Server: " + DB_HOST,
            "Database: " + DB_NAME,
            "User: " + DB_USER,
            "Password: " + _mask(DB_PASS),
        ])

    # Set the paho.mqtt callbacks. paho-mqtt 2.x wants the callback API
    # version stated explicitly; the callbacks here use the version 1 signatures.
    api_version = getattr(mqtt, 'CallbackAPIVersion', None)
    if api_version is not None:
        ttn_client = mqtt.Client(api_version.VERSION1)
    else:
        ttn_client = mqtt.Client()
    ttn_client.on_connect = on_connect
    ttn_client.on_message = on_message
    ttn_client.username_pw_set(APPID, PSW)
    # NOTE(review): no tls_set() call is made, so this connects without TLS.
    ttn_client.connect(APPEUI, PORT, 60)

    try:
        ttn_client.loop_forever()
    except KeyboardInterrupt:
        print('disconnect')
        _log_lines(["disconnect"])
        ttn_client.disconnect()


if __name__ == "__main__":
    main()