aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--.gitignore5
-rw-r--r--mqtt2queue.py (renamed from mqtt2pgsql.py)42
-rw-r--r--mqtt2queue.service (renamed from mqtt2pgsql.service)6
-rw-r--r--queue2pgsql.py35
-rw-r--r--queue2pgsql.service17
-rw-r--r--requirements.txt4
-rw-r--r--todb.py19
7 files changed, 92 insertions, 36 deletions
diff --git a/.gitignore b/.gitignore
index 8535f83..af68312 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,8 @@
+# queue
+litequeue.db
+litequeue.db-shm
+litequeue.db-wal
+
# Environments
.env
.venv
diff --git a/mqtt2pgsql.py b/mqtt2queue.py
index 004216e..102b1eb 100644
--- a/mqtt2pgsql.py
+++ b/mqtt2queue.py
@@ -1,9 +1,10 @@
#!/usr/bin/env python3
import os
-import sys
import json
-import psycopg2
+import pickle
+from datetime import datetime
+from litequeue import SQLQueue
import paho.mqtt.client as mqtt
mqtt_server = os.environ['el_mqtt_server']
@@ -11,12 +12,7 @@ mqtt_port = int(os.environ['el_mqtt_port'])
keepalive = int(os.environ['el_mqtt_keepalive'])
mqtt_topic = os.environ['el_mqtt_topic']
-pg_db = os.environ['el_pg_db']
-pg_host = os.environ['el_pg_host']
-pg_user = os.environ['el_pg_user']
-pg_pass = os.environ['el_pg_pass']
-pg_table = "mqtt_temps"
-
+q = SQLQueue("litequeue.db", maxsize=None)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
@@ -30,32 +26,16 @@ def on_connect(client, userdata, flags, rc):
def on_message(client, userdata, msg):
name = msg.topic.split('/')[1]
data = json.loads(msg.payload)
+
if name.startswith('tmp') and 'temperature' in data and 'humidity' in data:
- sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage) VALUES(%s,%s,%s,%s,%s,%s)"
- values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'])
- todb(sql,values)
+ sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage, time) VALUES(%s,%s,%s,%s,%s,%s,%s)"
+ values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'], datetime.utcnow())
+ q.put(pickle.dumps([sql, values]))
elif name == 'HAN' and 'current' in data:
- sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality) VALUES(%s,%s,%s,%s,%s)"
- values = (name, data['current'], data['power'], data['voltage'], data['linkquality'])
- todb(sql,values)
-
-
-# Write values to database
-def todb(sql, values):
- print(values)
- conn = psycopg2.connect(database=pg_db, host=pg_host, user=pg_user, password=pg_pass)
- cur = conn.cursor()
- try:
- cur.execute(sql, values)
- conn.commit()
- except (Exception, psycopg2.Error) as error:
- conn.rollback()
- print("Error while connecting to PostgreSQL", error)
- raise error
- finally:
- cur.close()
- conn.close()
+ sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality, time) VALUES(%s,%s,%s,%s,%s,%s)"
+ values = (name, data['current'], data['power'], data['voltage'], data['linkquality'], datetime.utcnow())
+ q.put(pickle.dumps([sql, values]))
diff --git a/mqtt2pgsql.service b/mqtt2queue.service
index d400b10..a63c994 100644
--- a/mqtt2pgsql.service
+++ b/mqtt2queue.service
@@ -7,13 +7,11 @@ StartLimitBurst=5
Type=simple
EnvironmentFile=%h/energyscripts/.env
WorkingDirectory=%h/energyscripts
-ExecStart=%h/energyscripts/venv/bin/python3 -u mqtt2pgsql.py
+ExecStart=%h/energyscripts/venv/bin/python3 -u mqtt2queue.py
Restart=on-failure
TimeoutStopSec=70
RestartSec=30
-SyslogIdentifier=mqtt2pgsql
-#StandardOutput=journal
-#StandardError=journal
+SyslogIdentifier=mqtt2queue
[Install]
WantedBy=default.target
diff --git a/queue2pgsql.py b/queue2pgsql.py
new file mode 100644
index 0000000..0c95f6d
--- /dev/null
+++ b/queue2pgsql.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python3
+
+import os
+import time
+import pickle
+import sqlite3
+from litequeue import SQLQueue
+from datetime import datetime
+
+from todb import todb
+
+# Unlock all
+con = sqlite3.connect("litequeue.db")
+cur = con.cursor()
+cur.execute("UPDATE Queue SET status = 0 WHERE status = 1")
+con.commit()
+con.close()
+
+# Open Queue
+q = SQLQueue("litequeue.db", maxsize=None)
+q.prune()
+
+# Event loop
+while True:
+ if q.empty() == True:
+ print("Queue empty. Sleeping 10s")
+ time.sleep(10)
+ continue
+ task = q.pop()
+ raw = pickle.loads(task['message'])
+ todb(raw[0],raw[1])
+ print("Processed " + str(task['message_id']) + ". " + str(q.qsize() - 1) + " left")
+ q.done(task['message_id'])
+
+print("END")
diff --git a/queue2pgsql.service b/queue2pgsql.service
new file mode 100644
index 0000000..49e4066
--- /dev/null
+++ b/queue2pgsql.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Simple service to start python-listener
+StartLimitIntervalSec=100
+StartLimitBurst=5
+
+[Service]
+Type=simple
+EnvironmentFile=%h/energyscripts/.env
+WorkingDirectory=%h/energyscripts
+ExecStart=%h/energyscripts/venv/bin/python3 -u queue2pgsql.py
+Restart=on-failure
+TimeoutStopSec=70
+RestartSec=30
+SyslogIdentifier=queue2pgsql
+
+[Install]
+WantedBy=default.target
diff --git a/requirements.txt b/requirements.txt
index aa7e4a9..0dd3ee4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,12 @@
async-property==0.2.1
+async-timeout==4.0.2
certifi==2022.12.7
charset-normalizer==3.0.1
idna==3.4
+litequeue==0.5
neohubapi==1.0
paho-mqtt==1.6.1
-psycopg2-binary==2.9.5
+psycopg==3.1.8
requests==2.28.2
urllib3==1.26.14
websockets==10.4
diff --git a/todb.py b/todb.py
new file mode 100644
index 0000000..d911a29
--- /dev/null
+++ b/todb.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python3
+
+import os
+import psycopg
+
+pg_db = os.environ['el_pg_db']
+pg_host = os.environ['el_pg_host']
+pg_user = os.environ['el_pg_user']
+pg_pass = os.environ['el_pg_pass']
+
+def todb(sql, values):
+ with psycopg.connect(dbname=pg_db, host=pg_host, user=pg_user, password=pg_pass) as conn:
+ if type(values) == list:
+ conn.executemany(sql, values)
+ elif type(values) == tuple:
+ conn.execute(sql, values)
+ else:
+ print("OH SHIT")
+ exit()