diff options
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | mqtt2queue.py (renamed from mqtt2pgsql.py) | 42 | ||||
-rw-r--r-- | mqtt2queue.service (renamed from mqtt2pgsql.service) | 6 | ||||
-rw-r--r-- | queue2pgsql.py | 35 | ||||
-rw-r--r-- | queue2pgsql.service | 17 | ||||
-rw-r--r-- | requirements.txt | 4 | ||||
-rw-r--r-- | todb.py | 19 |
7 files changed, 92 insertions, 36 deletions
@@ -1,3 +1,8 @@ +# queue +litequeue.db +litequeue.db-shm +litequeue.db-wal + # Environments .env .venv diff --git a/mqtt2pgsql.py b/mqtt2queue.py index 004216e..102b1eb 100644 --- a/mqtt2pgsql.py +++ b/mqtt2queue.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 import os -import sys import json -import psycopg2 +import pickle +from datetime import datetime +from litequeue import SQLQueue import paho.mqtt.client as mqtt mqtt_server = os.environ['el_mqtt_server'] @@ -11,12 +12,7 @@ mqtt_port = int(os.environ['el_mqtt_port']) keepalive = int(os.environ['el_mqtt_keepalive']) mqtt_topic = os.environ['el_mqtt_topic'] -pg_db = os.environ['el_pg_db'] -pg_host = os.environ['el_pg_host'] -pg_user = os.environ['el_pg_user'] -pg_pass = os.environ['el_pg_pass'] -pg_table = "mqtt_temps" - +q = SQLQueue("litequeue.db", maxsize=None) # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, rc): @@ -30,32 +26,16 @@ def on_connect(client, userdata, flags, rc): def on_message(client, userdata, msg): name = msg.topic.split('/')[1] data = json.loads(msg.payload) + if name.startswith('tmp') and 'temperature' in data and 'humidity' in data: - sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage) VALUES(%s,%s,%s,%s,%s,%s)" - values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage']) - todb(sql,values) + sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage, time) VALUES(%s,%s,%s,%s,%s,%s,%s)" + values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'], datetime.utcnow()) + q.put(pickle.dumps([sql, values])) elif name == 'HAN' and 'current' in data: - sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality) VALUES(%s,%s,%s,%s,%s)" - values = (name, data['current'], data['power'], data['voltage'], data['linkquality']) - todb(sql,values) - - -# Write values to database -def todb(sql, values): - print(values) - conn = psycopg2.connect(database=pg_db, host=pg_host, user=pg_user, password=pg_pass) - cur = conn.cursor() - try: - cur.execute(sql, values) - conn.commit() - except (Exception, psycopg2.Error) as error: - conn.rollback() - print("Error while connecting to PostgreSQL", error) - raise error - finally: - cur.close() - conn.close() + sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality, time) VALUES(%s,%s,%s,%s,%s,%s)" + values = (name, data['current'], data['power'], data['voltage'], data['linkquality'], datetime.utcnow()) + q.put(pickle.dumps([sql, values])) diff --git a/mqtt2pgsql.service b/mqtt2queue.service index d400b10..a63c994 100644 --- a/mqtt2pgsql.service +++ b/mqtt2queue.service @@ -7,13 +7,11 @@ StartLimitBurst=5 Type=simple EnvironmentFile=%h/energyscripts/.env WorkingDirectory=%h/energyscripts -ExecStart=%h/energyscripts/venv/bin/python3 -u mqtt2pgsql.py +ExecStart=%h/energyscripts/venv/bin/python3 -u mqtt2queue.py Restart=on-failure TimeoutStopSec=70 RestartSec=30 -SyslogIdentifier=mqtt2pgsql -#StandardOutput=journal -#StandardError=journal +SyslogIdentifier=mqtt2queue [Install] WantedBy=default.target diff --git a/queue2pgsql.py b/queue2pgsql.py new file mode 100644 index 0000000..0c95f6d --- /dev/null +++ b/queue2pgsql.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 + +import os +import time +import pickle +import sqlite3 +from litequeue import SQLQueue +from datetime import datetime + +from todb import todb + +# Unlock all +con = sqlite3.connect("litequeue.db") +cur = con.cursor() +cur.execute("UPDATE Queue SET status = 0 WHERE status = 1") +con.commit() +con.close() + +# Open Queue +q = SQLQueue("litequeue.db", maxsize=None) +q.prune() + +# Event loop +while True: + if q.empty() == True: + print("Queue empty. Sleeping 10s") + time.sleep(10) + continue + task = q.pop() + raw = pickle.loads(task['message']) + todb(raw[0],raw[1]) + print("Processed " + str(task['message_id']) + ". " + str(q.qsize() - 1) + " left") + q.done(task['message_id']) + +print("END") diff --git a/queue2pgsql.service b/queue2pgsql.service new file mode 100644 index 0000000..49e4066 --- /dev/null +++ b/queue2pgsql.service @@ -0,0 +1,17 @@ +[Unit] +Description=Simple service to start python-listener +StartLimitIntervalSec=100 +StartLimitBurst=5 + +[Service] +Type=simple +EnvironmentFile=%h/energyscripts/.env +WorkingDirectory=%h/energyscripts +ExecStart=%h/energyscripts/venv/bin/python3 -u queue2pgsql.py +Restart=on-failure +TimeoutStopSec=70 +RestartSec=30 +SyslogIdentifier=queue2pgsql + +[Install] +WantedBy=default.target diff --git a/requirements.txt b/requirements.txt index aa7e4a9..0dd3ee4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,12 @@ async-property==0.2.1 +async-timeout==4.0.2 certifi==2022.12.7 charset-normalizer==3.0.1 idna==3.4 +litequeue==0.5 neohubapi==1.0 paho-mqtt==1.6.1 -psycopg2-binary==2.9.5 +psycopg==3.1.8 requests==2.28.2 urllib3==1.26.14 websockets==10.4 @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +import os +import psycopg + +pg_db = os.environ['el_pg_db'] +pg_host = os.environ['el_pg_host'] +pg_user = os.environ['el_pg_user'] +pg_pass = os.environ['el_pg_pass'] + +def todb(sql, values): + with psycopg.connect(dbname=pg_db, host=pg_host, user=pg_user, password=pg_pass) as conn: + if type(values) == list: + conn.executemany(sql, values) + elif type(values) == tuple: + conn.execute(sql, values) + else: + print("OH SHIT") + exit() |