From 3499ea624b67559f2c9530db4e36635106e22336 Mon Sep 17 00:00:00 2001 From: dennis Date: Tue, 31 Jan 2023 20:59:38 +0100 Subject: added queue, in case db-connection is down --- .gitignore | 5 ++++ mqtt2pgsql.py | 73 ----------------------------------------------------- mqtt2pgsql.service | 19 -------------- mqtt2queue.py | 53 ++++++++++++++++++++++++++++++++++++++ mqtt2queue.service | 17 +++++++++++++ queue2pgsql.py | 35 +++++++++++++++++++++++++ queue2pgsql.service | 17 +++++++++++++ requirements.txt | 4 ++- todb.py | 19 ++++++++++++++ 9 files changed, 149 insertions(+), 93 deletions(-) delete mode 100644 mqtt2pgsql.py delete mode 100644 mqtt2pgsql.service create mode 100644 mqtt2queue.py create mode 100644 mqtt2queue.service create mode 100644 queue2pgsql.py create mode 100644 queue2pgsql.service create mode 100644 todb.py diff --git a/.gitignore b/.gitignore index 8535f83..af68312 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ +# queue +litequeue.db +litequeue.db-shm +litequeue.db-wal + # Environments .env .venv diff --git a/mqtt2pgsql.py b/mqtt2pgsql.py deleted file mode 100644 index 004216e..0000000 --- a/mqtt2pgsql.py +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env python3 - -import os -import sys -import json -import psycopg2 -import paho.mqtt.client as mqtt - -mqtt_server = os.environ['el_mqtt_server'] -mqtt_port = int(os.environ['el_mqtt_port']) -keepalive = int(os.environ['el_mqtt_keepalive']) -mqtt_topic = os.environ['el_mqtt_topic'] - -pg_db = os.environ['el_pg_db'] -pg_host = os.environ['el_pg_host'] -pg_user = os.environ['el_pg_user'] -pg_pass = os.environ['el_pg_pass'] -pg_table = "mqtt_temps" - - -# The callback for when the client receives a CONNACK response from the server. -def on_connect(client, userdata, flags, rc): - print("Connected with result code "+str(rc)) - - # Subscribing in on_connect() means that if we lose the connection and - # reconnect then subscriptions will be renewed. - client.subscribe(mqtt_topic) - -# The callback for when a PUBLISH message is received from the server. -def on_message(client, userdata, msg): - name = msg.topic.split('/')[1] - data = json.loads(msg.payload) - if name.startswith('tmp') and 'temperature' in data and 'humidity' in data: - sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage) VALUES(%s,%s,%s,%s,%s,%s)" - values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage']) - todb(sql,values) - - elif name == 'HAN' and 'current' in data: - sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality) VALUES(%s,%s,%s,%s,%s)" - values = (name, data['current'], data['power'], data['voltage'], data['linkquality']) - todb(sql,values) - - -# Write values to database -def todb(sql, values): - print(values) - conn = psycopg2.connect(database=pg_db, host=pg_host, user=pg_user, password=pg_pass) - cur = conn.cursor() - try: - cur.execute(sql, values) - conn.commit() - except (Exception, psycopg2.Error) as error: - conn.rollback() - print("Error while connecting to PostgreSQL", error) - raise error - finally: - cur.close() - conn.close() - - - -# mqtt -client = mqtt.Client() -client.on_connect = on_connect -client.on_message = on_message - -client.connect(mqtt_server, mqtt_port, keepalive) - -# Blocking call that processes network traffic, dispatches callbacks and -# handles reconnecting. -# Other loop*() functions are available that give a threaded interface and a -# manual interface. -client.loop_forever() diff --git a/mqtt2pgsql.service b/mqtt2pgsql.service deleted file mode 100644 index d400b10..0000000 --- a/mqtt2pgsql.service +++ /dev/null @@ -1,19 +0,0 @@ -[Unit] -Description=Simple service to start python-listener -StartLimitIntervalSec=100 -StartLimitBurst=5 - -[Service] -Type=simple -EnvironmentFile=%h/energyscripts/.env -WorkingDirectory=%h/energyscripts -ExecStart=%h/energyscripts/venv/bin/python3 -u mqtt2pgsql.py -Restart=on-failure -TimeoutStopSec=70 -RestartSec=30 -SyslogIdentifier=mqtt2pgsql -#StandardOutput=journal -#StandardError=journal - -[Install] -WantedBy=default.target diff --git a/mqtt2queue.py b/mqtt2queue.py new file mode 100644 index 0000000..102b1eb --- /dev/null +++ b/mqtt2queue.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 + +import os +import json +import pickle +from datetime import datetime +from litequeue import SQLQueue +import paho.mqtt.client as mqtt + +mqtt_server = os.environ['el_mqtt_server'] +mqtt_port = int(os.environ['el_mqtt_port']) +keepalive = int(os.environ['el_mqtt_keepalive']) +mqtt_topic = os.environ['el_mqtt_topic'] + +q = SQLQueue("litequeue.db", maxsize=None) + +# The callback for when the client receives a CONNACK response from the server. +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe(mqtt_topic) + +# The callback for when a PUBLISH message is received from the server. +def on_message(client, userdata, msg): + name = msg.topic.split('/')[1] + data = json.loads(msg.payload) + + if name.startswith('tmp') and 'temperature' in data and 'humidity' in data: + sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage, time) VALUES(%s,%s,%s,%s,%s,%s,%s)" + values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'], datetime.utcnow()) + q.put(pickle.dumps([sql, values])) + + elif name == 'HAN' and 'current' in data: + sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality, time) VALUES(%s,%s,%s,%s,%s,%s)" + values = (name, data['current'], data['power'], data['voltage'], data['linkquality'], datetime.utcnow()) + q.put(pickle.dumps([sql, values])) + + + +# mqtt +client = mqtt.Client() +client.on_connect = on_connect +client.on_message = on_message + +client.connect(mqtt_server, mqtt_port, keepalive) + +# Blocking call that processes network traffic, dispatches callbacks and +# handles reconnecting. +# Other loop*() functions are available that give a threaded interface and a +# manual interface. +client.loop_forever() diff --git a/mqtt2queue.service b/mqtt2queue.service new file mode 100644 index 0000000..a63c994 --- /dev/null +++ b/mqtt2queue.service @@ -0,0 +1,17 @@ +[Unit] +Description=Simple service to start python-listener +StartLimitIntervalSec=100 +StartLimitBurst=5 + +[Service] +Type=simple +EnvironmentFile=%h/energyscripts/.env +WorkingDirectory=%h/energyscripts +ExecStart=%h/energyscripts/venv/bin/python3 -u mqtt2queue.py +Restart=on-failure +TimeoutStopSec=70 +RestartSec=30 +SyslogIdentifier=mqtt2queue + +[Install] +WantedBy=default.target diff --git a/queue2pgsql.py b/queue2pgsql.py new file mode 100644 index 0000000..0c95f6d --- /dev/null +++ b/queue2pgsql.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 + +import os +import time +import pickle +import sqlite3 +from litequeue import SQLQueue +from datetime import datetime + +from todb import todb + +# Unlock all +con = sqlite3.connect("litequeue.db") +cur = con.cursor() +cur.execute("UPDATE Queue SET status = 0 WHERE status = 1") +con.commit() +con.close() + +# Open Queue +q = SQLQueue("litequeue.db", maxsize=None) +q.prune() + +# Event loop +while True: + if q.empty() == True: + print("Queue empty. Sleeping 10s") + time.sleep(10) + continue + task = q.pop() + raw = pickle.loads(task['message']) + todb(raw[0],raw[1]) + print("Processed " + str(task['message_id']) + ". " + str(q.qsize() - 1) + " left") + q.done(task['message_id']) + +print("END") diff --git a/queue2pgsql.service b/queue2pgsql.service new file mode 100644 index 0000000..49e4066 --- /dev/null +++ b/queue2pgsql.service @@ -0,0 +1,17 @@ +[Unit] +Description=Simple service to start python-listener +StartLimitIntervalSec=100 +StartLimitBurst=5 + +[Service] +Type=simple +EnvironmentFile=%h/energyscripts/.env +WorkingDirectory=%h/energyscripts +ExecStart=%h/energyscripts/venv/bin/python3 -u queue2pgsql.py +Restart=on-failure +TimeoutStopSec=70 +RestartSec=30 +SyslogIdentifier=queue2pgsql + +[Install] +WantedBy=default.target diff --git a/requirements.txt b/requirements.txt index aa7e4a9..0dd3ee4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,12 @@ async-property==0.2.1 +async-timeout==4.0.2 certifi==2022.12.7 charset-normalizer==3.0.1 idna==3.4 +litequeue==0.5 neohubapi==1.0 paho-mqtt==1.6.1 -psycopg2-binary==2.9.5 +psycopg==3.1.8 requests==2.28.2 urllib3==1.26.14 websockets==10.4 diff --git a/todb.py b/todb.py new file mode 100644 index 0000000..d911a29 --- /dev/null +++ b/todb.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +import os +import psycopg + +pg_db = os.environ['el_pg_db'] +pg_host = os.environ['el_pg_host'] +pg_user = os.environ['el_pg_user'] +pg_pass = os.environ['el_pg_pass'] + +def todb(sql, values): + with psycopg.connect(dbname=pg_db, host=pg_host, user=pg_user, password=pg_pass) as conn: + if type(values) == list: + conn.executemany(sql, values) + elif type(values) == tuple: + conn.execute(sql, values) + else: + print("OH SHIT") + exit() -- cgit v1.2.3