aboutsummaryrefslogtreecommitdiffstats
path: root/mqtt2pgsql.py
diff options
context:
space:
mode:
authordennis <d@ennis.no>2023-01-31 20:59:38 +0100
committerdennis <d@ennis.no>2023-01-31 20:59:38 +0100
commit3499ea624b67559f2c9530db4e36635106e22336 (patch)
treee2a81e6ed2368df7f538dacd479e3d2b5f4dba8a /mqtt2pgsql.py
parenthandle HAN as well (diff)
downloadenergyscripts-3499ea624b67559f2c9530db4e36635106e22336.tar.gz
added queue, in case db-connection is down
Diffstat (limited to '')
-rw-r--r--mqtt2queue.py (renamed from mqtt2pgsql.py)42
1 files changed, 11 insertions, 31 deletions
diff --git a/mqtt2pgsql.py b/mqtt2queue.py
index 004216e..102b1eb 100644
--- a/mqtt2pgsql.py
+++ b/mqtt2queue.py
@@ -1,9 +1,10 @@
#!/usr/bin/env python3
import os
-import sys
import json
-import psycopg2
+import pickle
+from datetime import datetime
+from litequeue import SQLQueue
import paho.mqtt.client as mqtt
mqtt_server = os.environ['el_mqtt_server']
@@ -11,12 +12,7 @@ mqtt_port = int(os.environ['el_mqtt_port'])
keepalive = int(os.environ['el_mqtt_keepalive'])
mqtt_topic = os.environ['el_mqtt_topic']
-pg_db = os.environ['el_pg_db']
-pg_host = os.environ['el_pg_host']
-pg_user = os.environ['el_pg_user']
-pg_pass = os.environ['el_pg_pass']
-pg_table = "mqtt_temps"
-
+q = SQLQueue("litequeue.db", maxsize=None)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
@@ -30,32 +26,16 @@ def on_connect(client, userdata, flags, rc):
def on_message(client, userdata, msg):
name = msg.topic.split('/')[1]
data = json.loads(msg.payload)
+
if name.startswith('tmp') and 'temperature' in data and 'humidity' in data:
- sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage) VALUES(%s,%s,%s,%s,%s,%s)"
- values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'])
- todb(sql,values)
+ sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage, time) VALUES(%s,%s,%s,%s,%s,%s,%s)"
+ values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'], datetime.utcnow())
+ q.put(pickle.dumps([sql, values]))
elif name == 'HAN' and 'current' in data:
- sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality) VALUES(%s,%s,%s,%s,%s)"
- values = (name, data['current'], data['power'], data['voltage'], data['linkquality'])
- todb(sql,values)
-
-
-# Write values to database
-def todb(sql, values):
- print(values)
- conn = psycopg2.connect(database=pg_db, host=pg_host, user=pg_user, password=pg_pass)
- cur = conn.cursor()
- try:
- cur.execute(sql, values)
- conn.commit()
- except (Exception, psycopg2.Error) as error:
- conn.rollback()
- print("Error while connecting to PostgreSQL", error)
- raise error
- finally:
- cur.close()
- conn.close()
+ sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality, time) VALUES(%s,%s,%s,%s,%s,%s)"
+ values = (name, data['current'], data['power'], data['voltage'], data['linkquality'], datetime.utcnow())
+ q.put(pickle.dumps([sql, values]))