From b5319ac549f0f2a43e143f8a447f63a10d3f0ec5 Mon Sep 17 00:00:00 2001 From: dennis Date: Sat, 4 Feb 2023 07:28:00 +0100 Subject: queue sleep as variable --- scripts/mqtt_watch.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++++ scripts/queue_runner.py | 3 ++- 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 scripts/mqtt_watch.py diff --git a/scripts/mqtt_watch.py b/scripts/mqtt_watch.py new file mode 100644 index 0000000..e460f13 --- /dev/null +++ b/scripts/mqtt_watch.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +''' Listen for mqtt-events, and trigger for some ''' + +import os +import json +from datetime import datetime +import paho.mqtt.client as mqtt + +from common import dbi + +mqtt_server = os.environ['el_mqtt_server'] +mqtt_port = int(os.environ['el_mqtt_port']) +keepalive = int(os.environ['el_mqtt_keepalive']) +mqtt_topic = os.environ['el_mqtt_topic'] + +# The callback for when the client receives a CONNACK response from the server. +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe('#') + +# The callback for when a PUBLISH message is received from the server. +def on_message(client, userdata, msg): + name = msg.topic.split('/')[1] + #data = json.loads(msg.payload) + + print(msg.topic, msg.payload) + + #if name.startswith('tmp') and 'temperature' in data and 'humidity' in data: + # sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage, time) VALUES(%s,%s,%s,%s,%s,%s,%s)" + # values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'], datetime.utcnow()) + + #elif name == 'HAN' and 'current' in data: + # sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality, time) VALUES(%s,%s,%s,%s,%s,%s)" + # values = (name, data['current'], data['power'], data['voltage'], data['linkquality'], datetime.utcnow()) + + #else: + # return + + #dbi(sql, values, verbose=True) + + + +# mqtt +client = mqtt.Client() +client.on_connect = on_connect +client.on_message = on_message + +client.connect(mqtt_server, mqtt_port, keepalive) + +# Blocking call that processes network traffic, dispatches callbacks and +# handles reconnecting. +# Other loop*() functions are available that give a threaded interface and a +# manual interface. +client.loop_forever() diff --git a/scripts/queue_runner.py b/scripts/queue_runner.py index fd4812e..62467b5 100644 --- a/scripts/queue_runner.py +++ b/scripts/queue_runner.py @@ -10,6 +10,7 @@ from litequeue import SQLQueue from common.postgres import dbi QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db') +QUEUE_SLEEP = int(os.environ.get('el_QUEUE_sleep', 15)) # Default sleep 15 seconds when queue empty # Unlock all con = sqlite3.connect(QUEUE_DB) @@ -31,7 +32,7 @@ i = 0 while True: # Just sleep if queue is empty if q.empty() is True: - time.sleep(10) + time.sleep(QUEUE_SLEEP) continue # get message -- cgit v1.2.3