#!/usr/bin/env python3
"""Subscribe to an MQTT topic and store zigbee2mqtt sensor readings in PostgreSQL.

Connection settings are read from ``el_*`` environment variables at start-up;
a missing variable raises ``KeyError`` immediately. Every message whose topic
starts with ``zigbee2mqtt/tmp`` and whose JSON payload carries ``temperature``
and ``humidity`` is inserted as one row into the ``mqtt_temps`` table.
"""
import os
import sys
import json

import psycopg2
import paho.mqtt.client as mqtt

# MQTT broker settings.
mqtt_server = os.environ['el_mqtt_server']
mqtt_port = int(os.environ['el_mqtt_port'])
keepalive = int(os.environ['el_mqtt_keepalive'])
mqtt_topic = os.environ['el_mqtt_topic']

# PostgreSQL connection settings.
pg_db = os.environ['el_pg_db']
pg_host = os.environ['el_pg_host']
pg_user = os.environ['el_pg_user']
pg_pass = os.environ['el_pg_pass']
pg_table = "mqtt_temps"

# Built once instead of on every message. Only the table name (a constant
# defined above) is concatenated; the row values are always passed as bound
# parameters.
_INSERT_SQL = (
    "INSERT INTO " + pg_table
    + " (name, temperature, humidity, battery, linkquality, voltage)"
    + " VALUES(%s,%s,%s,%s,%s,%s)"
)


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    """Report the connection result code and subscribe to ``mqtt_topic``."""
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(mqtt_topic)


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    """Store one sensor reading in the database.

    Messages are ignored when the topic does not start with
    ``zigbee2mqtt/tmp``, when the payload is not a JSON object, or when the
    object lacks ``temperature`` or ``humidity``. The fields ``battery``,
    ``linkquality`` and ``voltage`` are optional: a missing one is passed to
    the database as ``None`` (SQL NULL) rather than raising ``KeyError``.

    Errors raised by ``todb`` are not caught here.
    """
    if not msg.topic.startswith('zigbee2mqtt/tmp'):
        return

    try:
        data = json.loads(msg.payload)
    except ValueError as error:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both are
        # ValueError subclasses); one bad payload must not stop the logger.
        print("Ignoring undecodable payload on " + msg.topic, error,
              file=sys.stderr)
        return

    # A valid JSON document may still be a list, number or string.
    if not isinstance(data, dict):
        return
    if 'temperature' not in data or 'humidity' not in data:
        return

    # The second topic segment is used as the sensor name; it always exists
    # because the topic starts with 'zigbee2mqtt/tmp'.
    values = (
        msg.topic.split('/')[1],
        data['temperature'],
        data['humidity'],
        data.get('battery'),
        data.get('linkquality'),
        data.get('voltage'),
    )
    print(values)
    todb(values)


# Write values to database
def todb(values):
    """Insert one row into ``pg_table`` using a fresh database connection.

    Args:
        values: 6-tuple ``(name, temperature, humidity, battery,
            linkquality, voltage)`` bound to the INSERT placeholders.

    Raises:
        Exception: any error from connecting is raised as is; an error from
            the insert or commit is rolled back, reported on stderr and then
            re-raised unchanged.
    """
    conn = psycopg2.connect(database=pg_db, host=pg_host, user=pg_user,
                            password=pg_pass)
    try:
        # The cursor context manager closes the cursor on exit.
        with conn.cursor() as cur:
            cur.execute(_INSERT_SQL, values)
        conn.commit()
    except Exception as error:
        conn.rollback()
        print("Error while writing to PostgreSQL", error, file=sys.stderr)
        raise
    finally:
        conn.close()


# mqtt
# paho-mqtt 2.x requires an explicit callback API version. VERSION1 keeps the
# on_connect(client, userdata, flags, rc) signature used above; paho-mqtt 1.x
# has no such enum and takes no version argument.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is None:
    client = mqtt.Client()
else:
    client = mqtt.Client(_callback_api.VERSION1)
client.on_connect = on_connect
client.on_message = on_message

client.connect(mqtt_server, mqtt_port, keepalive)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()