diff options
author | Dennis Eriksen <d@ennis.no> | 2023-01-31 07:49:27 +0100 |
---|---|---|
committer | Dennis Eriksen <d@ennis.no> | 2023-01-31 07:49:27 +0100 |
commit | 4b9958dd25bf67ec2a9fb6bbb9f1e2c7e9ee9321 (patch) | |
tree | 507d6854eb0ad5495aef90be89b53df4bf253991 /mqtt2pgsql.py | |
parent | adding handler for tempsensors on mqtt (diff) | |
download | energyscripts-4b9958dd25bf67ec2a9fb6bbb9f1e2c7e9ee9321.tar.gz |
handle HAN as well
Diffstat (limited to 'mqtt2pgsql.py')
-rw-r--r-- | mqtt2pgsql.py | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/mqtt2pgsql.py b/mqtt2pgsql.py new file mode 100644 index 0000000..004216e --- /dev/null +++ b/mqtt2pgsql.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 + +import os +import sys +import json +import psycopg2 +import paho.mqtt.client as mqtt + +mqtt_server = os.environ['el_mqtt_server'] +mqtt_port = int(os.environ['el_mqtt_port']) +keepalive = int(os.environ['el_mqtt_keepalive']) +mqtt_topic = os.environ['el_mqtt_topic'] + +pg_db = os.environ['el_pg_db'] +pg_host = os.environ['el_pg_host'] +pg_user = os.environ['el_pg_user'] +pg_pass = os.environ['el_pg_pass'] +pg_table = "mqtt_temps" + + +# The callback for when the client receives a CONNACK response from the server. +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe(mqtt_topic) + +# The callback for when a PUBLISH message is received from the server. +def on_message(client, userdata, msg): + name = msg.topic.split('/')[1] + data = json.loads(msg.payload) + if name.startswith('tmp') and 'temperature' in data and 'humidity' in data: + sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage) VALUES(%s,%s,%s,%s,%s,%s)" + values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage']) + todb(sql,values) + + elif name == 'HAN' and 'current' in data: + sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality) VALUES(%s,%s,%s,%s,%s)" + values = (name, data['current'], data['power'], data['voltage'], data['linkquality']) + todb(sql,values) + + +# Write values to database +def todb(sql, values): + print(values) + conn = psycopg2.connect(database=pg_db, host=pg_host, user=pg_user, password=pg_pass) + cur = conn.cursor() + try: + cur.execute(sql, values) + conn.commit() + except (Exception, psycopg2.Error) as error: + conn.rollback() + print("Error while connecting to PostgreSQL", error) + raise error + finally: + cur.close() + conn.close() + + + +# mqtt +client = mqtt.Client() +client.on_connect = on_connect +client.on_message = on_message + +client.connect(mqtt_server, mqtt_port, keepalive) + +# Blocking call that processes network traffic, dispatches callbacks and +# handles reconnecting. +# Other loop*() functions are available that give a threaded interface and a +# manual interface. +client.loop_forever() |