diff options
author | dennis <d@ennis.no> | 2023-01-31 20:59:38 +0100 |
---|---|---|
committer | dennis <d@ennis.no> | 2023-01-31 20:59:38 +0100 |
commit | 3499ea624b67559f2c9530db4e36635106e22336 (patch) | |
tree | e2a81e6ed2368df7f538dacd479e3d2b5f4dba8a /mqtt2pgsql.py | |
parent | handle HAN as well (diff) | |
download | energyscripts-3499ea624b67559f2c9530db4e36635106e22336.tar.gz |
added queue, in case db-connection is down
Diffstat (limited to '')
-rw-r--r-- | mqtt2queue.py (renamed from mqtt2pgsql.py) | 42 |
1 files changed, 11 insertions, 31 deletions
diff --git a/mqtt2pgsql.py b/mqtt2queue.py index 004216e..102b1eb 100644 --- a/mqtt2pgsql.py +++ b/mqtt2queue.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 import os -import sys import json -import psycopg2 +import pickle +from datetime import datetime +from litequeue import SQLQueue import paho.mqtt.client as mqtt mqtt_server = os.environ['el_mqtt_server'] @@ -11,12 +12,7 @@ mqtt_port = int(os.environ['el_mqtt_port']) keepalive = int(os.environ['el_mqtt_keepalive']) mqtt_topic = os.environ['el_mqtt_topic'] -pg_db = os.environ['el_pg_db'] -pg_host = os.environ['el_pg_host'] -pg_user = os.environ['el_pg_user'] -pg_pass = os.environ['el_pg_pass'] -pg_table = "mqtt_temps" - +q = SQLQueue("litequeue.db", maxsize=None) # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, rc): @@ -30,32 +26,16 @@ def on_connect(client, userdata, flags, rc): def on_message(client, userdata, msg): name = msg.topic.split('/')[1] data = json.loads(msg.payload) + if name.startswith('tmp') and 'temperature' in data and 'humidity' in data: - sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage) VALUES(%s,%s,%s,%s,%s,%s)" - values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage']) - todb(sql,values) + sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage, time) VALUES(%s,%s,%s,%s,%s,%s,%s)" + values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'], datetime.utcnow()) + q.put(pickle.dumps([sql, values])) elif name == 'HAN' and 'current' in data: - sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality) VALUES(%s,%s,%s,%s,%s)" - values = (name, data['current'], data['power'], data['voltage'], data['linkquality']) - todb(sql,values) - - -# Write values to database -def todb(sql, values): - print(values) - conn = psycopg2.connect(database=pg_db, host=pg_host, user=pg_user, password=pg_pass) - cur = conn.cursor() - try: - cur.execute(sql, values) - conn.commit() - except (Exception, psycopg2.Error) as error: - conn.rollback() - print("Error while connecting to PostgreSQL", error) - raise error - finally: - cur.close() - conn.close() + sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality, time) VALUES(%s,%s,%s,%s,%s,%s)" + values = (name, data['current'], data['power'], data['voltage'], data['linkquality'], datetime.utcnow()) + q.put(pickle.dumps([sql, values])) |